[ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=80533&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-80533
 ]

ASF GitHub Bot logged work on BEAM-3326:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Mar/18 20:30
            Start Date: 14/Mar/18 20:30
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #4825: [BEAM-3326] Add an InProcess SdkHarness Rule
URL: https://github.com/apache/beam/pull/4825
 
 
   

This is a PR merged from a forked repository. Because GitHub hides the
original diff of a foreign (forked) pull request once it is merged, the
diff is reproduced below for the sake of provenance:

diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
index 9d443427dc1..5a82a32a0b4 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolService.java
@@ -18,7 +18,10 @@
 package org.apache.beam.runners.fnexecution.control;
 
 import io.grpc.stub.StreamObserver;
+import java.util.Collection;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
 import org.apache.beam.runners.fnexecution.FnService;
@@ -31,6 +34,8 @@
   private static final Logger LOGGER = 
LoggerFactory.getLogger(FnApiControlClientPoolService.class);
 
   private final BlockingQueue<FnApiControlClient> clientPool;
+  private final Collection<FnApiControlClient> vendedClients = new 
CopyOnWriteArrayList<>();
+  private AtomicBoolean closed = new AtomicBoolean();
 
   private FnApiControlClientPoolService(BlockingQueue<FnApiControlClient> 
clientPool) {
     this.clientPool = clientPool;
@@ -61,6 +66,12 @@ public static FnApiControlClientPoolService 
offeringClientsToPool(
     LOGGER.info("Beam Fn Control client connected.");
     FnApiControlClient newClient = 
FnApiControlClient.forRequestObserver(requestObserver);
     try {
+      // Add the client to the pool of vended clients before making it available - we should close
+      // the client when we close even if no one has picked it up yet. This can occur after the
+      // service is closed, in which case the client will be discarded when the service is
+      // discarded, which should be performed by a call to #shutdownNow. The remote caller must be
+      // able to handle an unexpectedly terminated connection.
+      vendedClients.add(newClient);
       clientPool.put(newClient);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -70,7 +81,11 @@ public static FnApiControlClientPoolService 
offeringClientsToPool(
   }
 
   @Override
-  public void close() throws Exception {
-    // The clients in the pool are owned by the consumer, which is responsible 
for closing them
+  public void close() {
+    if (!closed.getAndSet(true)) {
+      for (FnApiControlClient vended : vendedClients) {
+        vended.close();
+      }
+    }
   }
 }
diff --git a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
index 00b2a9afb92..a13d3d7d38c 100644
--- a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
+++ b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/data/GrpcDataService.java
@@ -53,8 +53,7 @@
     implements FnService, FnDataService {
   private static final Logger LOG = 
LoggerFactory.getLogger(GrpcDataService.class);
 
-  public static GrpcDataService create(
-      ExecutorService executor) {
+  public static GrpcDataService create(ExecutorService executor) {
     return new GrpcDataService(executor);
   }
 
@@ -62,6 +61,9 @@ public static GrpcDataService create(
   /**
    * A collection of multiplexers which are not used to send data. A handle to 
these multiplexers is
    * maintained in order to perform an orderly shutdown.
+   *
+   * <p>TODO: (BEAM-3811) Replace with some cancellable collection, to ensure 
that new clients of a
+   * closed {@link GrpcDataService} are closed with that {@link 
GrpcDataService}.
    */
   private final Queue<BeamFnDataGrpcMultiplexer> additionalMultiplexers;
 
@@ -105,7 +107,9 @@ public void close() throws Exception {
         // Shutdown remaining clients
       }
     }
-    connectedClient.get().close();
+    if (!connectedClient.isCancelled()) {
+      connectedClient.get().close();
+    }
   }
 
   @Override
@@ -130,16 +134,17 @@ public void close() throws Exception {
         throw new RuntimeException(e.getCause());
       }
     } else {
-      executor.submit(() -> {
-        try {
-          connectedClient.get().registerConsumer(inputLocation, observer);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          throw new RuntimeException(e);
-        } catch (ExecutionException e) {
-          throw new RuntimeException(e.getCause());
-        }
-      });
+      executor.submit(
+          () -> {
+            try {
+              connectedClient.get().registerConsumer(inputLocation, observer);
+            } catch (InterruptedException e) {
+              Thread.currentThread().interrupt();
+              throw new RuntimeException(e);
+            } catch (ExecutionException e) {
+              throw new RuntimeException(e.getCause());
+            }
+          });
     }
     return observer;
   }
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
new file mode 100644
index 00000000000..530b919187a
--- /dev/null
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/InProcessSdkHarness.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.runners.fnexecution;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import io.grpc.ManagedChannel;
+import io.grpc.inprocess.InProcessChannelBuilder;
+import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import org.apache.beam.fn.harness.FnHarness;
+import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
+import org.apache.beam.runners.fnexecution.control.FnApiControlClient;
+import 
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService;
+import org.apache.beam.runners.fnexecution.control.SdkHarnessClient;
+import org.apache.beam.runners.fnexecution.data.GrpcDataService;
+import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
+import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
+import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
+import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TestRule;
+
+/**
+ * A {@link TestRule} which creates a {@link FnHarness} in a thread, services required for that
+ * {@link FnHarness} to properly execute, and provides access to the associated client and harness
+ * during test execution.
+ */
+public class InProcessSdkHarness extends ExternalResource implements TestRule {
+
+  public static InProcessSdkHarness create() {
+    return new InProcessSdkHarness();
+  }
+
+  private ExecutorService executor;
+  private GrpcFnServer<GrpcLoggingService> loggingServer;
+  private GrpcFnServer<GrpcDataService> dataServer;
+  private GrpcFnServer<FnApiControlClientPoolService> controlServer;
+
+  private SdkHarnessClient client;
+
+  private InProcessSdkHarness() {}
+
+  public SdkHarnessClient client() {
+    return client;
+  }
+
+  public ApiServiceDescriptor dataEndpoint() {
+    return dataServer.getApiServiceDescriptor();
+  }
+
+  protected void before() throws IOException, InterruptedException {
+    InProcessServerFactory serverFactory = InProcessServerFactory.create();
+    executor = Executors.newCachedThreadPool(new 
ThreadFactoryBuilder().setDaemon(true).build());
+    SynchronousQueue<FnApiControlClient> clientPool = new SynchronousQueue<>();
+    FnApiControlClientPoolService clientPoolService =
+        FnApiControlClientPoolService.offeringClientsToPool(clientPool);
+
+    loggingServer =
+        GrpcFnServer.allocatePortAndCreateFor(
+            GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
+    dataServer =
+        
GrpcFnServer.allocatePortAndCreateFor(GrpcDataService.create(executor), 
serverFactory);
+    controlServer = GrpcFnServer.allocatePortAndCreateFor(clientPoolService, 
serverFactory);
+
+    executor.submit(
+        () -> {
+          FnHarness.main(
+              PipelineOptionsFactory.create(),
+              loggingServer.getApiServiceDescriptor(),
+              controlServer.getApiServiceDescriptor(),
+              new ManagedChannelFactory() {
+                @Override
+                public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
+                  return 
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
+                }
+              },
+              StreamObserverFactory.direct());
+          return null;
+        });
+
+    client = SdkHarnessClient.usingFnApiClient(clientPool.take(), 
dataServer.getService());
+  }
+
+  protected void after() {
+    try (AutoCloseable logs = loggingServer;
+        AutoCloseable data = dataServer;
+        AutoCloseable ctl = controlServer;
+        AutoCloseable c = client; ) {
+      executor.shutdownNow();
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
index 8f4a09f28fb..98f258fcb41 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/FnApiControlClientPoolServiceTest.java
@@ -23,12 +23,23 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
 
+import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.stub.StreamObserver;
+import java.io.IOException;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionRequest;
+import org.apache.beam.model.fnexecution.v1.BeamFnControlGrpc;
+import org.apache.beam.runners.fnexecution.GrpcFnServer;
+import org.apache.beam.runners.fnexecution.InProcessServerFactory;
 import org.apache.beam.sdk.util.MoreFutures;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -40,8 +51,23 @@
   // For ease of straight-line testing, we use a LinkedBlockingQueue; in 
practice a SynchronousQueue
   // for matching incoming connections and server threads is likely.
   private final BlockingQueue<FnApiControlClient> pool = new 
LinkedBlockingQueue<>();
-  private FnApiControlClientPoolService controlService =
+  private final FnApiControlClientPoolService controlService =
       FnApiControlClientPoolService.offeringClientsToPool(pool);
+  private GrpcFnServer<FnApiControlClientPoolService> server;
+  private BeamFnControlGrpc.BeamFnControlStub stub;
+
+  @Before
+  public void setup() throws IOException {
+    server = GrpcFnServer.allocatePortAndCreateFor(controlService, 
InProcessServerFactory.create());
+    stub =
+        BeamFnControlGrpc.newStub(
+            
InProcessChannelBuilder.forName(server.getApiServiceDescriptor().getUrl()).build());
+  }
+
+  @After
+  public void teardown() throws Exception {
+    server.close();
+  }
 
   @Test
   public void testIncomingConnection() throws Exception {
@@ -63,4 +89,34 @@ public void testIncomingConnection() throws Exception {
         
BeamFnApi.InstructionResponse.newBuilder().setInstructionId(id).build());
     MoreFutures.get(responseFuture);
   }
+
+  @Test
+  public void testCloseCompletesClients() throws Exception {
+    CountDownLatch latch = new CountDownLatch(1);
+    AtomicBoolean sawComplete = new AtomicBoolean();
+    stub.control(
+        new StreamObserver<InstructionRequest>() {
+          @Override
+          public void onNext(InstructionRequest value) {
+            Assert.fail("Should never see a request");
+          }
+
+          @Override
+          public void onError(Throwable t) {
+            latch.countDown();
+          }
+
+          @Override
+          public void onCompleted() {
+            sawComplete.set(true);
+            latch.countDown();
+          }
+        });
+
+    pool.take();
+    server.close();
+
+    latch.await();
+    assertThat(sawComplete.get(), is(true));
+  }
 }
diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
index 0a18ff6844f..1c4972b735d 100644
--- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
+++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
@@ -26,51 +26,36 @@
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
-import io.grpc.ManagedChannel;
-import io.grpc.inprocess.InProcessChannelBuilder;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import org.apache.beam.fn.harness.FnHarness;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.RemoteGrpcPort;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.Target;
 import org.apache.beam.model.pipeline.v1.Endpoints;
-import org.apache.beam.model.pipeline.v1.Endpoints.ApiServiceDescriptor;
 import org.apache.beam.model.pipeline.v1.RunnerApi;
 import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
 import org.apache.beam.runners.core.construction.CoderTranslation;
 import org.apache.beam.runners.core.construction.PipelineTranslation;
-import org.apache.beam.runners.fnexecution.GrpcFnServer;
-import org.apache.beam.runners.fnexecution.InProcessServerFactory;
+import org.apache.beam.runners.fnexecution.InProcessSdkHarness;
 import 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient.ActiveBundle;
 import 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient.BundleProcessor;
 import 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient.RemoteInputDestination;
 import 
org.apache.beam.runners.fnexecution.control.SdkHarnessClient.RemoteOutputReceiver;
 import org.apache.beam.runners.fnexecution.data.FnDataService;
-import org.apache.beam.runners.fnexecution.data.GrpcDataService;
-import org.apache.beam.runners.fnexecution.logging.GrpcLoggingService;
-import org.apache.beam.runners.fnexecution.logging.Slf4jLogWriter;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.LengthPrefixCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.fn.channel.ManagedChannelFactory;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
 import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
-import org.apache.beam.sdk.fn.stream.StreamObserverFactory;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
@@ -82,6 +67,7 @@
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -95,6 +81,8 @@
   @Mock public FnApiControlClient fnApiControlClient;
   @Mock public FnDataService dataService;
 
+  @Rule public InProcessSdkHarness harness = InProcessSdkHarness.create();
+
   private SdkHarnessClient sdkHarnessClient;
 
   @Before
@@ -177,38 +165,8 @@ public void testNewBundleNoDataDoesNotCrash() throws 
Exception {
 
   @Test
   public void testNewBundleAndProcessElements() throws Exception {
-    InProcessServerFactory serverFactory = InProcessServerFactory.create();
-    ExecutorService executor = Executors.newCachedThreadPool();
-
-    final GrpcFnServer<GrpcLoggingService> loggingServer =
-        GrpcFnServer.allocatePortAndCreateFor(
-            GrpcLoggingService.forWriter(Slf4jLogWriter.getDefault()), 
serverFactory);
-    GrpcDataService grpcDataService = GrpcDataService.create(executor);
-    GrpcFnServer<GrpcDataService> dataServer =
-        GrpcFnServer.allocatePortAndCreateFor(grpcDataService, serverFactory);
-
-    SynchronousQueue<FnApiControlClient> clientPool = new SynchronousQueue<>();
-    FnApiControlClientPoolService clientPoolService =
-        FnApiControlClientPoolService.offeringClientsToPool(clientPool);
-    final GrpcFnServer<FnApiControlClientPoolService> controlServer =
-        GrpcFnServer.allocatePortAndCreateFor(clientPoolService, 
serverFactory);
-
-    Future<Void> harness = executor.submit(() -> {
-      FnHarness.main(PipelineOptionsFactory.create(),
-          loggingServer.getApiServiceDescriptor(),
-          controlServer.getApiServiceDescriptor(),
-          new ManagedChannelFactory() {
-            @Override
-            public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
-              return 
InProcessChannelBuilder.forName(apiServiceDescriptor.getUrl()).build();
-            }
-          },
-          StreamObserverFactory.direct());
-      return null;
-    });
-
     ProcessBundleDescriptor processBundleDescriptor =
-        getProcessBundleDescriptor(dataServer.getApiServiceDescriptor());
+        getProcessBundleDescriptor(harness.dataEndpoint());
 
     BeamFnApi.Target sdkGrpcReadTarget =
         BeamFnApi.Target.newBuilder()
@@ -225,7 +183,7 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
             .setPrimitiveTransformReference("write")
             .build();
 
-    SdkHarnessClient client = 
SdkHarnessClient.usingFnApiClient(clientPool.take(), grpcDataService);
+    SdkHarnessClient client = harness.client();
     BundleProcessor<String> processor =
         client.getProcessor(
             processBundleDescriptor,
@@ -261,7 +219,6 @@ public ManagedChannel forDescriptor(ApiServiceDescriptor 
apiServiceDescriptor) {
             WindowedValue.valueInGlobalWindow("spam"),
             WindowedValue.valueInGlobalWindow("ham"),
             WindowedValue.valueInGlobalWindow("eggs")));
-    executor.shutdownNow();
   }
 
   private BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor(


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 80533)
    Time Spent: 3h 10m  (was: 3h)

> Execute a Stage via the portability framework in the ReferenceRunner
> --------------------------------------------------------------------
>
>                 Key: BEAM-3326
>                 URL: https://issues.apache.org/jira/browse/BEAM-3326
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via the portability framework APIs.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to