Hi,

A bit of clarification: the Flink Runner does not terminate a job when the timeout in waitUntilFinish(Duration) is reached. When the timeout is reached, we simply return and keep the job running. I thought that was the expected behaviour.
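To make the distinction concrete, here is a small stdlib-only Java simulation (not Beam or Flink code) of the behaviour described above. Waiting with a timeout returns without stopping the job, and cancelling is a separate decision made by the caller. `SimulatedJob` and its methods are hypothetical stand-ins for a runner's `PipelineResult`. The `null`-on-timeout convention mirrors what Beam's `PipelineResult.waitUntilFinish(Duration)` Javadoc describes, but treat that as an assumption to check against your Beam version.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a runner's PipelineResult; this is not Beam API.
class SimulatedJob {
    enum State { RUNNING, DONE, CANCELLED }

    private final CountDownLatch finished = new CountDownLatch(1);
    private volatile State state = State.RUNNING;

    // Called by the simulated runner when all work is complete.
    void finish() {
        state = State.DONE;
        finished.countDown();
    }

    // Explicit cancellation: the only way a timed-out job is stopped in this model.
    void cancel() {
        state = State.CANCELLED;
        finished.countDown();
    }

    State getState() {
        return state;
    }

    // Blocks for at most timeoutMillis. Returns the terminal state, or null if the
    // timeout elapsed first. Either way the job itself is left untouched.
    State waitUntilFinish(long timeoutMillis) {
        try {
            return finished.await(timeoutMillis, TimeUnit.MILLISECONDS) ? state : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        SimulatedJob job = new SimulatedJob();

        // Nothing calls finish(), so the wait times out.
        State result = job.waitUntilFinish(50);
        System.out.println(result);         // null
        System.out.println(job.getState()); // RUNNING: the timeout did not stop the job

        // Stopping the job is a separate decision made by the caller.
        if (result == null) {
            job.cancel();
        }
        System.out.println(job.getState()); // CANCELLED
    }
}
```

Dan's remark further down the thread applies here too: a non-null return does not necessarily mean the job completed normally (it may have been cancelled), so callers orchestrating pipelines should check which terminal state they got.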
Regarding job termination, I think it's easy to change the Flink Runner to terminate when the watermark reaches +Inf. We would simply set running to false in the UnboundedSourceWrapper when the watermark reaches +Inf: [1]

Best,
Aljoscha

[1] https://github.com/apache/beam/blob/cec71028ff63c7e1b1565c013ae0e378608cb5f9/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L428-L428

> On 10. May 2017, at 10:58, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>
> OK, now I understand: you are talking about waitUntilFinish(), whereas I was
> thinking about a simple run().
>
> IMHO, the Spark and Flink behavior sounds like the most logical one for a
> streaming pipeline.
>
> Regards
> JB
>
> On 05/10/2017 10:20 AM, Etienne Chauchot wrote:
>> Hi everyone,
>>
>> I'm reopening this subject because, IMHO, it is important to unify pipeline
>> termination semantics in the model. Here are the differences I have observed
>> in streaming pipeline termination:
>>
>> - direct runner: when the output watermarks of all of its PCollections
>> progress to +infinity
>>
>> - apex runner: when the output watermarks of all of its PCollections
>> progress to +infinity
>>
>> - dataflow runner: when the output watermarks of all of its PCollections
>> progress to +infinity
>>
>> - spark runner: streaming pipelines do not terminate unless a timeout is set
>> in pipelineResult.waitUntilFinish()
>>
>> - flink runner: streaming pipelines do not terminate unless a timeout is set
>> in pipelineResult.waitUntilFinish() (thanks to Aljoscha for the timeout
>> support PR https://github.com/apache/beam/pull/2915#pullrequestreview-37090326)
>>
>>
>> => Is the direct/apex/dataflow behavior the correct "beam model" behavior?
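Aljoscha's proposed change and the direct/apex/dataflow rule listed above can be sketched together. The following stdlib-only Java model is hypothetical: `SourceLoopSketch`, `emitWatermarks` and `pipelineTerminated` are illustrative names, not Flink or Beam APIs, and `Long.MAX_VALUE` stands in for the +Inf watermark (in Beam's Java SDK that role is played by a maximum-timestamp constant, `BoundedWindow.TIMESTAMP_MAX_VALUE`). The loop clears a `running` flag once the source's watermark reaches +Inf, and the check treats the pipeline as finished only when every output watermark has reached +Inf.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of watermark-driven termination; not Flink or Beam code.
class SourceLoopSketch {
    // Stand-in for the +Inf watermark.
    static final long WATERMARK_POS_INF = Long.MAX_VALUE;

    private volatile boolean running = true;

    boolean isRunning() {
        return running;
    }

    // Emits watermarks reported by a (simulated) reader until one reaches +Inf,
    // then clears the running flag so the source, and its task, can shut down.
    List<Long> emitWatermarks(long[] watermarksFromReader) {
        List<Long> emitted = new ArrayList<>();
        for (long wm : watermarksFromReader) {
            if (!running) {
                break;
            }
            emitted.add(wm);
            if (wm >= WATERMARK_POS_INF) {
                running = false; // the proposed change: stop once the watermark hits +Inf
            }
        }
        return emitted;
    }

    // Pipeline-level rule: finished only when every PCollection's output
    // watermark has progressed to +Inf.
    static boolean pipelineTerminated(Map<String, Long> outputWatermarks) {
        return outputWatermarks.values().stream().allMatch(wm -> wm >= WATERMARK_POS_INF);
    }

    public static void main(String[] args) {
        SourceLoopSketch source = new SourceLoopSketch();
        List<Long> out = source.emitWatermarks(new long[] {10L, 20L, WATERMARK_POS_INF, 30L});
        System.out.println(out.size() + " " + source.isRunning()); // 3 false

        System.out.println(pipelineTerminated(Map.of("A", WATERMARK_POS_INF, "B", 42L)));               // false
        System.out.println(pipelineTerminated(Map.of("A", WATERMARK_POS_INF, "B", WATERMARK_POS_INF))); // true
    }
}
```

Passing only a subset of the collections' watermarks to `pipelineTerminated` corresponds to the partial-execution variant Eugene mentions further down the thread.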
>>
>> I know that, at least for Spark (see the mails in this thread), there is no
>> easy way to know that we're done reading a source, so it might be very
>> difficult (at least for this runner) to unify toward the +infinity behavior
>> if it is chosen as the standard behavior.
>>
>> Best
>>
>> Etienne
>>
>> On 18/04/2017 at 12:05, Etienne Chauchot wrote:
>>> +1 on "runners really terminate in a timely manner to easily
>>> programmatically orchestrate Beam pipelines in a portable way, you do need
>>> to know whether the pipeline will finish without thinking about the
>>> specific runner and its options"
>>>
>>> As an example, in Nexmark, we have streaming mode tests, and for the
>>> benchmark, we need all the queries to behave the same across runners with
>>> respect to termination.
>>>
>>> For now, to get consistent behavior in this mode, we need to set a timeout
>>> (a bit random and flaky) on waitUntilFinish() for Spark, but this timeout is
>>> not needed for the direct runner.
>>>
>>> Etienne
>>>
>>> On 02/03/2017 at 19:27, Kenneth Knowles wrote:
>>>> Isn't this already the case? I think semantically it is an unavoidable
>>>> conclusion, so certainly +1 to that.
>>>>
>>>> The DirectRunner and TestDataflowRunner both have this behavior already.
>>>> I've always considered that a streaming job running forever is just [very]
>>>> suboptimal shutdown latency :-)
>>>>
>>>> Some bits of the discussion on the ticket seem to surround whether or how
>>>> to communicate this property in a generic way. Since a runner owns its
>>>> PipelineResult it doesn't seem necessary.
>>>>
>>>> So is the bottom line just that you want to more strongly insist that
>>>> runners really terminate in a timely manner?
>>>> I'm +1 to that, too, for basically the reason Stas gives: In order to
>>>> easily programmatically orchestrate Beam pipelines in a portable way, you
>>>> do need to know whether the pipeline will finish without thinking about the
>>>> specific runner and its options (as with our RunnableOnService tests).
>>>>
>>>> Kenn
>>>>
>>>> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <dhalp...@google.com.invalid>
>>>> wrote:
>>>>
>>>>> Note that even "unbounded pipeline in a streaming runner".waitUntilFinish()
>>>>> can return, e.g., if you cancel it or terminate it. It's totally
>>>>> reasonable for users to want to understand and handle these cases.
>>>>>
>>>>> +1
>>>>>
>>>>> Dan
>>>>>
>>>>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>> wrote:
>>>>>
>>>>>> +1
>>>>>>
>>>>>> Good idea!!
>>>>>>
>>>>>> Regards
>>>>>> JB
>>>>>>
>>>>>>
>>>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
>>>>>>
>>>>>>> Raising this onto the mailing list from
>>>>>>> https://issues.apache.org/jira/browse/BEAM-849
>>>>>>>
>>>>>>> The issue came up: what does it mean for a pipeline to finish, in the
>>>>>>> Beam model?
>>>>>>>
>>>>>>> Note that I am deliberately not talking about "batch" and "streaming"
>>>>>>> pipelines, because this distinction does not exist in the model. Several
>>>>>>> runners have batch/streaming *modes*, which implement the same semantics
>>>>>>> (potentially different subsets: in batch mode, a runner will typically
>>>>>>> reject pipelines that have at least one unbounded PCollection) but in an
>>>>>>> operationally different way. However, we should define pipeline
>>>>>>> termination at the level of the unified model, and then make sure that
>>>>>>> all runners in all modes implement it properly.
>>>>>>>
>>>>>>> One natural way is to say "a pipeline terminates when the output
>>>>>>> watermarks of all of its PCollections progress to +infinity".
>>>>>>> (Note: this can be generalized, I guess, to partial executions of a
>>>>>>> pipeline: if you're interested in the full contents of only some
>>>>>>> collections, then you wait until only the watermarks of those
>>>>>>> collections progress to infinity.)
>>>>>>>
>>>>>>> A typical "batch" runner mode does not implement watermarks - we can
>>>>>>> think of it as assigning watermark -infinity to the output of a
>>>>>>> transform that hasn't started executing yet, and +infinity to the output
>>>>>>> of a transform that has finished executing. This is consistent with how
>>>>>>> such runners implement termination in practice.
>>>>>>>
>>>>>>> The Dataflow streaming runner additionally implements such termination
>>>>>>> for the pipeline drain operation, which has 2 parts: 1) stop consuming
>>>>>>> input from the sources, and 2) wait until all watermarks progress to
>>>>>>> infinity.
>>>>>>>
>>>>>>> Let us fill the gap by making this part of the Beam model and declaring
>>>>>>> that all runners should implement this behavior. This will give nice
>>>>>>> properties, e.g.:
>>>>>>> - A pipeline that has only bounded collections can be run by any runner
>>>>>>> in any mode, with the same results and termination behavior (this is
>>>>>>> actually the motivating example for raising this issue: I was running
>>>>>>> Splittable DoFn tests
>>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
>>>>>>> with the streaming Dataflow runner - these tests produce only bounded
>>>>>>> collections - and noticed that they wouldn't terminate even though all
>>>>>>> data was processed)
>>>>>>> - It will be possible to implement pipelines that stream data for a
>>>>>>> while and then eventually successfully terminate based on some
>>>>>>> condition. E.g.
>>>>>>> a pipeline that watches a continuously growing file until it is marked
>>>>>>> read-only, or a pipeline that reads a Kafka topic partition until it
>>>>>>> receives a "poison pill" message. This seems handy.
>>>>>>>
>>>>>>>
>>>>>> --
>>>>>> Jean-Baptiste Onofré
>>>>>> jbono...@apache.org
>>>>>> http://blog.nanthrax.net
>>>>>> Talend - http://www.talend.com
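Eugene's "poison pill" example can be expressed in the same watermark terms used throughout this thread: a source streams records, and on seeing a sentinel message it advances its watermark straight to +Inf, which under the proposed rule lets the pipeline terminate. The sketch below is a hypothetical stdlib-only Java model, not Kafka or Beam code. `PoisonPillSource`, `Message`, the `"POISON_PILL"` sentinel and the use of `Long.MIN_VALUE`/`Long.MAX_VALUE` as -Inf/+Inf are illustrative assumptions, and the watermark update assumes messages arrive in timestamp order.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical source that finishes on a sentinel message; not Kafka or Beam code.
class PoisonPillSource {
    static final String POISON_PILL = "POISON_PILL";       // assumed sentinel payload
    static final long WATERMARK_POS_INF = Long.MAX_VALUE;  // stand-in for +Inf

    // Minimal timestamped message; hypothetical.
    static final class Message {
        final long timestamp;
        final String payload;

        Message(long timestamp, String payload) {
            this.timestamp = timestamp;
            this.payload = payload;
        }
    }

    private long watermark = Long.MIN_VALUE; // stand-in for -Inf: nothing read yet
    private final List<String> output = new ArrayList<>();

    long getWatermark() { return watermark; }
    List<String> getOutput() { return output; }
    boolean isFinished() { return watermark == WATERMARK_POS_INF; }

    // Consumes messages in order. Ordinary messages are emitted and advance the
    // watermark (assuming in-order timestamps); the poison pill moves the
    // watermark straight to +Inf and stops reading.
    void consume(List<Message> messages) {
        for (Message m : messages) {
            if (POISON_PILL.equals(m.payload)) {
                watermark = WATERMARK_POS_INF;
                return;
            }
            output.add(m.payload);
            watermark = Math.max(watermark, m.timestamp);
        }
    }

    public static void main(String[] args) {
        PoisonPillSource source = new PoisonPillSource();
        source.consume(List.of(
                new Message(1L, "a"),
                new Message(2L, "b"),
                new Message(3L, POISON_PILL),
                new Message(4L, "never read")));
        System.out.println(source.getOutput());  // [a, b]
        System.out.println(source.isFinished()); // true
    }
}
```

Without the sentinel the watermark stays finite and the source never reports completion, which is the "streaming forever" case; the same -Inf start value also matches the thread's description of a batch transform that has not started executing yet.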