No worries :-) and thanks for the detailed answers!

I still have one question, though: you wrote that "The side input is
considered ready when there has been any data output/added to the
PCollection that it is being read as a side input. So the upstream trigger
controls this." How does this work with side inputs that consist of
multiple elements, i.e. ListPCollectionView and MapPCollectionView? For
them, do we also consider the side input ready once the first element
arrives? That's why I was wondering whether the triggers are responsible
for deciding when a side input is ready.
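
To make the question concrete, here is a minimal sketch of the rule as I
read it from your answers, in plain Python rather than SDK code. The class
and method names are made up for illustration, and it assumes that each
trigger firing delivers the accumulated contents of the side-input window.

```python
from collections import defaultdict


class SideInputBuffer:
    """Toy model of the readiness rule discussed here; not Beam/Dataflow code."""

    def __init__(self):
        self.side_input = {}               # window -> latest pane contents
        self.blocked = defaultdict(list)   # window -> buffered main-input elements
        self.processed = []                # (element, side-input snapshot it saw)

    def on_side_input_pane(self, window, pane):
        """Upstream trigger fired: the window becomes (or stays) ready."""
        self.side_input[window] = list(pane)
        # Release only elements that were waiting; earlier results are not redone.
        for element in self.blocked.pop(window, []):
            self._process(element, window)

    def on_main_input(self, element, window):
        """Process immediately if the side-input window is ready, else buffer."""
        if window in self.side_input:
            self._process(element, window)
        else:
            self.blocked[window].append(element)

    def _process(self, element, window):
        self.processed.append((element, tuple(self.side_input[window])))


buf = SideInputBuffer()
buf.on_main_input("a", window=0)        # buffered: nothing output for window 0 yet
buf.on_side_input_pane(0, ["x"])        # first firing, a single element
buf.on_side_input_pane(0, ["x", "y"])   # later speculative firing
buf.on_main_input("b", window=0)
print(buf.processed)
```

In this model a list side input counts as ready after its first pane, so
"a" only ever sees ["x"] while "b" sees ["x", "y"]. That is the case I am
asking about.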

Cheers,
Aljoscha

On Thu, 28 Apr 2016 at 18:55 Kenneth Knowles <k...@google.com.invalid> wrote:

> On Thu, Apr 28, 2016 at 1:26 AM, Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> > Bump.
> >
> > I'm afraid this might have gotten lost during the conferences/summits.
> >
> > On Thu, 21 Apr 2016 at 13:30 Aljoscha Krettek <aljos...@apache.org>
> > wrote:
> >
> > > Ok, I'll try and start such a design. Before I can start, I have a few
> > > questions about how the side inputs actually work. Some of it is in
> > > the docs, but some is also conjecture on my part about how
> > > MillWheel/Windmill (I don't know, is the first the system and the
> > > second a worker in there?) works.
> > >
> > > I'll write the questions as a set of assumptions and please correct me
> > > on those where I'm wrong:
> >
>
> Sorry for the delayed reply. I may have taken you too literally. Since
> nothing seemed wrong, I didn't say anything at all :-)
>
>
> >
> > > 1. Side inputs are always global/broadcast, they are never scoped to a
> > > key.
> >
>
> True. A side input is a way of reading/viewing an entire PCollection.
>
>
> > > 2. Mapping of main-input window to side-input window is done by the
> > > side-input WindowFn.
> >
>
> True.
>
>
> > > If the side input has a larger window than the main
> > > input, processing of main-input elements will block until the side
> > > input becomes available. (Side inputs for a larger side-input window
> > > can become available early if using a speculative trigger)
> >
>
> True to this point: The main input element waits for the necessary side
> input window to be ready. It doesn't necessarily have to do with window
> size. It could just be a scheduling thing, or other runner-specific reason.
>
>
> >
> > > 2.5 If we update the side input because a speculative trigger fires
> > > again we don't reprocess the main-input elements that were already
> > > processed. Only new elements see the updated side input.
> >
>
> True. This is a place where the decision is due mostly to feasibility of
> implementation. It is easy to create a pipeline where this behavior is not
> ideal.
>
>
> > > 3. The decision about whether side inputs are "available", i.e. complete
> > > in case of list/map side inputs is made by a Trigger. (This is also
> > > true for updates to side input caused by speculative triggers firing
> > > again.) This uses the continuation trigger feature, which is easy for
> > > time triggers and interesting for the AfterPane.elementCountAtLeast()
> > > trigger which changes to AfterPane.elementCountAtLeast(1) on
> > > continuation and other speculative/late triggers.
> >
>
> True up to this point: The side input is considered ready when there has
> been any data output/added to the PCollection that it is being read as a
> side input. So the upstream trigger controls this. I don't know if the
> continuation trigger is important. The goal of the continuation trigger is
> to handle an implementation detail: Once an upstream trigger has already
> regulated the flow, we want downstream stuff to just proceed as fast as
> reasonable.
>
>
> > > 4. About the StreamingSideInputDoFnRunner.java
> > > <https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java>
> > > posted by Kenneth. It uses StateInternals to buffer the elements of the
> > > main input in a BagState. I was under the assumption that state is
> > > always scoped to a key but side inputs can also be used on non-keyed
> > > streams. In this case, the state is scoped to the key group (the
> > > unit/granularity that is used to rebalance in case of rescaling) and
> > > when we access the state we get all elements for the key groups for
> > > which our parallel worker instance is responsible. (This is the part
> > > where I am completely unsure about what is happening... :-O)
> >
>
> In this case, the key that is scoping the state is just the sharding key
> for the computation, just like you say. It may not be the actual key of a
> KV element, and the PCollection may not even be keyed, but the computation
> always has such a sharding.
>
> I believe replacing the runner-specific pieces of that code might be most
> of the way to what you want. And I think the runner-specific pieces are
> really just there because it didn't matter to make it more general, so it
> will be easy to change. The upcoming in-process runner is developing some
> abstractions in this area. But I can easily believe that there are other
> ways of designing this. For example in batch mode we simply schedule the
> side input first. Even in streaming style, the runner backend might be able
> to take some more responsibility. So that code is just one point in the
> design space.
>
> Kenn
>
