On Thu, Jul 21, 2016 at 3:24 PM, Pei He <[email protected]> wrote:
> I think the two streaming use cases can be done in user code by:
> 1. sleeping to block for a Duration.
> 2. catching the interrupt signal (such as CTRL-C), and then calling
> PipelineResult.cancel().
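As a concrete sketch of the pattern Pei describes: install a JVM shutdown hook (which runs on CTRL-C / SIGINT) that cancels the job, and sleep to block for a fixed duration. The `PipelineResult` interface and `CancelHook` class below are hypothetical stand-ins for illustration, not the real Beam API.

```java
// Sketch of "catch the interrupt signal, then call cancel()".
// PipelineResult here is a minimal stand-in, NOT the Beam SDK type.
public class CancelHook {
    interface PipelineResult {
        void cancel();
    }

    /** Install a JVM shutdown hook (runs on CTRL-C/SIGINT) that cancels the job. */
    public static Thread installCancelHook(PipelineResult result) {
        Thread hook = new Thread(result::cancel, "cancel-pipeline-on-exit");
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    public static void main(String[] args) throws InterruptedException {
        // Fake "running pipeline"; a real one would come from pipeline.run().
        PipelineResult result = () -> System.out.println("cancel() sent to runner");
        installCancelHook(result);
        // Option 1 from the mail: sleep to block for a Duration.
        Thread.sleep(100);
    }
}
```

Note the trade-off discussed below: tying cancellation to local-process exit is exactly the behavior some users do not want.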
I think it's risky to kill the launched pipeline when the local program terminates (unexpectedly or otherwise). There's also the use case of starting your streaming pipeline and having it periodically print out counters, which runs until you are happy with the results, your screen session terminates, you restart your computer, or whatever.

> But it brought up the question of what waitUntilFinish() should do in
> streaming mode:
> 1. Can we define "finished" in streaming mode as "all data watermarks
> reach infinity"? This is when a streaming pipeline has finished
> processing bounded sources.

Yes.

> 2. If there are unbounded sources, waitUntilFinish() will throw.
> (PCollections carry the information of whether they are bounded or unbounded.)

I don't think sources can always know. (E.g. you may have a technically unbounded source that decides at some point that it's actually done.)

> On Thu, Jul 21, 2016 at 6:45 AM, Amit Sela <[email protected]> wrote:
>> Generally this makes sense, though I'm more comfortable thinking of it
>> in the sense of:
>>
>> 1. batch
>>    1. blocking - wait (for results)
>>    2. non-blocking
>> 2. streaming
>>    1. blocking - wait(Duration)
>>    2. blocking - waitForInterruption() - some signal that terminates the job
>>    3. non-blocking
>>
>> My 2¢,
>> Amit
>>
>> On Thu, Jul 21, 2016 at 1:39 AM Pei He <[email protected]> wrote:
>>
>>> Hi everyone,
>>> Here is a proposal to address the following issue:
>>> JIRA issue https://issues.apache.org/jira/browse/BEAM-443
>>>
>>> Currently, users don't have a consistent way to wait for a
>>> pipeline to finish; different runners have different implementations.
>>> For example:
>>> 1. DirectRunner has an isBlockOnRun flag in DirectOptions, and users
>>> can configure blocking by setting that option.
>>> 2. Dataflow has a separate BlockingDataflowRunner, and users switch
>>> runners to control blocking vs. non-blocking.
>>> 3. The Flink and Spark runners may or may not block, depending on
>>> their implementations of run().
>>>
>>> Proposal:
>>> Users control whether to wait for the pipeline to finish through
>>> PipelineResult, and are able to cancel a running pipeline.
>>> 1. Add PipelineResult.waitToFinish(Duration)
>>> 2. Add PipelineResult.cancel()
>>> 3. Add PipelineResult.waitToRunning(Duration)
>>> 4. PipelineRunner.run() should (but is not required to) be non-blocking
>>>
>>> UserCode scenarios:
>>> // Case 1: don't care whether it blocks
>>> PipelineResult result = pipeline.run();
>>>
>>> // Case 2: wait for it to finish, and inspect the result.
>>> PipelineResult result = pipeline.run();
>>> result.waitToFinish(Duration);
>>> result.getX(...);
>>>
>>> // Case 3: run multiple pipelines, and inspect results.
>>> for (int i = 0; i < 10; ++i) {
>>>   pipeline[i].run();
>>> }
>>> ... poll statuses and inspect results ...
>>>
>>> // Case 4: test a streaming pipeline
>>> PipelineResult result = pipeline.run();
>>> result.waitToRunning(Duration);
>>> result.getAggregatorValues();
>>> ... check aggregator values ...
>>> result.cancel();
>>>
>>> What does everyone think?
>>>
>>> Thanks
>>> --
>>> Pei
>>>
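To make the proposed contract concrete, here is a minimal model of the waitToFinish(Duration)/cancel() semantics the proposal implies: block up to the given duration, return the terminal state if one was reached, or return null (still running) when the timeout expires. `FakePipelineResult` and all names in it are hypothetical stand-ins mirroring the proposal, not the real Beam API.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the proposed PipelineResult surface:
// waitToFinish blocks for at most the given duration; cancel() moves
// the job to a terminal state. Not the Beam SDK - a sketch of the contract.
public class FakePipelineResult {
    public enum State { RUNNING, DONE, CANCELLED }

    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile State state = State.RUNNING;

    /** The runner calls this when the job reaches a terminal state. */
    public void setTerminalState(State terminal) {
        state = terminal;
        finished.countDown();
    }

    /** Proposed cancel(): ask the runner to stop the running job. */
    public void cancel() {
        setTerminalState(State.CANCELLED);
    }

    /**
     * Proposed waitToFinish(Duration), here taking milliseconds:
     * returns the terminal state, or null if still running at timeout.
     */
    public State waitToFinish(long millis) throws InterruptedException {
        return finished.await(millis, TimeUnit.MILLISECONDS) ? state : null;
    }
}
```

Case 4 above then reads naturally: wait briefly, inspect, then cancel; a null return from the wait is the "still running" signal rather than an exception.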
