Hi,
A bit of clarification: the Flink Runner does not terminate a Job when the 
timeout is reached in waitUntilFinish(Duration). When we reach the timeout we 
simply return and keep the job running. I thought that was the expected 
behaviour.
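
To illustrate the timeout semantics described above, here is a minimal, 
self-contained sketch in plain Java. It does not use the Beam or Flink APIs: 
FakeJob, State and this waitUntilFinish helper are hypothetical stand-ins, and 
the only point is that a timeout makes the call return while the job is left 
untouched.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class WaitWithTimeoutSketch {
  enum State { RUNNING, DONE }

  /** Hypothetical stand-in for a submitted job; not a Beam or Flink class. */
  static final class FakeJob {
    final CompletableFuture<State> completion = new CompletableFuture<>();
    // Never set by waitUntilFinish below: a timeout must not cancel the job.
    volatile boolean cancelled = false;

    State currentState() {
      return completion.isDone() ? State.DONE : State.RUNNING;
    }
  }

  /**
   * Waits up to timeoutMillis for the job to complete. On timeout it simply
   * returns the current state and leaves the job running.
   */
  static State waitUntilFinish(FakeJob job, long timeoutMillis)
      throws InterruptedException, ExecutionException {
    try {
      return job.completion.get(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      return job.currentState();
    }
  }

  public static void main(String[] args) throws Exception {
    FakeJob job = new FakeJob();
    State state = waitUntilFinish(job, 50);
    System.out.println("state=" + state + " cancelled=" + job.cancelled);
  }
}
```

A caller that wants the job stopped after the timeout would have to cancel it 
explicitly after the call returns.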

Regarding job termination, I think it’s easy to change the Flink Runner to 
terminate if the watermark reaches +Inf. We would simply set running to false 
in the UnboundedSourceWrapper when the watermark reaches +Inf: [1]
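
As a rough sketch of that idea (not the actual UnboundedSourceWrapper code; 
SourceLoop, MAX_WATERMARK and the array of watermarks are hypothetical 
stand-ins, with Long.MAX_VALUE playing the role of +Inf), the source loop 
would stop itself as soon as it observes the final watermark:

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkTerminationSketch {
  // Assumption: Long.MAX_VALUE stands in for the +Inf watermark.
  static final long MAX_WATERMARK = Long.MAX_VALUE;

  /** Hypothetical simplified source loop; not the Flink Runner's class. */
  static final class SourceLoop {
    private volatile boolean running = true;
    private final List<Long> emitted = new ArrayList<>();

    /** Emits each watermark in order and stops once +Inf has been emitted. */
    void run(long[] watermarks) {
      int i = 0;
      while (running && i < watermarks.length) {
        long watermark = watermarks[i++];
        emitted.add(watermark);
        if (watermark >= MAX_WATERMARK) {
          // The source is exhausted: leave the loop so the job can finish.
          running = false;
        }
      }
    }

    boolean isRunning() {
      return running;
    }

    List<Long> emitted() {
      return emitted;
    }
  }

  public static void main(String[] args) {
    SourceLoop loop = new SourceLoop();
    loop.run(new long[] {10L, 20L, MAX_WATERMARK, 30L});
    System.out.println("running=" + loop.isRunning()
        + " emittedCount=" + loop.emitted().size());
  }
}
```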

Best,
Aljoscha

[1] 
https://github.com/apache/beam/blob/cec71028ff63c7e1b1565c013ae0e378608cb5f9/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L428-L428

> On 10. May 2017, at 10:58, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
> 
> OK, now I understand: you are talking about waitUntilFinish(), whereas I was 
> thinking about a simple run().
> 
> IMHO the spark and flink behavior sounds like the most logical one for a 
> streaming pipeline.
> 
> Regards
> JB
> 
> On 05/10/2017 10:20 AM, Etienne Chauchot wrote:
>> Hi everyone,
>> 
>> I'm reopening this subject because, IMHO, it is important to unify pipeline
>> termination semantics in the model. Here are the differences I have observed
>> in streaming pipeline termination:
>> 
>> - direct runner: when the output watermarks of all of its PCollections
>> progress to +infinity
>> 
>> - apex runner: when the output watermarks of all of its PCollections
>> progress to +infinity
>> 
>> - dataflow runner: when the output watermarks of all of its PCollections
>> progress to +infinity
>> 
>> - spark runner: streaming pipelines do not terminate unless timeout is set in
>> pipelineResult.waitUntilFinish()
>> 
>> - flink runner: streaming pipelines do not terminate unless timeout is set in
>> pipelineResult.waitUntilFinish() (thanks to Aljoscha for timeout support PR
>> https://github.com/apache/beam/pull/2915#pullrequestreview-37090326)
>> 
>> 
>> => Is the direct/apex/dataflow behavior the correct "beam model" behavior?
>> 
>> 
>> I know that, at least for spark (mails in this thread), there is no easy way
>> to know that we're done reading a source, so it might be very difficult (at
>> least for this runner) to unify toward +infinity behavior if it is chosen as
>> the standard behavior.
>> 
>> Best
>> 
>> Etienne
>> 
>> Le 18/04/2017 à 12:05, Etienne Chauchot a écrit :
>>> +1 on "runners really terminate in a timely manner to easily
>>> programmatically orchestrate Beam pipelines in a portable way, you do need
>>> to know whether the pipeline will finish without thinking about the
>>> specific runner and its options"
>>> 
>>> As an example, in Nexmark, we have streaming mode tests, and for the
>>> benchmark, we need all the queries to behave the same across runners with
>>> respect to termination.
>>> 
>>> For now, to get consistent behavior in this mode, we need to set a timeout
>>> (a bit random and flaky) on waitUntilFinish() for spark, but this timeout
>>> is not needed for the direct runner.
>>> 
>>> Etienne
>>> 
>>> Le 02/03/2017 à 19:27, Kenneth Knowles a écrit :
>>>> Isn't this already the case? I think semantically it is an unavoidable
>>>> conclusion, so certainly +1 to that.
>>>> 
>>>> The DirectRunner and TestDataflowRunner both have this behavior already.
>>>> I've always considered that a streaming job running forever is just [very]
>>>> suboptimal shutdown latency :-)
>>>> 
>>>> Some bits of the discussion on the ticket seem to surround whether or how
>>>> to communicate this property in a generic way. Since a runner owns its
>>>> PipelineResult it doesn't seem necessary.
>>>> 
>>>> So is the bottom line just that you want to more strongly insist that
>>>> runners really terminate in a timely manner? I'm +1 to that, too, for
>>>> basically the reason Stas gives: In order to easily programmatically
>>>> orchestrate Beam pipelines in a portable way, you do need to know whether
>>>> the pipeline will finish without thinking about the specific runner and its
>>>> options (as with our RunnableOnService tests).
>>>> 
>>>> Kenn
>>>> 
>>>> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <dhalp...@google.com.invalid>
>>>> wrote:
>>>> 
>>>>> Note that even "unbounded pipeline in a streaming
>>>>> runner".waitUntilFinish() can return, e.g., if you cancel it or terminate
>>>>> it. It's totally reasonable for users to want to understand and handle
>>>>> these cases.
>>>>> 
>>>>> +1
>>>>> 
>>>>> Dan
>>>>> 
>>>>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>>>>> wrote:
>>>>> 
>>>>>> +1
>>>>>> 
>>>>>> Good idea !!
>>>>>> 
>>>>>> Regards
>>>>>> JB
>>>>>> 
>>>>>> 
>>>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
>>>>>> 
>>>>>>> Raising this onto the mailing list from
>>>>>>> https://issues.apache.org/jira/browse/BEAM-849
>>>>>>> 
>>>>>>> The issue came up: what does it mean for a pipeline to finish, in the
>>>>>>> Beam model?
>>>>>>> 
>>>>>>> Note that I am deliberately not talking about "batch" and "streaming"
>>>>>>> pipelines, because this distinction does not exist in the model. Several
>>>>>>> runners have batch/streaming *modes*, which implement the same semantics
>>>>>>> (potentially different subsets: in batch mode typically a runner will
>>>>>>> reject pipelines that have at least one unbounded PCollection) but in an
>>>>>>> operationally different way. However we should define pipeline
>>>>>>> termination at the level of the unified model, and then make sure that
>>>>>>> all runners in all modes implement that properly.
>>>>>>> 
>>>>>>> One natural way is to say "a pipeline terminates when the output
>>>>>>> watermarks of all of its PCollections progress to +infinity". (Note: this
>>>>>>> can be generalized, I guess, to having partial executions of a pipeline:
>>>>>>> if you're interested in the full contents of only some collections, then
>>>>>>> you wait until only the watermarks of those collections progress to
>>>>>>> infinity)
>>>>>>> 
>>>>>>> A typical "batch" runner mode does not implement watermarks - we can
>>>>>>> think of it as assigning watermark -infinity to an output of a transform
>>>>>>> that hasn't started executing yet, and +infinity to output of a transform
>>>>>>> that has finished executing. This is consistent with how such runners
>>>>>>> implement termination in practice.
>>>>>>> 
>>>>>>> The Dataflow streaming runner additionally implements such termination
>>>>>>> for the pipeline drain operation, which has 2 parts: 1) stop consuming
>>>>>>> input from the sources, and 2) wait until all watermarks progress to
>>>>>>> infinity.
>>>>>>> 
>>>>>>> Let us fill the gap by making this part of the Beam model and declaring
>>>>>>> that all runners should implement this behavior. This will give nice
>>>>>>> properties, e.g.:
>>>>>>> - A pipeline that has only bounded collections can be run by any runner
>>>>>>> in any mode, with the same results and termination behavior (this is
>>>>>>> actually my motivating example for raising this issue: I was running
>>>>>>> Splittable DoFn tests
>>>>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
>>>>>>> with the streaming Dataflow runner - these tests produce only bounded
>>>>>>> collections - and noticed that they wouldn't terminate even though all
>>>>>>> data was processed)
>>>>>>> - It will be possible to implement pipelines that stream data for a
>>>>>>> while and then eventually successfully terminate based on some
>>>>>>> condition. E.g. a pipeline that watches a continuously growing file until
>>>>>>> it is marked read-only, or a pipeline that reads a Kafka topic partition
>>>>>>> until it receives a "poison pill" message. This seems handy.
>>>>>>> 
>>>>>>> 
>>>>>> --
>>>>>> Jean-Baptiste Onofré
>>>>>> jbono...@apache.org
>>>>>> http://blog.nanthrax.net
>>>>>> Talend - http://www.talend.com
>>>>>> 
>>> 
>> 
> 
> -- 
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
