Hi everyone,
Here is a proposal to address the following issue:
JIRA issue https://issues.apache.org/jira/browse/BEAM-443

Currently, users don't have a consistent way to wait for a
pipeline to finish, because each runner implements blocking differently.
For example:
1. DirectRunner has an isBlockOnRun option in DirectOptions, and users can
configure blocking by setting that option.
2. Dataflow has a separate BlockingDataflowRunner, and users switch
runners to choose between blocking and non-blocking execution.
3. The Flink and Spark runners may or may not block, depending on their
implementations of run().

Proposal:
Let users control whether to wait for the pipeline to finish through
PipelineResult, and let them cancel a running pipeline:
1. Add PipelineResult.waitToFinish(Duration)
2. Add PipelineResult.cancel()
3. Add PipelineResult.waitToRunning(Duration)
4. PipelineRunner.run() should (but is not required to) run the pipeline without blocking
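To make the proposed semantics concrete, here is a minimal, self-contained sketch in plain Java. It is not Beam code: the names ProposedPipelineResult, State, Job and ThreadBackedResult are hypothetical, java.time.Duration stands in for whatever Duration type Beam would use, and it assumes waitToRunning/waitToFinish return the state observed when the wait ends (the proposal does not specify return types). The "pipeline" is just a job on a background thread, and cancel() interrupts it.

```java
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class PipelineResultSketch {
  // Hypothetical lifecycle states; the proposal does not define these.
  enum State { STARTING, RUNNING, DONE, FAILED, CANCELLED }

  // Hypothetical stand-in for PipelineResult carrying the proposed methods.
  interface ProposedPipelineResult {
    State getState();
    // Blocks until the job is running (or already terminal) or the timeout
    // elapses, then returns the state observed at that moment.
    State waitToRunning(Duration timeout) throws InterruptedException;
    // Blocks until the job is terminal or the timeout elapses, then returns
    // the state observed at that moment.
    State waitToFinish(Duration timeout) throws InterruptedException;
    // Requests cancellation; does nothing if the job has already terminated.
    void cancel();
  }

  // The "pipeline" body; it must respond to interruption to be cancellable.
  interface Job { void run() throws InterruptedException; }

  // Toy result that runs the job on a background thread.
  static final class ThreadBackedResult implements ProposedPipelineResult {
    private final CountDownLatch running = new CountDownLatch(1);
    private final CountDownLatch finished = new CountDownLatch(1);
    private final Thread worker;
    private State state = State.STARTING; // guarded by "this"

    private ThreadBackedResult(Job job) {
      worker = new Thread(() -> {
        transition(State.RUNNING);
        running.countDown();
        try {
          job.run();
          transition(State.DONE);
        } catch (InterruptedException e) {
          transition(State.CANCELLED);
        } catch (RuntimeException e) {
          transition(State.FAILED);
        } finally {
          running.countDown(); // no-op if already released
          finished.countDown();
        }
      });
      worker.setDaemon(true);
    }

    // Analogous to a non-blocking run(): starts the job and returns at once.
    static ThreadBackedResult start(Job job) {
      ThreadBackedResult result = new ThreadBackedResult(job);
      result.worker.start();
      return result;
    }

    // Moves to `next` unless the job has already reached a terminal state.
    private synchronized boolean transition(State next) {
      if (state == State.DONE || state == State.FAILED || state == State.CANCELLED) {
        return false;
      }
      state = next;
      return true;
    }

    @Override public synchronized State getState() { return state; }

    @Override public State waitToRunning(Duration timeout) throws InterruptedException {
      running.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
      return getState();
    }

    @Override public State waitToFinish(Duration timeout) throws InterruptedException {
      finished.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
      return getState();
    }

    @Override public void cancel() {
      if (transition(State.CANCELLED)) {
        worker.interrupt();
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    // Like Case 2: a bounded job; wait for it to finish.
    ProposedPipelineResult batch = ThreadBackedResult.start(() -> Thread.sleep(50));
    System.out.println(batch.waitToFinish(Duration.ofSeconds(5))); // DONE
    // Like Case 4: an unbounded job; wait until it runs, then cancel it.
    ProposedPipelineResult streaming = ThreadBackedResult.start(() -> {
      while (true) { Thread.sleep(10); }
    });
    System.out.println(streaming.waitToRunning(Duration.ofSeconds(5))); // RUNNING
    streaming.cancel();
    System.out.println(streaming.waitToFinish(Duration.ofSeconds(5))); // CANCELLED
  }
}
```

One design choice in this sketch: a timeout returns the current non-terminal state (e.g. RUNNING) rather than throwing, so callers can tell "still running" apart from DONE, FAILED or CANCELLED without exception handling.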

User code scenarios:
// Case 1: don't care whether to block
PipelineResult result = pipeline.run();

// Case 2: wait to finish, and inspect the result.
PipelineResult result = pipeline.run();
result.waitToFinish(Duration);
result.getX(...);

// Case 3: run multiple pipelines, and inspect results.
List<PipelineResult> results = new ArrayList<>();
for (int i = 0; i < 10; ++i) {
  results.add(pipeline[i].run());
}
… poll statuses and inspect results …

// Case 4: test streaming pipeline
PipelineResult result = pipeline.run();
result.waitToRunning(Duration);
result.getAggregatorValues();
... check aggregator ...
result.cancel();

What does everyone think?

Thanks
--
Pei
