[
https://issues.apache.org/jira/browse/BEAM-5600?focusedWorklogId=364766&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-364766
]
ASF GitHub Bot logged work on BEAM-5600:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Dec/19 21:27
Start Date: 30/Dec/19 21:27
Worklog Time Spent: 10m
Work Description: lukecwik commented on pull request #10482: [BEAM-5600]
Add unimplemented split API to Runner side SDF libraries.
URL: https://github.com/apache/beam/pull/10482#discussion_r362100325
##########
File path:
runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/control/SdkHarnessClient.java
##########
@@ -231,122 +234,114 @@ public ActiveBundle newBundle(
return fnApiDataService.receive(
LogicalEndpoint.of(bundleId, ptransformId), receiver.getCoder(),
receiver.getReceiver());
}
- }
- /** An active bundle for a particular {@link
BeamFnApi.ProcessBundleDescriptor}. */
- public static class ActiveBundle implements RemoteBundle {
- private final String bundleId;
- private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
- private final Map<String, CloseableFnDataReceiver> inputReceivers;
- private final Map<String, InboundDataClient> outputClients;
- private final StateDelegator.Registration stateRegistration;
- private final BundleProgressHandler progressHandler;
- private final BundleCheckpointHandler checkpointHandler;
- private final BundleFinalizationHandler finalizationHandler;
-
- private ActiveBundle(
- String bundleId,
- CompletionStage<ProcessBundleResponse> response,
- Map<String, CloseableFnDataReceiver> inputReceivers,
- Map<String, InboundDataClient> outputClients,
- StateDelegator.Registration stateRegistration,
- BundleProgressHandler progressHandler,
- BundleCheckpointHandler checkpointHandler,
- BundleFinalizationHandler finalizationHandler) {
- this.bundleId = bundleId;
- this.response = response;
- this.inputReceivers = inputReceivers;
- this.outputClients = outputClients;
- this.stateRegistration = stateRegistration;
- this.progressHandler = progressHandler;
- this.checkpointHandler = checkpointHandler;
- this.finalizationHandler = finalizationHandler;
- }
+ /** An active bundle for a particular {@link
BeamFnApi.ProcessBundleDescriptor}. */
+ public class ActiveBundle implements RemoteBundle {
+ private final String bundleId;
+ private final CompletionStage<BeamFnApi.ProcessBundleResponse> response;
+ private final Map<String, CloseableFnDataReceiver> inputReceivers;
+ private final Map<String, InboundDataClient> outputClients;
+ private final StateDelegator.Registration stateRegistration;
+ private final BundleProgressHandler progressHandler;
+ private final BundleSplitHandler splitHandler;
+ private final BundleCheckpointHandler checkpointHandler;
+ private final BundleFinalizationHandler finalizationHandler;
+
+ private ActiveBundle(
+ String bundleId,
+ CompletionStage<ProcessBundleResponse> response,
+ Map<String, CloseableFnDataReceiver> inputReceivers,
+ Map<String, InboundDataClient> outputClients,
+ StateDelegator.Registration stateRegistration,
+ BundleProgressHandler progressHandler,
+ BundleSplitHandler splitHandler,
+ BundleCheckpointHandler checkpointHandler,
+ BundleFinalizationHandler finalizationHandler) {
+ this.bundleId = bundleId;
+ this.response = response;
+ this.inputReceivers = inputReceivers;
+ this.outputClients = outputClients;
+ this.stateRegistration = stateRegistration;
+ this.progressHandler = progressHandler;
+ this.splitHandler = splitHandler;
+ this.checkpointHandler = checkpointHandler;
+ this.finalizationHandler = finalizationHandler;
+ }
- /** Returns an id used to represent this bundle. */
- @Override
- public String getId() {
- return bundleId;
- }
+ /** Returns an id used to represent this bundle. */
+ @Override
+ public String getId() {
+ return bundleId;
+ }
- /**
- * Get a map of PCollection ids to {@link FnDataReceiver receiver}s which
consume input
- * elements, forwarding them to the remote environment.
- */
- @Override
- public Map<String, FnDataReceiver> getInputReceivers() {
- return (Map) inputReceivers;
- }
+ /**
+ * Get a map of PCollection ids to {@link FnDataReceiver receiver}s
which consume input
+ * elements, forwarding them to the remote environment.
+ */
+ @Override
+ public Map<String, FnDataReceiver> getInputReceivers() {
+ return (Map) inputReceivers;
+ }
- /**
- * Blocks until bundle processing is finished. This is comprised of:
- *
- * <ul>
- * <li>closing each {@link #getInputReceivers() input receiver}.
- * <li>waiting for the SDK to say that processing the bundle is finished.
- * <li>waiting for all inbound data clients to complete
- * </ul>
- *
- * <p>This method will throw an exception if bundle processing has failed.
{@link
- * Throwable#getSuppressed()} will return all the reasons as to why
processing has failed.
- */
- @Override
- public void close() throws Exception {
- Exception exception = null;
- for (CloseableFnDataReceiver<?> inputReceiver : inputReceivers.values())
{
+ @Override
+ public void split(double fractionOfRemainder) {
+ throw new UnsupportedOperationException("Support splitting, TODO:
BEAM-5600");
Review comment:
I need access to the control service which is why I made the ActiveBundle a
nested class.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 364766)
Time Spent: 1.5h (was: 1h 20m)
> Splitting for SplittableDoFn should be exposed within runner shared libraries
> -----------------------------------------------------------------------------
>
> Key: BEAM-5600
> URL: https://issues.apache.org/jira/browse/BEAM-5600
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-core
> Reporter: Scott Wegner
> Priority: Major
> Labels: portability
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)