[ 
https://issues.apache.org/jira/browse/BEAM-3977?focusedWorklogId=86634&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-86634
 ]

ASF GitHub Bot logged work on BEAM-3977:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 02/Apr/18 16:33
            Start Date: 02/Apr/18 16:33
    Worklog Time Spent: 10m 
      Work Description: tgroh commented on a change in pull request #4988: 
[BEAM-3977] Move out nested classes from SdkHarnessClient.
URL: https://github.com/apache/beam/pull/4988#discussion_r178582362
 
 

 ##########
 File path: 
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleProcessor.java
 ##########
 @@ -0,0 +1,151 @@
+package org.apache.beam.runners.fnexecution.control;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletionStage;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.runners.fnexecution.data.FnDataService;
+import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
+import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.InboundDataClient;
+import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A processor capable of creating bundles for some registered
+ * {@link BeamFnApi.ProcessBundleDescriptor}.
+ */
+public class BundleProcessor<T> {
+  private static final Logger LOG = LoggerFactory.getLogger(BundleProcessor.class);
+
+  private final BeamFnApi.ProcessBundleDescriptor processBundleDescriptor;
+  private final CompletionStage<BeamFnApi.RegisterResponse> registrationFuture;
+  private final RemoteInputDestination<WindowedValue<T>> remoteInput;
+  private final StateDelegator stateDelegator;
+  private InstructionRequestHandler fnApiControlClient;
+  private FnDataService fnApiDataService;
+  private SdkHarnessClient.IdGenerator idGenerator;
+
+  BundleProcessor(
+      InstructionRequestHandler fnApiControlClient,
+      FnDataService fnApiDataService,
+      SdkHarnessClient.IdGenerator idGenerator,
+      BeamFnApi.ProcessBundleDescriptor processBundleDescriptor,
+      CompletionStage<BeamFnApi.RegisterResponse> registrationFuture,
+      RemoteInputDestination<WindowedValue<T>> remoteInput,
+      StateDelegator stateDelegator) {
+    this.processBundleDescriptor = processBundleDescriptor;
+    this.registrationFuture = registrationFuture;
+    this.remoteInput = remoteInput;
+    this.stateDelegator = stateDelegator;
+    this.fnApiControlClient = fnApiControlClient;
+    this.fnApiDataService = fnApiDataService;
+    this.idGenerator = idGenerator;
+  }
+
+  public CompletionStage<BeamFnApi.RegisterResponse> getRegistrationFuture() {
 
 Review comment:
   We probably want to expose something simpler (`isRegistrationComplete() 
throws Exception` or similar).
   
   This may not need to be exposed outside of the package at all: the 
`SdkHarnessClient` could handle awaiting the response and checking whether it 
succeeded.
   
   It might be worthwhile to have the `BundleProcessor` update its state if the 
stage completes exceptionally.
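
   A minimal sketch of what that simpler surface might look like, assuming 
nothing about the final design. `RegistrationTracker`, its `State` enum, and the 
generic response type `R` are hypothetical names used only for illustration; 
they are not part of the PR or of Beam. Only `CompletionStage.whenComplete` from 
the JDK is relied on.

```java
import java.util.concurrent.CompletionStage;

/** Hypothetical helper, not a Beam class: hides the registration stage behind a simple check. */
class RegistrationTracker<R> {
  /** Registration lifecycle as observed by the owner of this tracker. */
  enum State { PENDING, REGISTERED, FAILED }

  private volatile Throwable failure;
  private volatile State state = State.PENDING;

  RegistrationTracker(CompletionStage<R> registrationFuture) {
    // Update our own state as soon as the stage finishes, normally or exceptionally.
    registrationFuture.whenComplete((response, error) -> {
      if (error != null) {
        failure = error;       // written before state so readers of FAILED see the cause
        state = State.FAILED;
      } else {
        state = State.REGISTERED;
      }
    });
  }

  /** Returns true once registration succeeded, false while pending; throws if it failed. */
  boolean isRegistrationComplete() throws Exception {
    switch (state) {
      case REGISTERED:
        return true;
      case FAILED:
        throw new Exception("Registration failed", failure);
      default:
        return false;
    }
  }
}
```

   With something like this, callers would only ever see a boolean or an 
exception, and the `CompletionStage` itself could stay package-private.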

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 86634)
    Time Spent: 0.5h  (was: 20m)

> Member classes of SdkHarnessClient should have their own files.
> ---------------------------------------------------------------
>
>                 Key: BEAM-3977
>                 URL: https://issues.apache.org/jira/browse/BEAM-3977
>             Project: Beam
>          Issue Type: Improvement
>          Components: runner-core
>            Reporter: Axel Magnuson
>            Assignee: Axel Magnuson
>            Priority: Minor
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> SdkHarnessClient contains quite a few nested classes that could be split out. 
> Of these, BundleProcessor and ActiveBundle have grown up to be first-class 
> concepts that we interact with just as much as the SdkHarnessClient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
