[
https://issues.apache.org/jira/browse/BEAM-3977?focusedWorklogId=87208&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-87208
]
ASF GitHub Bot logged work on BEAM-3977:
----------------------------------------
Author: ASF GitHub Bot
Created on: 03/Apr/18 19:04
Start Date: 03/Apr/18 19:04
Worklog Time Spent: 10m
Work Description: axelmagn commented on a change in pull request #4988:
[BEAM-3977] Move out nested classes from SdkHarnessClient.
URL: https://github.com/apache/beam/pull/4988#discussion_r178929996
##########
File path: runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/BundleProcessor.java
##########
@@ -0,0 +1,151 @@
+package org.apache.beam.runners.fnexecution.control;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletionStage;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
+import org.apache.beam.runners.fnexecution.data.FnDataService;
+import org.apache.beam.runners.fnexecution.data.RemoteInputDestination;
+import org.apache.beam.runners.fnexecution.state.StateDelegator;
+import org.apache.beam.runners.fnexecution.state.StateRequestHandler;
+import org.apache.beam.sdk.fn.data.CloseableFnDataReceiver;
+import org.apache.beam.sdk.fn.data.InboundDataClient;
+import org.apache.beam.sdk.fn.data.LogicalEndpoint;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A processor capable of creating bundles for some registered
+ * {@link BeamFnApi.ProcessBundleDescriptor}.
+ */
+public class BundleProcessor<T> {
+ private static final Logger LOG = LoggerFactory.getLogger(BundleProcessor.class);
+
+ private final BeamFnApi.ProcessBundleDescriptor processBundleDescriptor;
+ private final CompletionStage<BeamFnApi.RegisterResponse> registrationFuture;
+ private final RemoteInputDestination<WindowedValue<T>> remoteInput;
+ private final StateDelegator stateDelegator;
+ private InstructionRequestHandler fnApiControlClient;
+ private FnDataService fnApiDataService;
+ private SdkHarnessClient.IdGenerator idGenerator;
+
+ BundleProcessor(
+ InstructionRequestHandler fnApiControlClient,
+ FnDataService fnApiDataService,
+ SdkHarnessClient.IdGenerator idGenerator,
+ BeamFnApi.ProcessBundleDescriptor processBundleDescriptor,
+ CompletionStage<BeamFnApi.RegisterResponse> registrationFuture,
+ RemoteInputDestination<WindowedValue<T>> remoteInput,
+ StateDelegator stateDelegator) {
+ this.processBundleDescriptor = processBundleDescriptor;
+ this.registrationFuture = registrationFuture;
+ this.remoteInput = remoteInput;
+ this.stateDelegator = stateDelegator;
+ this.fnApiControlClient = fnApiControlClient;
+ this.fnApiDataService = fnApiDataService;
+ this.idGenerator = idGenerator;
+ }
+
+ public CompletionStage<BeamFnApi.RegisterResponse> getRegistrationFuture() {
Review comment:
It really depends on the intended usage pattern.
Exposing getRegistrationFuture makes sense if callers want to attach an async
callback once registration completes.
However, since every current caller would benefit from your simplification,
I'm in favor of it.
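
To make the trade-off concrete, here is a minimal sketch of the two usage patterns. It is not taken from the PR: RegistrationFutureSketch, its RegisterResponse stand-in, describeWhenRegistered, and awaitRegistration are all hypothetical names, and the sketch uses only java.util.concurrent rather than the real Beam classes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class RegistrationFutureSketch {
  /** Hypothetical stand-in for BeamFnApi.RegisterResponse; not the real Beam type. */
  static final class RegisterResponse {
    final String id;

    RegisterResponse(String id) {
      this.id = id;
    }
  }

  /**
   * Pattern 1: the stage is exposed, so a caller can attach a callback that runs
   * once registration completes, without blocking the calling thread.
   */
  static CompletionStage<String> describeWhenRegistered(
      CompletionStage<RegisterResponse> registration) {
    return registration.thenApply(response -> "registered " + response.id);
  }

  /**
   * Pattern 2: the stage stays hidden behind a helper that blocks until
   * registration is done. This is simpler when no caller needs the async hook.
   */
  static RegisterResponse awaitRegistration(CompletionStage<RegisterResponse> registration) {
    return registration.toCompletableFuture().join();
  }

  public static void main(String[] args) {
    CompletableFuture<RegisterResponse> registration = new CompletableFuture<>();

    // The callback is attached before registration has completed.
    CompletionStage<String> described = describeWhenRegistered(registration);

    // Simulate the SDK harness acknowledging the registration.
    registration.complete(new RegisterResponse("descriptor-1"));

    System.out.println(described.toCompletableFuture().join());
    System.out.println(awaitRegistration(registration).id);
  }
}
```

If no caller ever needs pattern 1, dropping the getter and keeping only something like pattern 2 shrinks the public surface, which is the simplification being discussed.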
----------------------------------------------------------------
Issue Time Tracking
-------------------
Worklog Id: (was: 87208)
Time Spent: 1.5h (was: 1h 20m)
> Member classes of SdkHarnessClient should have their own files.
> ---------------------------------------------------------------
>
> Key: BEAM-3977
> URL: https://issues.apache.org/jira/browse/BEAM-3977
> Project: Beam
> Issue Type: Improvement
> Components: runner-core
> Reporter: Axel Magnuson
> Assignee: Axel Magnuson
> Priority: Minor
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> SdkHarnessClient contains quite a few nested classes that could be split out.
> Of these, BundleProcessor and ActiveBundle have grown into first-class
> concepts that we interact with just as much as the SdkHarnessClient.