I see, I haven't thought about watermarks for that but it makes sense. In
Flink, we could just observe watermarks in the sources and shut down
sources when we see a Long.MAX_VALUE watermark. This in turn would bring
down the whole pipeline, starting from the sources.

On Tue, 24 May 2016 at 00:07 Thomas Groh <tg...@google.com.invalid> wrote:

> This is different than the quiescence property proposed in the document -
> quiescence is an idleness property ("the pipeline cannot make progress"),
> but not a completeness property ("the pipeline will never make progress").
>
> However, the existing property of watermarks does mostly solve this problem
> - given that allowed lateness is finite, if all of the root transforms of a
> Pipeline advance their watermarks to positive infinity, the pipeline will
> be complete (as all new inputs are droppably late, so they will be
> dropped). This also causes all windows that contain elements to be closed
> (which will cause the execution of the appropriate PAsserts on those
> windows). This notion of completion is runner-independent; however,
> shutting down the pipeline requires runners to either provide hooks to
> allow users to observe this completed state, or the runner to notice that
> all PTransforms have completed and shut down the pipeline. Notably, this
> notion of completion is simpler than quiescence (as it only requires access
> to the watermarks of the system), so runners can implement it
> independently.
>
> On Mon, May 16, 2016 at 9:00 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Hi,
> > sorry for resurrecting such an old thread but are there already thoughts
> on
> > how the quiescence handling will work for runner-independent tests?
> >
> > I was thinking about how to make the RunnableOnService tests work when
> > executed in "true-streaming" mode, i.e. when the job would normally never
> > finish? Right now, the tests work because the sources finish at some
> point
> > and we verify that the PAssert DoFn sees the correct results. With
> > streaming runners this "finished" bit is hard to do and I feel that it is
> > related to the quiescence idea expression in the document.
> >
> > Cheers,
> > Aljoscha
> >
> > On Thu, 31 Mar 2016 at 19:32 Ben Chambers <bchamb...@google.com.invalid>
> > wrote:
> >
> > > On Mon, Mar 28, 2016 at 4:29 PM Robert Bradshaw
> > > <rober...@google.com.invalid>
> > > wrote:
> > >
> > > > On Fri, Mar 25, 2016 at 4:28 PM, Ben Chambers
> > > > <bchamb...@google.com.invalid> wrote:
> > > > > My only concern is that in the example, you first need to declare
> all
> > > the
> > > > > inputs, then the pipeline to be tested, then all the outputs. This
> > can
> > > > lead
> > > > > to tests that are hard to follow, since what you're really testing
> is
> > > an
> > > > > interleaving more like "When these inputs arrive, I get this
> output.
> > > Then
> > > > > when this happens, I get that output. Etc.".
> > > >
> > > > +1 to pursuing this direction.
> > > >
> > > > > What if instea of returning a PTransform<PBegin, PCollection<Long>>
> > we
> > > > had
> > > > > a "TestSource".
> > > >
> > > > I think TestSource is a PTransform<PBegin, PCollection<Long>>.
> > > >
> > >
> > > Maybe? If we want it to easily support multiple inputs, maybe you do
> > > `testSource.getInput(tag)` to get the `PTransform<PBegin,
> > PCollection<T>>`
> > > associated with a given tag? But yes, I intended the `TestSource` to be
> > > usable within the pipeline to actually produce the data.
> > >
> > > >
> > > > > so we did something like:
> > > > >
> > > > > TestPipeline p = TestPipeline.create();
> > > > > TestSource source = p.testSource();
> > > > >
> > > > > // Set up pipeline reading from source.
> > > > > PCollection<Long> sum = ...;
> > > >
> > > > I'm really curious what the "..." looks like. How are we using the
> > > source?
> > > >
> > >
> > > Either `p.apply(source)` or `p.apply(source.forTag(tag))`. Not sure
> about
> > > naming, of course.
> > >
> > > >
> > > > > BeamAssert sumAssert = BeamAssert.sum();
> > > >
> > > > Did you mean BeamAssert.that(sum)?
> > > >
> > >
> > > Almost certainly. Or maybe `BeamAssert.on(sum)`. But something like
> that.
> > >
> > > > // Test for the Speculative Pane
> > > > > source.addElements(...);
> > > > > source.advanceWatermark(...);
> > > > > sumAssert.thatWindowPane(...);
> > > > >
> > > > > // Test for the On Time Pane
> > > > > source.addElements(...)
> > > > > source.advanceWatermark(...);
> > > > > sumAssert.thatWindowPane(...);
> > > > >
> > > > > etc.
> > > >
> > > > Is there a p.run() at the end?
> > > >
> > >
> > > Almost certainly.
> > >
> > >
> > > > > We could also allow TestSource to work with multiple input
> pipelines
> > > like
> > > > > this:
> > > > >
> > > > > TestSource<Integer> intSource = p.testSource(new
> > > > TypeDescriptor<Integer>());
> > > > > TestSource<Long> longSource = p.testSource(new
> > TypeDescriptor<Long>());
> > > > > ...
> > > > > intSource.addElements(...);
> > > > > longSource.addElements(...);
> > > > > etc.
> > > >
> > > > Would we get at total ordering on the addition of
> elements/advancement
> > > > of watermarks across sources by the temporal ordering of these
> > > > operations in the users program (e.g. by incrementing some global
> > > > counter)?
> > > >
> > >
> > > Ideally? I was focusing on the interleaving of inputs/assertions, but
> we
> > > can talk more about this.
> > >
> > >
> > > > > On Fri, Mar 25, 2016 at 4:08 PM Thomas Groh
> <tg...@google.com.invalid
> > >
> > > > > wrote:
> > > > >
> > > > >> Hey everyone;
> > > > >>
> > > > >> I'd still be happy to get feedback. I'm going to start working on
> > this
> > > > >> early next week
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Thomas
> > > > >>
> > > > >> On Mon, Mar 21, 2016 at 5:38 PM, Thomas Groh <tg...@google.com>
> > > wrote:
> > > > >>
> > > > >> > Hey everyone,
> > > > >> >
> > > > >> > I've been working on a proposal to expand the capabilities of
> our
> > > > testing
> > > > >> > API, mostly around writing deterministic tests for pipelines
> that
> > > have
> > > > >> > interesting triggering behavior, especially speculative and late
> > > > >> triggers.
> > > > >> >
> > > > >> > I've shared a doc here
> > > > >> > <
> > > > >>
> > > >
> > >
> >
> https://docs.google.com/document/d/1fZUUbG2LxBtqCVabQshldXIhkMcXepsbv2vuuny8Ix4/edit?usp=sharing
> > > > >
> > > > >> containing
> > > > >> > the proposal and some examples, with world comment access +
> > explicit
> > > > >> > committer edit access. I'd welcome any feedback you all have.
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Thomas
> > > > >> >
> > > > >>
> > > >
> > >
> >
>

Reply via email to