[
https://issues.apache.org/jira/browse/BEAM-2597?focusedWorklogId=99123&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-99123
]
ASF GitHub Bot logged work on BEAM-2597:
----------------------------------------
Author: ASF GitHub Bot
Created on: 07/May/18 18:59
Start Date: 07/May/18 18:59
Worklog Time Spent: 10m
Work Description: tgroh commented on a change in pull request #5285:
[BEAM-2597] Flink batch ExecutableStage operator
URL: https://github.com/apache/beam/pull/5285#discussion_r186516016
##########
File path:
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
##########
@@ -38,36 +53,115 @@
public class FlinkExecutableStageFunction<InputT>
extends RichMapPartitionFunction<WindowedValue<InputT>, RawUnionValue> {
+ // Main constructor fields. All must be Serializable because Flink
distributes Functions to
+ // task managers via java serialization.
+
// The executable stage this function will run.
private final RunnerApi.ExecutableStagePayload stagePayload;
// Pipeline options. Used for provisioning api.
- private final Struct pipelineOptions;
+ private final JobInfo jobInfo;
// Map from PCollection id to the union tag used to represent this
PCollection in the output.
private final Map<String, Integer> outputMap;
+ private final SerializableSupplier<FlinkBundleFactory> bundleFactorySupplier;
+ private final FlinkStateRequestHandlerFactory stateHandlerFactory;
+
+ // Worker-local fields. These should only be constructed and consumed on
Flink TaskManagers.
+ private transient RuntimeContext runtimeContext;
+ private transient StateRequestHandler stateRequestHandler;
+ private transient StageBundleFactory stageBundleFactory;
+ private transient CloseableDistributedCache distributedCache;
public FlinkExecutableStageFunction(
RunnerApi.ExecutableStagePayload stagePayload,
- Struct pipelineOptions,
- Map<String, Integer> outputMap) {
+ JobInfo jobInfo,
+ Map<String, Integer> outputMap,
+ SerializableSupplier<FlinkBundleFactory> bundleFactorySupplier,
+ FlinkStateRequestHandlerFactory stateHandlerFactory) {
this.stagePayload = stagePayload;
- this.pipelineOptions = pipelineOptions;
+ this.jobInfo = jobInfo;
this.outputMap = outputMap;
+ this.bundleFactorySupplier = bundleFactorySupplier;
+ this.stateHandlerFactory = stateHandlerFactory;
}
@Override
public void open(Configuration parameters) throws Exception {
- throw new UnsupportedOperationException();
+ ExecutableStage executableStage =
ExecutableStage.fromPayload(stagePayload);
+ runtimeContext = getRuntimeContext();
+ // NOTE: It's safe to reuse the state handler between partitions because
each partition uses the
+ // same backing runtime context and broadcast variables. We use checkState
below to catch errors
+ // in backward-incompatible Flink changes.
+ stateRequestHandler = stateHandlerFactory.forStage(executableStage,
runtimeContext);
+ distributedCache =
CloseableDistributedCache.wrapping(runtimeContext.getDistributedCache());
+ FlinkBundleFactory flinkBundleFactory = bundleFactorySupplier.get();
+ // TODO: Do we really want this layer of indirection when accessing the
stage bundle factory?
+ // It's a little strange because this operator is responsible for the
lifetime of the stage
+ // bundle "factory" (manager?) but not the job or Flink bundle factories.
How do we make
+ // ownership of the higher level "factories" explicit? Do we care?
+ JobBundleFactory jobBundleFactory =
+ flinkBundleFactory.getJobBundleFactory(jobInfo, distributedCache);
+ stageBundleFactory = jobBundleFactory.forStage(executableStage);
}
@Override
public void mapPartition(
Iterable<WindowedValue<InputT>> iterable, Collector<RawUnionValue>
collector)
throws Exception {
- throw new UnsupportedOperationException();
+ checkState(
+ runtimeContext == getRuntimeContext(),
+ "RuntimeContext changed from under us. State handler invalid.");
+ checkState(
+ stageBundleFactory != null, "%s not yet prepared",
StageBundleFactory.class.getName());
+ checkState(
+ stateRequestHandler != null, "%s not yet prepared",
StateRequestHandler.class.getName());
+
+ try (RemoteBundle<InputT> bundle =
+ stageBundleFactory.getBundle(
+ new ReceiverFactory(collector, outputMap), stateRequestHandler)) {
+ FnDataReceiver<WindowedValue<InputT>> receiver =
bundle.getInputReceiver();
+ for (WindowedValue<InputT> input : iterable) {
+ receiver.accept(input);
+ }
+ }
+ // NOTE: RemoteBundle.close() blocks on completion of all data receivers.
This is necessary to
+ // safely reference the partition-scoped Collector from receivers.
}
@Override
public void close() throws Exception {
- throw new UnsupportedOperationException();
+ // TODO: Wrap close calls to ensure all calls are attempted?
Review comment:
```
try (AutoCloseable cache = distributedCache;
AutoCloseable factory = stageBundleFactory;) {}
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 99123)
Time Spent: 40m (was: 0.5h)
> FlinkRunner ExecutableStage batch operator
> ------------------------------------------
>
> Key: BEAM-2597
> URL: https://issues.apache.org/jira/browse/BEAM-2597
> Project: Beam
> Issue Type: Sub-task
> Components: runner-flink
> Reporter: Kenneth Knowles
> Assignee: Ben Sidhom
> Priority: Major
> Labels: portability
> Time Spent: 40m
> Remaining Estimate: 0h
>
> This operator will execute user code in the context of an SDK harness by
> constructing a ProcessBundleDescriptor from an ExecutableStage (physical
> stage plan) and sending instructions/elements over the control and data
> planes.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)