[ 
https://issues.apache.org/jira/browse/BEAM-3977?focusedWorklogId=86636&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86636
 ]

ASF GitHub Bot logged work on BEAM-3977:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Apr/18 16:33
            Start Date: 02/Apr/18 16:33
    Worklog Time Spent: 10m 
      Work Description: tgroh commented on a change in pull request #4988: 
[BEAM-3977] Move out nested classes from SdkHarnessClient.
URL: https://github.com/apache/beam/pull/4988#discussion_r178583120
 
 

 ##########
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleProcessor.java
 ##########
 @@ -0,0 +1,151 @@
+package org.apache.beam.runners.fnexecution.control;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletionStage;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.runners.fnexecution.data.FnDataService;
+import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
+import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.InboundDataClient;
+import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A processor capable of creating bundles for some registered
+ * {@link BeamFnApi.ProcessBundleDescriptor}.
+ */
+public class BundleProcessor<T> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BundleProcessor.class);
+
+  private final BeamFnApi.ProcessBundleDescriptor processBundleDescriptor;
+  private final CompletionStage<BeamFnApi.RegisterResponse> registrationFuture;
+  private final RemoteInputDestination<WindowedValue<T>> remoteInput;
+  private final StateDelegator stateDelegator;
+  private InstructionRequestHandler fnApiControlClient;
+  private FnDataService fnApiDataService;
+  private SdkHarnessClient.IdGenerator idGenerator;
+
+  BundleProcessor(
+      InstructionRequestHandler fnApiControlClient,
+      FnDataService fnApiDataService,
+      SdkHarnessClient.IdGenerator idGenerator,
+      BeamFnApi.ProcessBundleDescriptor processBundleDescriptor,
+      CompletionStage<BeamFnApi.RegisterResponse> registrationFuture,
+      RemoteInputDestination<WindowedValue<T>> remoteInput,
+      StateDelegator stateDelegator) {
+    this.processBundleDescriptor = processBundleDescriptor;
+    this.registrationFuture = registrationFuture;
+    this.remoteInput = remoteInput;
+    this.stateDelegator = stateDelegator;
+    this.fnApiControlClient = fnApiControlClient;
+    this.fnApiDataService = fnApiDataService;
+    this.idGenerator = idGenerator;
+  }
+
+  public CompletionStage<BeamFnApi.RegisterResponse> getRegistrationFuture() {
+    return registrationFuture;
+  }
+
+  /**
+   * Start a new bundle for the given {@link 
BeamFnApi.ProcessBundleDescriptor} identifier.
+   *
+   * <p>The input channels for the returned {@link ActiveBundle} are derived 
from the instructions
+   * in the {@link BeamFnApi.ProcessBundleDescriptor}.
+   *
+   * <p>NOTE: It is important to {@link ActiveBundle#close()} each bundle 
after all elements are
+   * emitted.
+   *
+   * <pre>{@code
+   * try (ActiveBundle<InputT> bundle = SdkHarnessClient.newBundle(...)) {
+   *   FnDataReceiver<InputT> inputReceiver = bundle.getInputReceiver();
+   *   // send all elements ...
+   * }
+   * }</pre>
+   */
+  public ActiveBundle<T> newBundle(
+      Map<BeamFnApi.Target, RemoteOutputReceiver<?>> outputReceivers) {
+    return newBundle(outputReceivers, request -> {
+      throw new UnsupportedOperationException(String.format(
+          "The %s does not have a registered state handler.",
+          ActiveBundle.class.getSimpleName()));
+    });
+  }
+
+  /**
+   * Start a new bundle for the given {@link 
BeamFnApi.ProcessBundleDescriptor} identifier.
+   *
+   * <p>The input channels for the returned {@link ActiveBundle} are derived 
from the instructions
+   * in the {@link BeamFnApi.ProcessBundleDescriptor}.
+   *
+   * <p>NOTE: It is important to {@link ActiveBundle#close()} each bundle 
after all elements are
+   * emitted.
+   *
+   * <pre>{@code
+   * try (ActiveBundle<InputT> bundle = SdkHarnessClient.newBundle(...)) {
+   *   FnDataReceiver<InputT> inputReceiver = bundle.getInputReceiver();
+   *   // send all elements ...
+   * }
+   * }</pre>
+   */
+  public ActiveBundle<T> newBundle(
+      Map<BeamFnApi.Target, RemoteOutputReceiver<?>> outputReceivers,
+      StateRequestHandler stateRequestHandler) {
+    String bundleId = idGenerator.getId();
+
+    final CompletionStage<BeamFnApi.InstructionResponse> genericResponse =
+        fnApiControlClient.handle(
+            BeamFnApi.InstructionRequest.newBuilder()
+                .setInstructionId(bundleId)
+                .setProcessBundle(
+                    BeamFnApi.ProcessBundleRequest.newBuilder()
+                        
.setProcessBundleDescriptorReference(processBundleDescriptor.getId()))
+                .build());
+    LOG.debug(
+        "Sent {} with ID {} for {} with ID {}",
+        BeamFnApi.ProcessBundleRequest.class.getSimpleName(),
+        bundleId,
+        BeamFnApi.ProcessBundleDescriptor.class.getSimpleName(),
+        processBundleDescriptor.getId());
+
+    CompletionStage<BeamFnApi.ProcessBundleResponse> specificResponse =
+        
genericResponse.thenApply(BeamFnApi.InstructionResponse::getProcessBundle);
+    Map<BeamFnApi.Target, InboundDataClient> outputClients = new HashMap<>();
+    for (Map.Entry<BeamFnApi.Target, RemoteOutputReceiver<?>> targetReceiver
+        : outputReceivers.entrySet()) {
+      InboundDataClient outputClient =
+          attachReceiver(
+              bundleId,
+              targetReceiver.getKey(),
+              (RemoteOutputReceiver) targetReceiver.getValue());
+      outputClients.put(targetReceiver.getKey(), outputClient);
+    }
+
+    CloseableFnDataReceiver<WindowedValue<T>> dataReceiver =
+        fnApiDataService.send(
+            LogicalEndpoint.of(bundleId, remoteInput.getTarget()), 
remoteInput.getCoder());
+
+    return new ActiveBundle(
 
 Review comment:
   Do diamonds work here? (i.e. `new ActiveBundle<>(...)` rather than the raw 
   `new ActiveBundle(...)`)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 86636)
    Time Spent: 50m  (was: 40m)

> Member classes of SdkHarnessClient should have their own files.
> ---------------------------------------------------------------
>
>                 Key: BEAM-3977
>                 URL: https://issues.apache.org/jira/browse/BEAM-3977
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core
>            Reporter: Axel Magnuson
>            Assignee: Axel Magnuson
>            Priority: Minor
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> SdkHarnessClient contains quite a few nested classes that could be split out. 
> Of these, BundleProcessor and ActiveBundle have grown up to be first-class 
> concepts that we interact with just as much as the SdkHarnessClient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to