OK, I'm glad everybody is in agreement on this. I raised this point because
we've been discussing implementing this behavior in the Dataflow streaming
runner, and I wanted to make sure that people are okay with it from a
conceptual point of view before proceeding.

On Thu, Mar 2, 2017 at 10:27 AM Kenneth Knowles <k...@google.com.invalid>
wrote:

Isn't this already the case? I think semantically it is an unavoidable
conclusion, so certainly +1 to that.

The DirectRunner and TestDataflowRunner both have this behavior already.
I've always considered that a streaming job running forever is just [very]
suboptimal shutdown latency :-)

Some bits of the discussion on the ticket seem to surround whether or how
to communicate this property in a generic way. Since a runner owns its
PipelineResult it doesn't seem necessary.

So is the bottom line just that you want to more strongly insist that
runners really terminate in a timely manner? I'm +1 to that, too, for
basically the reason Stas gives: In order to easily programmatically
orchestrate Beam pipelines in a portable way, you do need to know whether
the pipeline will finish without thinking about the specific runner and its
options (as with our RunnableOnService tests).

Kenn

On Thu, Mar 2, 2017 at 9:09 AM, Dan Halperin <dhalp...@google.com.invalid>
wrote:

> Note that even "unbounded pipeline in a streaming runner".waitUntilFinish()
> can return, e.g., if you cancel it or terminate it. It's totally reasonable
> for users to want to understand and handle these cases.
>
> +1
>
> Dan
>
> On Thu, Mar 2, 2017 at 2:53 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> wrote:
>
> > +1
> >
> > Good idea !!
> >
> > Regards
> > JB
> >
> >
> > On 03/02/2017 02:54 AM, Eugene Kirpichov wrote:
> >
> >> Raising this onto the mailing list from
> >> https://issues.apache.org/jira/browse/BEAM-849
> >>
> >> The issue came up: what does it mean for a pipeline to finish, in the
> >> Beam model?
> >>
> >> Note that I am deliberately not talking about "batch" and "streaming"
> >> pipelines, because this distinction does not exist in the model. Several
> >> runners have batch/streaming *modes*, which implement the same semantics
> >> (potentially different subsets: in batch mode typically a runner will
> >> reject pipelines that have at least one unbounded PCollection) but in an
> >> operationally different way. However we should define pipeline termination
> >> at the level of the unified model, and then make sure that all runners in
> >> all modes implement that properly.
> >>
> >> One natural way is to say "a pipeline terminates when the output
> >> watermarks of all of its PCollections progress to +infinity". (Note: this
> >> can be generalized, I guess, to partial executions of a pipeline: if
> >> you're interested in the full contents of only some collections, then you
> >> wait only until the watermarks of those collections progress to
> >> +infinity.)
> >>
> >> A typical "batch" runner mode does not implement watermarks - we can think
> >> of it as assigning watermark -infinity to an output of a transform that
> >> hasn't started executing yet, and +infinity to output of a transform that
> >> has finished executing. This is consistent with how such runners implement
> >> termination in practice.
> >>
> >> Dataflow streaming runner additionally implements such termination for
> >> the pipeline drain operation, which has 2 parts: 1) stop consuming input
> >> from the sources, and 2) wait until all watermarks progress to +infinity.
> >>
> >> Let us fill the gap by making this part of the Beam model and declaring
> >> that all runners should implement this behavior. This will give nice
> >> properties, e.g.:
> >> - A pipeline that has only bounded collections can be run by any runner in
> >> any mode, with the same results and termination behavior (this is actually
> >> my motivating example for raising this issue: I was running Splittable
> >> DoFn tests
> >> <https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
> >> with the streaming Dataflow runner - these tests produce only bounded
> >> collections - and noticed that they wouldn't terminate even though all
> >> data was processed)
> >> - It will be possible to implement pipelines that stream data for a while
> >> and then eventually successfully terminate based on some condition. E.g. a
> >> pipeline that watches a continuously growing file until it is marked
> >> read-only, or a pipeline that reads a Kafka topic partition until it
> >> receives a "poison pill" message. This seems handy.
> >>
> >>
> > --
> > Jean-Baptiste Onofré
> > jbono...@apache.org
> > http://blog.nanthrax.net
> > Talend - http://www.talend.com
> >
>
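
For readers skimming the thread, the termination rule proposed in the
original message can be sketched as a small toy model. This is not Beam code
and not how any runner is implemented: ToyPipeline and all of its methods are
hypothetical names made up for illustration. It only encodes the three ideas
from the proposal: a pipeline is finished once the output watermarks of all
(or of the selected) collections reach +infinity, a batch-style transform
jumps from -infinity to +infinity when it completes, and drain means "stop
consuming input, then let every watermark advance to +infinity".

```python
import math


class ToyPipeline:
    """Hypothetical toy model of watermark-based termination (not a Beam API)."""

    def __init__(self, collection_names):
        # A collection whose producing transform has not started yet is
        # modelled as having watermark -infinity.
        self.watermarks = {name: -math.inf for name in collection_names}
        self.consuming_input = True

    def advance(self, name, watermark):
        # Watermarks only ever move forward.
        self.watermarks[name] = max(self.watermarks[name], watermark)

    def finish_transform(self, name):
        # Batch-style view: a finished transform's output is at +infinity.
        self.advance(name, math.inf)

    def is_terminated(self, of_interest=None):
        # Terminated once every watermark we care about is +infinity.
        # Passing of_interest models the "partial execution" generalization.
        names = self.watermarks if of_interest is None else of_interest
        return all(self.watermarks[n] == math.inf for n in names)

    def drain(self):
        # Part 1: stop consuming input from the sources.
        self.consuming_input = False
        # Part 2: with no new input, every watermark can progress to +infinity.
        # A real runner would wait for this; the toy model just applies it.
        for name in self.watermarks:
            self.finish_transform(name)


if __name__ == "__main__":
    pipeline = ToyPipeline(["read", "parse", "write"])
    pipeline.finish_transform("read")
    print(pipeline.is_terminated(of_interest=["read"]))  # True
    print(pipeline.is_terminated())                      # False
    pipeline.drain()
    print(pipeline.is_terminated())                      # True
```

Under this model the "bounded collections only" case and the "stream for a
while, then stop" case look the same from the outside: in both, the pipeline
is done exactly when the last watermark reaches +infinity.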
