Raising this on the mailing list from
https://issues.apache.org/jira/browse/BEAM-849

The issue came up: what does it mean for a pipeline to finish, in the Beam
model?

Note that I am deliberately not talking about "batch" and "streaming"
pipelines, because this distinction does not exist in the model. Several
runners have batch/streaming *modes*, which implement the same semantics
(or potentially different subsets of it: in batch mode a runner will
typically reject pipelines that contain at least one unbounded PCollection),
but in operationally different ways. However, we should define pipeline
termination at the level of the unified model, and then make sure that all
runners in all modes implement it properly.

One natural way is to say "a pipeline terminates when the output watermarks
of all of its PCollections progress to +infinity". (Note: this can be
generalized, I guess, to partial executions of a pipeline: if you're
interested in the full contents of only some collections, then you wait
only until the watermarks of those collections progress to +infinity.)
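
To make this concrete, here is a minimal sketch of what such a termination
check could look like on the runner side. WatermarkAccess and
outputWatermarkOf are hypothetical runner-internal names, not real Beam API;
only BoundedWindow.TIMESTAMP_MAX_VALUE (the +infinity watermark) is real.

import java.util.Collection;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;

/** Hypothetical runner-internal view of per-PCollection output watermarks. */
interface WatermarkAccess {
  Instant outputWatermarkOf(PCollection<?> pc);
}

class TerminationCheck {
  /**
   * The pipeline (or the partial execution of interest) has terminated when
   * every watermark of interest has reached +infinity.
   */
  static boolean hasTerminated(
      Collection<PCollection<?>> collectionsOfInterest, WatermarkAccess watermarks) {
    for (PCollection<?> pc : collectionsOfInterest) {
      if (watermarks.outputWatermarkOf(pc).isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
        return false; // this collection may still produce output
      }
    }
    return true;
  }
}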

A typical "batch" runner mode does not implement watermarks - we can think
of it as assigning watermark -infinity to an output of a transform that
hasn't started executing yet, and +infinity to output of a transform that
has finished executing. This is consistent with how such runners implement
termination in practice.
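
In code, this implicit assignment might be modeled as in the sketch below;
TransformState and implicitOutputWatermark are illustrative names, not part
of Beam.

import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

/** Illustrative execution states of a transform in a batch runner. */
enum TransformState { NOT_STARTED, RUNNING, FINISHED }

class BatchWatermarks {
  /**
   * A batch runner has no explicit watermarks, but behaves as if each
   * transform's output watermark jumped from -infinity straight to +infinity
   * at the moment the transform finishes.
   */
  static Instant implicitOutputWatermark(TransformState state) {
    return state == TransformState.FINISHED
        ? BoundedWindow.TIMESTAMP_MAX_VALUE   // finished: +infinity
        : BoundedWindow.TIMESTAMP_MIN_VALUE;  // not finished yet: -infinity
  }
}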

The Dataflow streaming runner additionally implements such termination for
the pipeline drain operation, which has two parts: 1) stop consuming input
from the sources, and 2) wait until all watermarks progress to +infinity.
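
Sketched as code, drain is just those two steps in sequence; both helper
operations below are hypothetical runner internals, not real Dataflow API:

/** Hypothetical runner-internal operations. */
interface RunnerInternals {
  void stopConsumingSources();
  void awaitAllOutputWatermarksAtInfinity() throws InterruptedException;
}

class Drain {
  static void drain(RunnerInternals runner) throws InterruptedException {
    runner.stopConsumingSources();               // 1) no new input enters the pipeline
    runner.awaitAllOutputWatermarksAtInfinity(); // 2) in-flight data flushes out
  }
}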

Let us fill the gap by making this part of the Beam model and declaring
that all runners should implement this behavior. This gives us some nice
properties, e.g.:
- A pipeline that has only bounded collections can be run by any runner in
any mode, with the same results and termination behavior. (This is actually
my motivating example for raising this issue: I was running Splittable
DoFn tests
<https://github.com/apache/beam/blob/master/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java>
with the streaming Dataflow runner - these tests produce only bounded
collections - and noticed that they wouldn't terminate even though all the
data had been processed.)
- It will be possible to implement pipelines that stream data for a while
and then eventually terminate successfully based on some condition, e.g. a
pipeline that watches a continuously growing file until it is marked
read-only, or a pipeline that reads a Kafka topic partition until it
receives a "poison pill" message (see the sketch below). This seems handy.
