[ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85299&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85299
 ]

ASF GitHub Bot logged work on BEAM-3326:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Mar/18 15:44
            Start Date: 28/Mar/18 15:44
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #4963: [BEAM-3326] 
Abstract away closing the inbound receiver, waiting for the bundle to finish, 
waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
index f71616a9aae..de1aa57fe32 100644
--- 
a/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
+++ 
b/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
@@ -38,6 +38,7 @@
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.MoreFutures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -96,6 +97,13 @@ private BundleProcessor(
      *
      * <p>The input channels for the returned {@link ActiveBundle} are derived 
from the instructions
      * in the {@link BeamFnApi.ProcessBundleDescriptor}.
+     *
+     * <p>NOTE: It is important to {@link #close()} each bundle after all 
elements are emitted.
+     * <pre>{@code
+     * try (ActiveBundle<InputT> bundle = SdkHarnessClient.newBundle(...)) {
+     *   // send all elements
+     * }
+     * }</pre>
      */
     public ActiveBundle<T> newBundle(
         Map<BeamFnApi.Target, RemoteOutputReceiver<?>> outputReceivers) {
@@ -133,7 +141,7 @@ private BundleProcessor(
           fnApiDataService.send(
               LogicalEndpoint.of(bundleId, remoteInput.getTarget()), 
remoteInput.getCoder());
 
-      return ActiveBundle.create(bundleId, specificResponse, dataReceiver, 
outputClients);
+      return new ActiveBundle(bundleId, specificResponse, dataReceiver, 
outputClients);
     }
 
     private <OutputT> InboundDataClient attachReceiver(
@@ -146,22 +154,92 @@ private BundleProcessor(
   }
 
   /** An active bundle for a particular {@link 
BeamFnApi.ProcessBundleDescriptor}. */
-  @AutoValue
-  public abstract static class ActiveBundle<InputT> {
-    public abstract String getBundleId();
-
-    public abstract CompletionStage<BeamFnApi.ProcessBundleResponse> 
getBundleResponse();
+  public static class ActiveBundle<InputT> implements AutoCloseable {
+    private final String bundleId;
+    private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
+    private final CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver;
+    private final Map<BeamFnApi.Target, InboundDataClient> outputClients;
 
-    public abstract CloseableFnDataReceiver<WindowedValue<InputT>> 
getInputReceiver();
-    public abstract Map<BeamFnApi.Target, InboundDataClient> 
getOutputClients();
-
-    public static <InputT> ActiveBundle<InputT> create(
+    private ActiveBundle(
         String bundleId,
         CompletionStage<BeamFnApi.ProcessBundleResponse> response,
-        CloseableFnDataReceiver<WindowedValue<InputT>> dataReceiver,
+        CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver,
         Map<BeamFnApi.Target, InboundDataClient> outputClients) {
-      return new AutoValue_SdkHarnessClient_ActiveBundle<>(
-          bundleId, response, dataReceiver, outputClients);
+      this.bundleId = bundleId;
+      this.response = response;
+      this.inputReceiver = inputReceiver;
+      this.outputClients = outputClients;
+    }
+
+    /**
+     * Returns an id used to represent this bundle.
+     */
+    public String getBundleId() {
+      return bundleId;
+    }
+
+    /**
+     * Returns a {@link FnDataReceiver receiver} which consumes input elements, forwarding them
+     * to the SDK. When all elements have been sent, {@link #close()} this bundle.
+     */
+    public FnDataReceiver<WindowedValue<InputT>> getInputReceiver() {
+      return inputReceiver;
+    }
+
+    /**
+     * Blocks until bundle processing is finished. This consists of:
+     * <ul>
+     *   <li>closing the {@link #getInputReceiver() input receiver}.</li>
+     *   <li>waiting for the SDK to say that processing the bundle is finished.</li>
+     *   <li>waiting for all inbound data clients to complete.</li>
+     * </ul>
+     *
+     * <p>This method will throw an exception if bundle processing has failed. The first failure
+     * encountered is thrown; any later failures are attached to it and can be retrieved with
+     * {@link Throwable#getSuppressed()}.
+     */
+    @Override
+    public void close() throws Exception {
+      Exception exception = null;
+      try {
+        inputReceiver.close();
+      } catch (Exception e) {
+        exception = e;
+      }
+      try {
+        // We don't have to worry about the completion stage.
+        if (exception == null) {
+          MoreFutures.get(response);
+        } else {
+          // TODO: Handle aborting the bundle being processed.
+          throw new IllegalStateException("Processing bundle failed, TODO: 
abort bundle.");
+        }
+      } catch (Exception e) {
+        if (exception == null) {
+          exception = e;
+        } else {
+          exception.addSuppressed(e);
+        }
+      }
+      for (InboundDataClient outputClient : outputClients.values()) {
+        try {
+          // If we failed processing this bundle, we should cancel all inbound 
data.
+          if (exception == null) {
+            outputClient.awaitCompletion();
+          } else {
+            outputClient.cancel();
+          }
+        } catch (Exception e) {
+          if (exception == null) {
+            exception = e;
+          } else {
+            exception.addSuppressed(e);
+          }
+        }
+      }
+      if (exception != null) {
+        throw exception;
+      }
     }
   }
 
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
index dd2f5af1c10..962cb790b27 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java
@@ -197,21 +197,10 @@ public void process(ProcessContext ctxt) {
           targetCoder.getKey(),
           RemoteOutputReceiver.of(targetCoder.getValue(), 
outputContents::add));
     }
-    ActiveBundle<byte[]> bundle = processor.newBundle(outputReceivers);
     // The impulse example
-    bundle.getInputReceiver().accept(WindowedValue.valueInGlobalWindow(new 
byte[0]));
-    bundle.getInputReceiver().close();
-    bundle
-        .getOutputClients()
-        .values()
-        .forEach(
-            inboundDataClient -> {
-              try {
-                inboundDataClient.awaitCompletion();
-              } catch (Exception e) {
-                throw new IllegalStateException(e);
-              }
-            });
+    try (ActiveBundle<byte[]> bundle = processor.newBundle(outputReceivers)) {
+      bundle.getInputReceiver().accept(WindowedValue.valueInGlobalWindow(new 
byte[0]));
+    }
     for (Collection<WindowedValue<?>> windowedValues : outputValues.values()) {
       assertThat(
           windowedValues,
diff --git 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
index 33a47d3a2c0..dc97cebb1d1 100644
--- 
a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
+++ 
b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClientTest.java
@@ -19,10 +19,15 @@
 
 import static com.google.common.collect.Iterables.getOnlyElement;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
 import com.google.common.collect.ImmutableMap;
@@ -32,6 +37,7 @@
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.InstructionResponse;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi.ProcessBundleDescriptor;
@@ -53,6 +59,7 @@
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.data.InboundDataClient;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortRead;
 import org.apache.beam.sdk.fn.data.RemoteGrpcPortWrite;
@@ -61,7 +68,6 @@
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow.Coder;
-import org.apache.beam.sdk.util.MoreFutures;
 import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.TupleTag;
@@ -148,19 +154,18 @@ public void testNewBundleNoDataDoesNotCrash() throws 
Exception {
             descriptor, RemoteInputDestination.of(coder, 
Target.getDefaultInstance()));
     when(dataService.send(any(), 
eq(coder))).thenReturn(mock(CloseableFnDataReceiver.class));
 
-    ActiveBundle<String> activeBundle =
-        processor.newBundle(Collections.emptyMap());
-
-    // Correlating the ProcessBundleRequest and ProcessBundleResponse is owned 
by the underlying
-    // FnApiControlClient. The SdkHarnessClient owns just wrapping the request 
and unwrapping
-    // the response.
-    //
-    // Currently there are no fields so there's nothing to check. This test is 
formulated
-    // to match the pattern it should have if/when the response is meaningful.
-    BeamFnApi.ProcessBundleResponse response = 
BeamFnApi.ProcessBundleResponse.getDefaultInstance();
-    processBundleResponseFuture.complete(
-        
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
-    MoreFutures.get(activeBundle.getBundleResponse());
+    try (ActiveBundle<String> activeBundle = 
processor.newBundle(Collections.emptyMap())) {
+      // Correlating the ProcessBundleRequest and ProcessBundleResponse is 
owned by the underlying
+      // FnApiControlClient. The SdkHarnessClient owns just wrapping the 
request and unwrapping
+      // the response.
+      //
+      // Currently there are no fields so there's nothing to check. This test 
is formulated
+      // to match the pattern it should have if/when the response is 
meaningful.
+      BeamFnApi.ProcessBundleResponse response = 
BeamFnApi.ProcessBundleResponse
+          .getDefaultInstance();
+      processBundleResponseFuture.complete(
+          
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
+    }
   }
 
   @Test
@@ -192,25 +197,19 @@ public void testNewBundleAndProcessElements() throws 
Exception {
                 sdkGrpcReadTarget));
 
     Collection<WindowedValue<String>> outputs = new ArrayList<>();
-    ActiveBundle<?> activeBundle =
+    try (ActiveBundle<String> activeBundle =
         processor.newBundle(
             Collections.singletonMap(
                 sdkGrpcWriteTarget,
                 RemoteOutputReceiver.of(
                     FullWindowedValueCoder.of(
                         LengthPrefixCoder.of(StringUtf8Coder.of()), 
Coder.INSTANCE),
-                    outputs::add)));
-
-    try (CloseableFnDataReceiver<WindowedValue<String>> bundleInputReceiver =
-        (CloseableFnDataReceiver) activeBundle.getInputReceiver()) {
+                    outputs::add)))) {
+      FnDataReceiver<WindowedValue<String>> bundleInputReceiver = 
activeBundle.getInputReceiver();
       bundleInputReceiver.accept(WindowedValue.valueInGlobalWindow("foo"));
       bundleInputReceiver.accept(WindowedValue.valueInGlobalWindow("bar"));
       bundleInputReceiver.accept(WindowedValue.valueInGlobalWindow("baz"));
     }
-    MoreFutures.get(activeBundle.getBundleResponse());
-    for (InboundDataClient outputClient : 
activeBundle.getOutputClients().values()) {
-      outputClient.awaitCompletion();
-    }
 
     // The bundle can be a simple function of some sort, but needs to be 
complete.
     assertThat(
@@ -221,6 +220,135 @@ public void testNewBundleAndProcessElements() throws 
Exception {
             WindowedValue.valueInGlobalWindow("eggs")));
   }
 
+  @Test
+  public void handleCleanupWhenInputSenderFails() throws Exception {
+    String descriptorId1 = "descriptor1";
+    Exception testException = new Exception();
+
+    InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
+    CloseableFnDataReceiver mockInputSender = 
mock(CloseableFnDataReceiver.class);
+
+    ProcessBundleDescriptor descriptor =
+        ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
+    CompletableFuture<InstructionResponse> processBundleResponseFuture =
+        new CompletableFuture<>();
+    when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+        .thenReturn(new CompletableFuture<>())
+        .thenReturn(processBundleResponseFuture);
+
+    FullWindowedValueCoder<String> coder =
+        FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
+    BundleProcessor<String> processor =
+        sdkHarnessClient.getProcessor(
+            descriptor, RemoteInputDestination.of(coder, 
Target.getDefaultInstance()));
+    when(dataService.receive(any(), any(), 
any())).thenReturn(mockOutputReceiver);
+    when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
+
+    doThrow(testException).when(mockInputSender).close();
+
+    RemoteOutputReceiver mockRemoteOutputReceiver = 
mock(RemoteOutputReceiver.class);
+
+    try {
+      try (ActiveBundle<String> activeBundle = processor.newBundle(
+          ImmutableMap.of(Target.getDefaultInstance(), 
mockRemoteOutputReceiver))) {
+        // We shouldn't be required to complete the process bundle response 
future.
+      }
+      fail("Exception expected");
+    } catch (Exception e) {
+      assertEquals(testException, e);
+
+      verify(mockOutputReceiver).cancel();
+      verifyNoMoreInteractions(mockOutputReceiver);
+    }
+  }
+
+  @Test
+  public void handleCleanupWhenProcessingBundleFails() throws Exception {
+    String descriptorId1 = "descriptor1";
+    Exception testException = new Exception();
+
+    InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
+    CloseableFnDataReceiver mockInputSender = 
mock(CloseableFnDataReceiver.class);
+
+    ProcessBundleDescriptor descriptor =
+        ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
+    CompletableFuture<InstructionResponse> processBundleResponseFuture =
+        new CompletableFuture<>();
+    when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+        .thenReturn(new CompletableFuture<>())
+        .thenReturn(processBundleResponseFuture);
+
+    FullWindowedValueCoder<String> coder =
+        FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
+    BundleProcessor<String> processor =
+        sdkHarnessClient.getProcessor(
+            descriptor, RemoteInputDestination.of(coder, 
Target.getDefaultInstance()));
+    when(dataService.receive(any(), any(), 
any())).thenReturn(mockOutputReceiver);
+    when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
+
+    RemoteOutputReceiver mockRemoteOutputReceiver = 
mock(RemoteOutputReceiver.class);
+
+    try {
+      try (ActiveBundle<String> activeBundle = processor.newBundle(
+          ImmutableMap.of(Target.getDefaultInstance(), 
mockRemoteOutputReceiver))) {
+        processBundleResponseFuture.completeExceptionally(testException);
+      }
+      fail("Exception expected");
+    } catch (ExecutionException e) {
+      assertEquals(testException, e.getCause());
+
+      verify(mockOutputReceiver).cancel();
+      verifyNoMoreInteractions(mockOutputReceiver);
+    }
+  }
+
+  @Test
+  public void handleCleanupWhenAwaitingOnClosingOutputReceivers() throws 
Exception {
+    String descriptorId1 = "descriptor1";
+    Exception testException = new Exception();
+
+    InboundDataClient mockOutputReceiver = mock(InboundDataClient.class);
+    CloseableFnDataReceiver mockInputSender = 
mock(CloseableFnDataReceiver.class);
+
+    ProcessBundleDescriptor descriptor =
+        ProcessBundleDescriptor.newBuilder().setId(descriptorId1).build();
+    CompletableFuture<InstructionResponse> processBundleResponseFuture =
+        new CompletableFuture<>();
+    when(fnApiControlClient.handle(any(BeamFnApi.InstructionRequest.class)))
+        .thenReturn(new CompletableFuture<>())
+        .thenReturn(processBundleResponseFuture);
+
+    FullWindowedValueCoder<String> coder =
+        FullWindowedValueCoder.of(StringUtf8Coder.of(), Coder.INSTANCE);
+    BundleProcessor<String> processor =
+        sdkHarnessClient.getProcessor(
+            descriptor, RemoteInputDestination.of(coder, 
Target.getDefaultInstance()));
+    when(dataService.receive(any(), any(), 
any())).thenReturn(mockOutputReceiver);
+    when(dataService.send(any(), eq(coder))).thenReturn(mockInputSender);
+    doThrow(testException).when(mockOutputReceiver).awaitCompletion();
+
+    RemoteOutputReceiver mockRemoteOutputReceiver = 
mock(RemoteOutputReceiver.class);
+
+    try {
+      try (ActiveBundle<String> activeBundle = processor.newBundle(
+          ImmutableMap.of(Target.getDefaultInstance(), 
mockRemoteOutputReceiver))) {
+        // Correlating the ProcessBundleRequest and ProcessBundleResponse is 
owned by the underlying
+        // FnApiControlClient. The SdkHarnessClient owns just wrapping the 
request and unwrapping
+        // the response.
+        //
+        // Currently there are no fields so there's nothing to check. This 
test is formulated
+        // to match the pattern it should have if/when the response is 
meaningful.
+        BeamFnApi.ProcessBundleResponse response = 
BeamFnApi.ProcessBundleResponse
+            .getDefaultInstance();
+        processBundleResponseFuture.complete(
+            
BeamFnApi.InstructionResponse.newBuilder().setProcessBundle(response).build());
+      }
+      fail("Exception expected");
+    } catch (Exception e) {
+      assertEquals(testException, e);
+    }
+  }
+
   private BeamFnApi.ProcessBundleDescriptor getProcessBundleDescriptor(
       Endpoints.ApiServiceDescriptor endpoint) throws IOException {
     Pipeline userPipeline = Pipeline.create();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 85299)
    Time Spent: 4h 20m  (was: 4h 10m)

> Execute a Stage via the portability framework in the ReferenceRunner
> --------------------------------------------------------------------
>
>                 Key: BEAM-3326
>                 URL: https://issues.apache.org/jira/browse/BEAM-3326
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 4h 20m
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to