+1 Having a unified termination semantics for all runners is super important.

Stas or Aviem, is it feasible to do this for the Spark runner, or is the
timeout due to a technical limitation of Spark?

Thomas Weise, Aljoscha, anything to say on this?

Aljoscha, what is the current status for the Flink runner? Is there
any progress towards BEAM-593?


On Tue, Apr 18, 2017 at 5:05 PM, Stas Levin <[email protected]> wrote:
> Ted, the timeout is needed mostly for testing purposes.
> AFAIK there is no easy way to express the fact that a source is "done" in a
> Spark native streaming application.
> Moreover, the Spark streaming "native" flow can either "awaitTermination()"
> or "awaitTerminationOrTimeout(...)". If you "awaitTermination" then you're
> blocked until the execution is either stopped or has failed, so if you wish
> to stop the app sooner, say after a certain period of time,
> "awaitTerminationOrTimeout(...)" may be the way to go.
>
> Using the unified approach discussed in this thread, when a source is
> "done" (i.e. the watermark is +Infinity) the app (e.g. runner) would
> gracefully stop.
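To make the contrast concrete, here is a minimal sketch in plain Python, not the Spark or Beam API. `ToyRunner`, `mark_source_done`, and `wait_until_finish` are hypothetical names that only model the behavior described above: a wait with a timeout returns once the deadline passes whether or not the source is done, while under the unified approach the wait returns as soon as the watermark reaches +infinity.

```python
import math
import threading


class ToyRunner:
    """Hypothetical stand-in for a runner; not the Spark or Beam API."""

    def __init__(self):
        self.watermark = -math.inf
        self._done = threading.Event()

    def mark_source_done(self):
        # A source that is "done" advances its watermark to +infinity.
        self.watermark = math.inf
        self._done.set()

    def wait_until_finish(self, timeout=None):
        # Returns True if the pipeline finished, False if the timeout
        # expired first. With timeout=None this blocks until done.
        return self._done.wait(timeout)


runner = ToyRunner()

# The source has not finished yet, so the timed wait gives up.
timed_out_result = runner.wait_until_finish(timeout=0.05)

# The source finishes shortly afterwards; no timeout is needed.
threading.Timer(0.05, runner.mark_source_done).start()
finished_result = runner.wait_until_finish()
```

The caller of the second wait does not have to guess a deadline, which is the property the Nexmark tests mentioned below would rely on.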
>
>
>
> On Tue, Apr 18, 2017 at 3:19 PM Ted Yu <[email protected]> wrote:
>
>> Why is the timeout needed for Spark ?
>>
>> Thanks
>>
>> > On Apr 18, 2017, at 3:05 AM, Etienne Chauchot <[email protected]>
>> wrote:
>> >
>> > +1 on "runners really terminate in a timely manner to easily
>> programmatically orchestrate Beam pipelines in a portable way, you do need
>> to know whether
>> > the pipeline will finish without thinking about the specific runner and
>> its options"
>> >
>> > As an example, in Nexmark, we have streaming mode tests, and for the
>> benchmark, we need all the queries to behave the same between runners
>> towards termination.
>> >
>> > For now, to get consistent behavior in this mode, we need to set a
>> timeout (somewhat arbitrary, and flaky) on waitUntilFinish() for the Spark
>> runner, but this timeout is not needed for the direct runner.
>> >
>> > Etienne
>> >
>> >> On 02/03/2017 at 19:27, Kenneth Knowles wrote:
>> >> Isn't this already the case? I think semantically it is an unavoidable
>> >> conclusion, so certainly +1 to that.
>> >>
>> >> The DirectRunner and TestDataflowRunner both have this behavior already.
>> >> I've always considered that a streaming job running forever is just
>> [very]
>> >> suboptimal shutdown latency :-)
>> >>
>> >> Some bits of the discussion on the ticket seem to surround whether or
>> how
>> >> to communicate this property in a generic way. Since a runner owns its
>> >> PipelineResult it doesn't seem necessary.
>> >>
>> >> So is the bottom line just that you want to more strongly insist that
>> >> runners really terminate in a timely manner? I'm +1 to that, too, for
>> >> basically the reason Stas gives: In order to easily programmatically
>> >> orchestrate Beam pipelines in a portable way, you do need to know
>> whether
>> >> the pipeline will finish without thinking about the specific runner and
>> its
>> >> options (as with our RunnableOnService tests).
>> >>
>> >> Kenn
>> >>
>> >> On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin
>> <[email protected]>
>> >> wrote:
>> >>
>> >>> Note that even "unbounded pipeline in a streaming
>> runner".waitUntilFinish()
>> >>> can return, e.g., if you cancel it or terminate it. It's totally
>> reasonable
>> >>> for users to want to understand and handle these cases.
>> >>>
>> >>> +1
>> >>>
>> >>> Dan
>> >>>
>> >>> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <[email protected]>
>> >>> wrote:
>> >>>
>> >>>> +1
>> >>>>
>> >>>> Good idea !!
>> >>>>
>> >>>> Regards
>> >>>> JB
>> >>>>
>> >>>>
>> >>>>> On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
>> >>>>>
>> >>>>> Raising this onto the mailing list from
>> >>>>> https://issues.apache.org/jira/browse/BEAM-849
>> >>>>>
>> >>>>> The issue came up: what does it mean for a pipeline to finish, in the
>> >>> Beam
>> >>>>> model?
>> >>>>>
>> >>>>> Note that I am deliberately not talking about "batch" and "streaming"
>> >>>>> pipelines, because this distinction does not exist in the model.
>> Several
>> >>>>> runners have batch/streaming *modes*, which implement the same
>> semantics
>> >>>>> (potentially different subsets: in batch mode typically a runner will
>> >>>>> reject pipelines that have at least one unbounded PCollection) but
>> in an
>> >>>>> operationally different way. However we should define pipeline
>> >>> termination
>> >>>>> at the level of the unified model, and then make sure that all
>> runners
>> >>> in
>> >>>>> all modes implement that properly.
>> >>>>>
>> >>>>> One natural way is to say "a pipeline terminates when the output
>> >>>>> watermarks
>> >>>>> of all of its PCollections progress to +infinity". (Note: this can
>> be
>> >>>>> generalized, I guess, to having partial executions of a pipeline: if
>> >>>>> you're
>> >>>>> interested in the full contents of only some collections, then you
>> wait
>> >>>>> until only the watermarks of those collections progress to infinity)
>> >>>>>
>> >>>>> A typical "batch" runner mode does not implement watermarks - we can
>> >>> think
>> >>>>> of it as assigning watermark -infinity to an output of a transform
>> that
>> >>>>> hasn't started executing yet, and +infinity to output of a transform
>> >>> that
>> >>>>> has finished executing. This is consistent with how such runners
>> >>> implement
>> >>>>> termination in practice.
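The proposed definition and the batch-mode reading of it can be sketched as follows. This is an illustration in plain Python under stated assumptions, not Beam code: `is_terminated`, the `only` parameter, and the collection names are hypothetical.

```python
import math

NOT_STARTED = -math.inf  # output of a transform that has not started executing
FINISHED = math.inf      # output of a transform that has finished executing


def is_terminated(output_watermarks, only=None):
    """Return True when every watermark of interest has reached +infinity.

    output_watermarks maps a PCollection name to its output watermark.
    only optionally restricts the check to some collections, which
    models the partial-execution generalization.
    """
    names = output_watermarks.keys() if only is None else only
    return all(output_watermarks[name] == math.inf for name in names)


# Batch-style run: "read" and "parse" are done, "write" has not started.
watermarks = {"read": FINISHED, "parse": FINISHED, "write": NOT_STARTED}
whole_pipeline_done = is_terminated(watermarks)
partial_done = is_terminated(watermarks, only=["read", "parse"])

# Once the last transform finishes, the whole pipeline has terminated.
watermarks["write"] = FINISHED
whole_pipeline_done_after = is_terminated(watermarks)
```

A streaming runner would hold finite, advancing watermarks instead of the two extreme values, but the termination predicate stays the same.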
>> >>>>>
>> >>>>> Dataflow streaming runner additionally implements such termination
>> for
>> >>>>> pipeline drain operation: it has 2 parts: 1) stop consuming input
>> from
>> >>> the
>> >>>>> sources, and 2) wait until all watermarks progress to infinity.
>> >>>>>
>> >>>>> Let us fill the gap by making this part of the Beam model and
>> declaring
>> >>>>> that all runners should implement this behavior. This will give nice
>> >>>>> properties, e.g.:
>> >>>>> - A pipeline that has only bounded collections can be run by any
>> runner
>> >>> in
>> >>>>> any mode, with the same results and termination behavior (this is
>> >>> actually
>> >>>>> my motivating example for raising this issue: I was running
>> >>> Splittable
>> >>>>> DoFn tests
>> >>>>> <https://github.com/apache/beam/blob/master/sdks/java/core/
>> >>>>> src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
>> >>>>> with the streaming Dataflow runner - these tests produce only bounded
>> >>>>> collections - and noticed that they wouldn't terminate even though
>> all
>> >>>>> data
>> >>>>> was processed)
>> >>>>> - It will be possible to implement pipelines that stream data for a
>> >>> while
>> >>>>> and then eventually successfully terminate based on some condition.
>> >>> E.g. a
>> >>>>> pipeline that watches a continuously growing file until it is marked
>> >>>>> read-only, or a pipeline that reads a Kafka topic partition until it
>> >>>>> receives a "poison pill" message. This seems handy.
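The "poison pill" case could look roughly like this. It is a toy model in plain Python, not a Kafka or Beam source: `POISON_PILL`, `consume`, and the use of the message offset as the watermark are assumptions made only for illustration.

```python
import math

POISON_PILL = "__STOP__"  # hypothetical sentinel value


def consume(messages):
    """Read messages until the poison pill; return (emitted, watermark).

    The watermark is modeled as the offset of the last element read,
    and jumps to +infinity once the source declares itself done.
    """
    emitted = []
    watermark = -math.inf
    for offset, message in enumerate(messages):
        if message == POISON_PILL:
            # The source is done: nothing more will ever be emitted.
            watermark = math.inf
            break
        emitted.append(message)
        watermark = offset
    return emitted, watermark


stopped = consume(["a", "b", POISON_PILL, "c"])
still_running = consume(["a", "b"])
```

In the first call the watermark reaches +infinity, so under the proposed semantics the pipeline would terminate; in the second it stays finite and the pipeline keeps running.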
>> >>>> --
>> >>>> Jean-Baptiste Onofré
>> >>>> [email protected]
>> >>>> http://blog.nanthrax.net
>> >>>> Talend - http://www.talend.com
>> >
>>
