I see. I hadn't thought about using watermarks for that, but it makes sense. In Flink, we could just observe the watermarks in the sources and shut down each source when we see a Long.MAX_VALUE watermark. This in turn would bring down the whole pipeline, starting from the sources.
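To make the idea concrete, here is a rough sketch of the bookkeeping I have in mind. It is plain Java and does not use any Flink or Beam API; the class `WatermarkCompletionTracker` and its methods are hypothetical names made up for illustration. It assumes each source reports its watermark as a long and that Long.MAX_VALUE stands for "positive infinity".

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of Flink or Beam. */
final class WatermarkCompletionTracker {
    /** Watermark value treated as "positive infinity". */
    static final long FINAL_WATERMARK = Long.MAX_VALUE;

    /** Latest watermark observed for each known source. */
    private final Map<String, Long> watermarks = new HashMap<>();

    WatermarkCompletionTracker(Iterable<String> sourceIds) {
        for (String id : sourceIds) {
            watermarks.put(id, Long.MIN_VALUE);
        }
    }

    /** Records a watermark for a source; watermarks never move backwards. */
    void observe(String sourceId, long watermark) {
        Long current = watermarks.get(sourceId);
        if (current == null) {
            throw new IllegalArgumentException("Unknown source: " + sourceId);
        }
        watermarks.put(sourceId, Math.max(current, watermark));
    }

    /** True once this source has emitted the final watermark and can be shut down. */
    boolean shouldShutDown(String sourceId) {
        Long current = watermarks.get(sourceId);
        return current != null && current == FINAL_WATERMARK;
    }

    /** True once every source has reached the final watermark. */
    boolean isPipelineComplete() {
        for (long watermark : watermarks.values()) {
            if (watermark != FINAL_WATERMARK) {
                return false;
            }
        }
        return true;
    }
}
```

A runner would call `observe` whenever a source emits a watermark, stop that source once `shouldShutDown` returns true, and treat the job as finished once `isPipelineComplete` returns true. How this hooks into the actual source lifecycle is runner-specific and not shown here.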
On Tue, 24 May 2016 at 00:07 Thomas Groh <tg...@google.com.invalid> wrote:

> This is different than the quiescence property proposed in the document - quiescence is an idleness property ("the pipeline cannot make progress"), but not a completeness property ("the pipeline will never make progress").
>
> However, the existing property of watermarks does mostly solve this problem - given that allowed lateness is finite, if all of the root transforms of a Pipeline advance their watermarks to positive infinity, the pipeline will be complete (as all new inputs are droppably late, so they will be dropped). This also causes all windows that contain elements to be closed (which will cause the execution of the appropriate PAsserts on those windows). This notion of completion is runner-independent; however, shutting down the pipeline requires runners to either provide hooks to allow users to observe this completed state, or the runner to notice that all PTransforms have completed and shut down the pipeline. Notably, this notion of completion is simpler than quiescence (as it only requires access to the watermarks of the system), so runners can implement it independently.
>
> On Mon, May 16, 2016 at 9:00 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> > Hi,
> > sorry for resurrecting such an old thread but are there already thoughts on how the quiescence handling will work for runner-independent tests?
> >
> > I was thinking about how to make the RunnableOnService tests work when executed in "true-streaming" mode, i.e. when the job would normally never finish? Right now, the tests work because the sources finish at some point and we verify that the PAssert DoFn sees the correct results. With streaming runners this "finished" bit is hard to do and I feel that it is related to the quiescence idea expressed in the document.
> >
> > Cheers,
> > Aljoscha
> >
> > On Thu, 31 Mar 2016 at 19:32 Ben Chambers <bchamb...@google.com.invalid> wrote:
> >
> > > On Mon, Mar 28, 2016 at 4:29 PM Robert Bradshaw <rober...@google.com.invalid> wrote:
> > >
> > > > On Fri, Mar 25, 2016 at 4:28 PM, Ben Chambers <bchamb...@google.com.invalid> wrote:
> > > > > My only concern is that in the example, you first need to declare all the inputs, then the pipeline to be tested, then all the outputs. This can lead to tests that are hard to follow, since what you're really testing is an interleaving more like "When these inputs arrive, I get this output. Then when this happens, I get that output. Etc.".
> > > >
> > > > +1 to pursuing this direction.
> > > >
> > > > > What if instead of returning a PTransform<PBegin, PCollection<Long>> we had a "TestSource".
> > > >
> > > > I think TestSource is a PTransform<PBegin, PCollection<Long>>.
> > > >
> > >
> > > Maybe? If we want it to easily support multiple inputs, maybe you do `testSource.getInput(tag)` to get the `PTransform<PBegin, PCollection<T>>` associated with a given tag? But yes, I intended the `TestSource` to be usable within the pipeline to actually produce the data.
> > >
> > > > > so we did something like:
> > > > >
> > > > > TestPipeline p = TestPipeline.create();
> > > > > TestSource source = p.testSource();
> > > > >
> > > > > // Set up pipeline reading from source.
> > > > > PCollection<Long> sum = ...;
> > > >
> > > > I'm really curious what the "..." looks like. How are we using the source?
> > > >
> > >
> > > Either `p.apply(source)` or `p.apply(source.forTag(tag))`. Not sure about naming, of course.
> > >
> > > > > BeamAssert sumAssert = BeamAssert.sum();
> > > >
> > > > Did you mean BeamAssert.that(sum)?
> > > >
> > >
> > > Almost certainly. Or maybe `BeamAssert.on(sum)`. But something like that.
> > >
> > > > > // Test for the Speculative Pane
> > > > > source.addElements(...);
> > > > > source.advanceWatermark(...);
> > > > > sumAssert.thatWindowPane(...);
> > > > >
> > > > > // Test for the On Time Pane
> > > > > source.addElements(...);
> > > > > source.advanceWatermark(...);
> > > > > sumAssert.thatWindowPane(...);
> > > > >
> > > > > etc.
> > > >
> > > > Is there a p.run() at the end?
> > > >
> > >
> > > Almost certainly.
> > >
> > > > > We could also allow TestSource to work with multiple input pipelines like this:
> > > > >
> > > > > TestSource<Integer> intSource = p.testSource(new TypeDescriptor<Integer>());
> > > > > TestSource<Long> longSource = p.testSource(new TypeDescriptor<Long>());
> > > > > ...
> > > > > intSource.addElements(...);
> > > > > longSource.addElements(...);
> > > > > etc.
> > > >
> > > > Would we get a total ordering on the addition of elements/advancement of watermarks across sources by the temporal ordering of these operations in the user's program (e.g. by incrementing some global counter)?
> > > >
> > >
> > > Ideally? I was focusing on the interleaving of inputs/assertions, but we can talk more about this.
> > >
> > > > > On Fri, Mar 25, 2016 at 4:08 PM Thomas Groh <tg...@google.com.invalid> wrote:
> > > > >
> > > > >> Hey everyone;
> > > > >>
> > > > >> I'd still be happy to get feedback. I'm going to start working on this early next week.
> > > > >>
> > > > >> Thanks,
> > > > >>
> > > > >> Thomas
> > > > >>
> > > > >> On Mon, Mar 21, 2016 at 5:38 PM, Thomas Groh <tg...@google.com> wrote:
> > > > >>
> > > > >> > Hey everyone,
> > > > >> >
> > > > >> > I've been working on a proposal to expand the capabilities of our testing API, mostly around writing deterministic tests for pipelines that have interesting triggering behavior, especially speculative and late triggers.
> > > > >> >
> > > > >> > I've shared a doc here <https://docs.google.com/document/d/1fZUUbG2LxBtqCVabQshldXIhkMcXepsbv2vuuny8Ix4/edit?usp=sharing> containing the proposal and some examples, with world comment access + explicit committer edit access. I'd welcome any feedback you all have.
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Thomas
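
On the question quoted above about getting a total ordering across several test sources: one possible approach, sketched here in plain Java, is to have every source append to a shared log that stamps each operation with a global sequence number. This is only a toy model of the idea, not the proposed Beam API; `TestEventLog`, `ToyTestSource`, and all of their members are hypothetical names made up for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical shared log that orders operations from all sources. */
final class TestEventLog {
    /** One recorded operation: its global position, origin, kind, and payload. */
    static final class Event {
        final long sequence;
        final String sourceName;
        final String kind; // "element" or "watermark"
        final Object payload;

        Event(long sequence, String sourceName, String kind, Object payload) {
            this.sequence = sequence;
            this.sourceName = sourceName;
            this.kind = kind;
            this.payload = payload;
        }
    }

    private final List<Event> events = new ArrayList<>();
    private long nextSequence = 0; // the "global counter"

    /** Creates a source whose operations are recorded in this log. */
    <T> ToyTestSource<T> newSource(String name) {
        return new ToyTestSource<>(name, this);
    }

    void record(String sourceName, String kind, Object payload) {
        events.add(new Event(nextSequence++, sourceName, kind, payload));
    }

    /** All operations in the order the test program issued them. */
    List<Event> events() {
        return Collections.unmodifiableList(events);
    }
}

/** Hypothetical stand-in for the TestSource discussed in the thread. */
final class ToyTestSource<T> {
    private final String name;
    private final TestEventLog log;

    ToyTestSource(String name, TestEventLog log) {
        this.name = name;
        this.log = log;
    }

    @SafeVarargs
    final ToyTestSource<T> addElements(T... elements) {
        for (T element : elements) {
            log.record(name, "element", element);
        }
        return this;
    }

    ToyTestSource<T> advanceWatermark(long timestampMillis) {
        log.record(name, "watermark", timestampMillis);
        return this;
    }
}
```

With this, calling `intSource.addElements(...)` and then `longSource.addElements(...)` yields events whose sequence numbers reflect the order of the calls in the test program, which a runner could then replay. The sketch is single-threaded and ignores how assertions would be interleaved with the inputs.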