[ 
https://issues.apache.org/jira/browse/BEAM-3326?focusedWorklogId=85347&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-85347
 ]

ASF GitHub Bot logged work on BEAM-3326:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 28/Mar/18 18:35
            Start Date: 28/Mar/18 18:35
    Worklog Time Spent: 10m 
      Work Description: lukecwik commented on a change in pull request #4963: 
[BEAM-3326] Abstract away closing the inbound receiver, waiting for the bundle 
to finish, waiting for outbound to complete within the ActiveBundle.
URL: https://github.com/apache/beam/pull/4963#discussion_r177849543
 
 

 ##########
 File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
 ##########
 @@ -146,22 +154,92 @@ private BundleProcessor(
   }
 
   /** An active bundle for a particular {@link BeamFnApi.ProcessBundleDescriptor}. */
-  @AutoValue
-  public abstract static class ActiveBundle<InputT> {
-    public abstract String getBundleId();
-
-    public abstract CompletionStage<BeamFnApi.ProcessBundleResponse> getBundleResponse();
+  public static class ActiveBundle<InputT> implements AutoCloseable {
+    private final String bundleId;
+    private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
+    private final CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver;
+    private final Map<BeamFnApi.Target, InboundDataClient> outputClients;
 
-    public abstract CloseableFnDataReceiver<WindowedValue<InputT>> getInputReceiver();
-    public abstract Map<BeamFnApi.Target, InboundDataClient> getOutputClients();
-
-    public static <InputT> ActiveBundle<InputT> create(
+    private ActiveBundle(
         String bundleId,
         CompletionStage<BeamFnApi.ProcessBundleResponse> response,
-        CloseableFnDataReceiver<WindowedValue<InputT>> dataReceiver,
+        CloseableFnDataReceiver<WindowedValue<InputT>> inputReceiver,
         Map<BeamFnApi.Target, InboundDataClient> outputClients) {
-      return new AutoValue_SdkHarnessClient_ActiveBundle<>(
-          bundleId, response, dataReceiver, outputClients);
+      this.bundleId = bundleId;
+      this.response = response;
+      this.inputReceiver = inputReceiver;
+      this.outputClients = outputClients;
+    }
+
+    /**
+     * Returns an id used to represent this bundle.
+     */
+    public String getBundleId() {
+      return bundleId;
+    }
+
+    /**
+     * Returns a {@link FnDataReceiver receiver} which consumes input elements forwarding them
+     * to the SDK. When
+     */
+    public FnDataReceiver<WindowedValue<InputT>> getInputReceiver() {
+      return inputReceiver;
+    }
+
+    /**
+     * Blocks till bundle processing is finished. This is comprised of:
+     * <ul>
+     *   <li>closing the {@link #getInputReceiver() input receiver}.</li>
+     *   <li>waiting for the SDK to say that processing the bundle is finished.</li>
+     *   <li>waiting for all inbound data clients to complete</li>
+     * </ul>
+     *
+     * <p>This method will throw an exception if bundle processing has failed.
+     * {@link Throwable#getSuppressed()} will return all the reasons as to why processing has
+     * failed.
+     */
+    @Override
+    public void close() throws Exception {
+      Exception exception = null;
+      try {
+        inputReceiver.close();
+      } catch (Exception e) {
+        exception = e;
+      }
+      try {
+        // We don't have to worry about the completion stage.
+        if (exception == null) {
+          MoreFutures.get(response);
+        } else {
+          // TODO: Handle aborting the bundle being processed.
+          throw new IllegalStateException("Processing bundle failed, TODO: abort bundle.");
+        }
+      } catch (Exception e) {
+        if (exception == null) {
+          exception = e;
+        } else {
+          exception.addSuppressed(e);
+        }
+      }
+      for (InboundDataClient outputClient : outputClients.values()) {
+        try {
+          // If we failed processing this bundle, we should cancel all inbound data.
+          if (exception == null) {
+            outputClient.awaitCompletion();
 
 Review comment:
   We await completion on all outbound clients.
   
   No, the bundle result only represents that, to the best of the SDK's
knowledge, the SDK did everything it needed to. There can still be wire
transfer, network, or decoding failures that the SDK wouldn't be aware of
after it reports that bundle processing is complete.
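
   To make the point concrete, here is a minimal, self-contained sketch of
the close ordering discussed above. It is not the Beam code: the class
`BundleCloseSketch`, the interface `OutputClient`, and the `String` response
type are hypothetical stand-ins for `ActiveBundle`, `InboundDataClient`, and
`BeamFnApi.ProcessBundleResponse`. Two parts are assumptions: the `cancel()`
branch (the diff above is cut off before it), and skipping the wait on the
response when closing the input fails (the PR throws an
`IllegalStateException` with a TODO there instead).

```java
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the close() logic in the diff; not Beam code. */
final class BundleCloseSketch {

  /** Hypothetical, simplified stand-in for Beam's InboundDataClient. */
  interface OutputClient {
    void awaitCompletion() throws Exception;

    void cancel();
  }

  /**
   * Closes the input, waits for the bundle response, then waits for every
   * output client. The first failure is rethrown; later ones are suppressed.
   */
  static void close(
      AutoCloseable inputReceiver,
      CompletableFuture<String> response,
      List<OutputClient> outputClients)
      throws Exception {
    Exception exception = null;

    // Step 1: close the input so the SDK knows no more elements are coming.
    try {
      inputReceiver.close();
    } catch (Exception e) {
      exception = e;
    }

    // Step 2: wait for the SDK to report that the bundle is finished.
    // Assumption: if closing the input failed, we skip the wait entirely.
    if (exception == null) {
      try {
        response.get();
      } catch (Exception e) {
        exception = e;
      }
    }

    // Step 3: a successful response is not enough. Output data may still be
    // in flight, so each client is awaited (or cancelled after a failure).
    for (OutputClient client : outputClients) {
      try {
        if (exception == null) {
          client.awaitCompletion();
        } else {
          client.cancel();
        }
      } catch (Exception e) {
        if (exception == null) {
          exception = e;
        } else {
          exception.addSuppressed(e);
        }
      }
    }

    if (exception != null) {
      throw exception;
    }
  }

  public static void main(String[] args) {
    // The SDK reports success, but decoding the output fails on the runner side.
    OutputClient failingClient =
        new OutputClient() {
          @Override
          public void awaitCompletion() throws Exception {
            throw new IOException("decode failure");
          }

          @Override
          public void cancel() {}
        };
    try {
      close(
          () -> {},
          CompletableFuture.completedFuture("done"),
          Collections.singletonList(failingClient));
    } catch (Exception e) {
      System.out.println("close() failed: " + e.getMessage());
    }
  }
}
```

   In this sketch the response future completes normally, yet `close` still
throws, because the failure only becomes visible while awaiting the output
client. That is the case the bundle response alone cannot cover.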

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 85347)
    Time Spent: 5h  (was: 4h 50m)

> Execute a Stage via the portability framework in the ReferenceRunner
> --------------------------------------------------------------------
>
>                 Key: BEAM-3326
>                 URL: https://issues.apache.org/jira/browse/BEAM-3326
>             Project: Beam
>          Issue Type: New Feature
>          Components: runner-core
>            Reporter: Thomas Groh
>            Assignee: Thomas Groh
>            Priority: Major
>              Labels: portability
>          Time Spent: 5h
>  Remaining Estimate: 0h
>
> This is the supertask for remote execution in the Universal Local Runner 
> (BEAM-2899).
> This executes a stage remotely via portability framework APIs



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
