Hi everyone,
Here is a proposal to address the following issue:
JIRA issue https://issues.apache.org/jira/browse/BEAM-443
Currently, users don't have a consistent way to wait for a pipeline to
finish, because each runner exposes blocking behavior in its own way.
For example:
1. DirectRunner has an isBlockOnRun option in DirectOptions, which users
can set to control whether run() blocks.
2. Dataflow has a separate BlockingDataflowRunner; users switch runners
to choose between blocking and non-blocking execution.
3. The Flink and Spark runners may or may not block, depending on their
implementations of run().
Proposal:
Let users control, through PipelineResult, whether to wait for the
pipeline to finish, and let them cancel a running pipeline.
1. Add PipelineResult.waitToFinish(Duration)
2. Add PipelineResult.cancel()
3. Add PipelineResult.waitToRunning(Duration)
4. PipelineRunner.run() should (but is not required to) run the pipeline
without blocking.
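To make the proposed semantics concrete, here is a minimal, self-contained sketch; it is not taken from any existing runner. Only the method names waitToFinish, waitToRunning and cancel come from the proposal. The State enum, the choice to return the observed state, the use of java.time.Duration (to stay dependency-free), and the hypothetical InMemoryPipelineResult class that a non-blocking runner would update from its worker thread are all assumptions for illustration.

```java
import java.time.Duration;

/** Hypothetical shape of the proposed PipelineResult additions (BEAM-443). */
interface ProposedPipelineResult {
  /** Assumed lifecycle states; a real runner may report more. */
  enum State {
    UNKNOWN, RUNNING, DONE, FAILED, CANCELLED;

    boolean isTerminal() {
      return this == DONE || this == FAILED || this == CANCELLED;
    }
  }

  State getState();

  /** Waits until a terminal state or the timeout; returns the state observed. */
  State waitToFinish(Duration timeout);

  /** Waits until RUNNING (or terminal) or the timeout; returns the state observed. */
  State waitToRunning(Duration timeout);

  /** Requests cancellation; returns the state after the request. */
  State cancel();
}

/** Hypothetical result object updated by a non-blocking runner. */
class InMemoryPipelineResult implements ProposedPipelineResult {
  private State state = State.UNKNOWN;

  /** Called by the (hypothetical) runner whenever the pipeline changes state. */
  synchronized void setState(State newState) {
    if (state.isTerminal()) {
      return; // terminal states are final
    }
    state = newState;
    notifyAll(); // wake up any threads blocked in awaitState
  }

  @Override
  public synchronized State getState() {
    return state;
  }

  @Override
  public State waitToFinish(Duration timeout) {
    return awaitState(timeout, false);
  }

  @Override
  public State waitToRunning(Duration timeout) {
    return awaitState(timeout, true);
  }

  @Override
  public State cancel() {
    setState(State.CANCELLED); // a real runner would also stop its workers
    return getState();
  }

  /** Blocks until the state is terminal (or RUNNING, if acceptRunning) or time runs out. */
  private synchronized State awaitState(Duration timeout, boolean acceptRunning) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (!(state.isTerminal() || (acceptRunning && state == State.RUNNING))) {
      long remainingMillis = (deadline - System.nanoTime()) / 1_000_000;
      if (remainingMillis <= 0) {
        break; // timed out; report the last state seen
      }
      try {
        wait(remainingMillis); // loop re-checks the condition after any wakeup
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        break;
      }
    }
    return state;
  }
}
```

Returning the observed state on timeout, rather than throwing, is one possible design: it lets a caller such as the streaming test in Case 4 decide for itself how to treat a pipeline that has not reached the expected state.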
User code scenarios:
// Case 1: don't care whether to block
PipelineResult result = pipeline.run();
// Case 2: wait to finish, and inspect the result.
PipelineResult result = pipeline.run();
result.waitToFinish(Duration);
result.getX(...)
// Case 3: run multiple pipelines, and inspect results.
PipelineResult[] results = new PipelineResult[10];
for (int i = 0; i < 10; ++i) {
  results[i] = pipeline[i].run();
}
… poll statuses and inspect results …
// Case 4: test streaming pipeline
PipelineResult result = pipeline.run();
result.waitToRunning(Duration);
result.getAggregatorValues();
... check aggregator ...
result.cancel();
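Case 3 leaves the polling loop elided. One way a caller could poll several results is sketched below, assuming each result exposes a getState() status query. The StatusSource interface is a hypothetical stand-in for PipelineResult, and its State values and the Poller helper are assumptions for illustration, not part of the proposal.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for PipelineResult, reduced to the status query polling needs. */
interface StatusSource {
  enum State { RUNNING, DONE, FAILED, CANCELLED } // assumed states

  State getState();
}

/** Hypothetical helper that polls a batch of results. */
final class Poller {
  private Poller() {}

  /**
   * Polls every result until none is RUNNING or the overall timeout elapses.
   * Returns the last observed state of each result, in input order.
   */
  static List<StatusSource.State> pollAll(
      List<? extends StatusSource> results, Duration timeout, Duration interval) {
    long deadline = System.nanoTime() + timeout.toNanos();
    while (true) {
      List<StatusSource.State> states = new ArrayList<>();
      boolean anyRunning = false;
      for (StatusSource result : results) {
        StatusSource.State state = result.getState();
        states.add(state);
        anyRunning |= state == StatusSource.State.RUNNING;
      }
      if (!anyRunning || System.nanoTime() >= deadline) {
        return states;
      }
      try {
        Thread.sleep(interval.toMillis()); // back off between polls
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return states;
      }
    }
  }
}
```

A fixed polling interval keeps the loop simple; with the proposed API, a caller could instead call waitToFinish on each result in turn.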
What does everyone think?
Thanks
--
Pei