On Thu, Apr 28, 2016 at 1:26 AM, Aljoscha Krettek <[email protected]>
wrote:

> Bump.
>
> I'm afraid this might have gotten lost during the conferences/summits.
>
> On Thu, 21 Apr 2016 at 13:30 Aljoscha Krettek <[email protected]> wrote:
>
> > Ok, I'll try and start such a design. Before I can start, I have a few
> > questions about how the side inputs actually work.  Some of it is in the
> > docs, but some is also conjecture on my part about how MillWheel/Windmill
> > (I don't know, is the first the system and the second a worker in there?)
> > works.
> >
> > I'll write the questions as a set of assumptions and please correct me on
> > those where I'm wrong:
>

Sorry for the delayed reply. I may have taken you too literally. Since
nothing seemed wrong, I didn't say anything at all :-)


>
> > 1. Side inputs are always global/broadcast, they are never scoped to a
> key.
>

True. A side input is a way of reading/viewing an entire PCollection.


> 2. Mapping of main-input window to side-input window is done by the
> > side-input WindowFn.
>

True.


> If the side input has a larger window than the main
> > input, processing of main-input elements will block until the side input
> > becomes available. (Side inputs for a larger side-input window can become
> > available early if using a speculative trigger)
>

True, up to a point: the main-input element waits for the necessary side-input
window to be ready. That isn't necessarily about window size; it could just be
a scheduling thing, or some other runner-specific reason.
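
As a toy model of that window mapping (illustrative Java only, not the
Dataflow SDK's actual WindowFn API): a main-input window maps to whichever
side-input window contains its timestamp, here with fixed windows of
different sizes:

```java
import java.time.Duration;
import java.time.Instant;

// Toy model: map a main-input window to the side-input window that
// contains its start timestamp. Class and method names are
// illustrative, not the SDK's.
class FixedWindowsModel {
    final Duration size;

    FixedWindowsModel(Duration size) { this.size = size; }

    // Start of the fixed window containing the given timestamp.
    Instant windowStart(Instant ts) {
        long millis = ts.toEpochMilli();
        long sizeMillis = size.toMillis();
        return Instant.ofEpochMilli(millis - (millis % sizeMillis));
    }

    // The side-input window for a main-input window is simply the
    // side-input window containing the main window's start.
    Instant sideInputWindowFor(Instant mainWindowStart) {
        return windowStart(mainWindowStart);
    }
}
```

So a one-minute main-input window starting at 13:30 would map into the
one-hour side-input window starting at 13:00, and processing waits until
that side-input window has produced a pane.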


>
> > 2.5 If we update the side input because a speculative trigger fires again
> > we don't reprocess the main-input elements that were already processed.
> > Only new elements see the updated side input.
>

True. This is a place where the decision is driven mostly by feasibility of
implementation; it is easy to construct a pipeline where this behavior is not
ideal.
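
A toy sketch of that behavior (illustrative Java, not SDK code): each
main-input element pairs with whatever side-input pane is current when it is
processed, and a later firing updates the pane without recomputing earlier
results:

```java
import java.util.*;

// Toy model: elements see only the side-input pane that is current at
// the moment they are processed; already-emitted results are not
// recomputed when a speculative trigger fires again.
class PaneSnapshotModel {
    private int currentPane;                       // latest side-input pane
    private final List<String> output = new ArrayList<>();

    void updateSideInput(int pane) { currentPane = pane; }

    void processElement(String element) {
        // The element reads whatever pane is current right now.
        output.add(element + ":pane" + currentPane);
    }

    List<String> output() { return output; }
}
```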


> 3. The decision about whether side inputs are "available", i.e. complete
> > in case of list/map side inputs is made by a Trigger. (This is also true
> > for updates to side input caused by speculative triggers firing again.)
> > This uses the continuation trigger feature, which is easy for time
> triggers
> > and interesting for the AfterPane.elementCountAtLeast() trigger which
> > changes to AfterPane.elementCountAtLeast(1) on continuation and other
> > speculative/late triggers.
>

True, up to a point: the side input is considered ready once any data has been
output to the PCollection that is being read as a side input, so the upstream
trigger controls readiness. I don't know whether the continuation trigger is
important here. Its goal is to handle an implementation detail: once an
upstream trigger has already regulated the flow, we want downstream stages to
just proceed as fast as is reasonable.
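
The continuation idea can be sketched like this (toy Java, not the SDK's
Trigger classes): an upstream count trigger continues downstream as a
fire-on-any-element trigger:

```java
// Toy model of a continuation trigger: once an upstream trigger has
// gated the flow, its downstream continuation fires as soon as any
// element at all has arrived.
class ElementCountAtLeastModel {
    final int count;

    ElementCountAtLeastModel(int count) { this.count = count; }

    // Downstream continuation of AfterPane.elementCountAtLeast(n)
    // is elementCountAtLeast(1).
    ElementCountAtLeastModel continuation() {
        return new ElementCountAtLeastModel(1);
    }

    boolean shouldFire(int bufferedElements) {
        return bufferedElements >= count;
    }
}
```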


> 4. About the StreamingSideInputDoFnRunner.java
> > <
> https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java>
> posted
> > by Kenneth. It uses StateInternals to buffer the elements of the main
> input
> > in a BagState.  I was under the assumption that state is always scoped
> to a
> > key but side inputs can also be used on non-keyed streams. In this case,
> > the state is scoped to the key group (the unit/granularity that is used
> to
> > rebalance in case of rescaling) and when we access the state we get all
> > elements for the key groups for which our parallel worker instance is
> > responsible. (This is the part where I am completely unsure about what is
> > happening... :-O)
>

In this case, the key that is scoping the state is just the sharding key
for the computation, just like you say. It may not be the actual key of a
KV element, and the PCollection may not even be keyed, but the computation
always has such a sharding.

I believe replacing the runner-specific pieces of that code would get you most
of the way to what you want. I think those pieces are runner-specific only
because there was no need to make them more general, so they will be easy to
change. The upcoming in-process runner is developing some abstractions in this
area. But I can easily believe there are other ways of designing this: in
batch mode, for example, we simply schedule the side input first, and even in
streaming mode the runner backend might be able to take on more
responsibility. So that code is just one point in the design space.
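
A toy sketch of the buffering idea from StreamingSideInputDoFnRunner
(illustrative Java, not the actual StateInternals API): main-input elements
that arrive before the side input is ready are stashed in a bag scoped to the
computation's sharding key, then drained once the side input becomes ready:

```java
import java.util.*;

// Toy model: buffered main-input elements, keyed by the computation's
// sharding key. That key need not be a semantic KV key; it is just
// however the runner shards the computation.
class ShardedElementBuffer<T> {
    private final Map<Integer, List<T>> bags = new HashMap<>();

    // Side input not ready yet: stash the element.
    void buffer(int shardKey, T element) {
        bags.computeIfAbsent(shardKey, k -> new ArrayList<>()).add(element);
    }

    // Side input ready: drain and process everything buffered for the
    // shards this worker is responsible for.
    List<T> drain(int shardKey) {
        List<T> bag = bags.remove(shardKey);
        return bag == null ? Collections.emptyList() : bag;
    }
}
```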

Kenn
