Ok, I'll try to start such a design. Before I do, I have a few questions
about how side inputs actually work. Some of this is in the docs, but some
is conjecture on my part about how MillWheel/Windmill works. (I don't know:
is the first the system and the second a worker within it?)

I'll write the questions as a set of assumptions; please correct me where
I'm wrong:

1. Side inputs are always global/broadcast; they are never scoped to a key.

2. The mapping of a main-input window to a side-input window is done by the
side-input WindowFn. If the side input has a larger window than the main
input, processing of main-input elements will block until the side input
becomes available. (A side input with a larger window can become available
early if it uses a speculative trigger.)

2.5 If we update the side input because a speculative trigger fires again,
we don't reprocess the main-input elements that were already processed.
Only new elements see the updated side input.

3. The decision about whether a side input is "available", i.e. complete in
the case of list/map side inputs, is made by a Trigger. (This is also true
for updates to a side input caused by a speculative trigger firing again.)
This uses the continuation-trigger feature, which is easy for time triggers
and more interesting for AfterPane.elementCountAtLeast(), which becomes
AfterPane.elementCountAtLeast(1) on continuation, and for other
speculative/late triggers.

4. About the StreamingSideInputDoFnRunner.java
<https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java>
posted by Kenneth: it uses StateInternals to buffer the elements of the main
input in a BagState. I was under the assumption that state is always scoped
to a key, but side inputs can also be used on non-keyed streams. In this
case, the state is scoped to the key group (the unit of granularity used for
rebalancing when rescaling), and when we access the state we get all
elements for the key groups that our parallel worker instance is
responsible for. (This is the part where I am completely unsure about what
is happening... :-O)
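For assumption 2, here is a rough sketch of how I picture the window mapping for fixed windows. It is my own simplification, not Beam's code: `Window` and `FixedSideInputMapping` are made-up names. I am assuming that a partitioning WindowFn maps a main-input window to the side-input window containing the main window's maximum timestamp; please correct me if WindowFn#getSideInputWindow does something different.

```java
import java.util.Objects;

// Hypothetical half-open window [start, end) in milliseconds (not Beam's IntervalWindow).
final class Window {
    final long start;
    final long end;

    Window(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // Largest timestamp that still falls inside the window.
    long maxTimestamp() {
        return end - 1;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Window)) return false;
        Window w = (Window) o;
        return start == w.start && end == w.end;
    }

    @Override
    public int hashCode() {
        return Objects.hash(start, end);
    }

    @Override
    public String toString() {
        return "[" + start + ", " + end + ")";
    }
}

// Hypothetical side-input WindowFn using fixed windows of a given size.
final class FixedSideInputMapping {
    final long sizeMillis;

    FixedSideInputMapping(long sizeMillis) {
        if (sizeMillis <= 0) {
            throw new IllegalArgumentException("window size must be positive");
        }
        this.sizeMillis = sizeMillis;
    }

    // Assign a single timestamp to the fixed window that contains it.
    Window assignWindow(long timestamp) {
        long start = Math.floorDiv(timestamp, sizeMillis) * sizeMillis;
        return new Window(start, start + sizeMillis);
    }

    // Assumption: map a main-input window to the side-input window
    // that contains the main window's max timestamp.
    Window getSideInputWindow(Window mainWindow) {
        return assignWindow(mainWindow.maxTimestamp());
    }
}
```

With hourly side-input windows, every one-minute main-input window in the first hour maps to [0, 3600000), so under assumption 2 all of those main-input elements would wait for that one hourly side input (unless a speculative pane fires first).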
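For assumption 3, this is a toy model of how I understand the continuation rewrite for element-count triggers. The `Trigger` interface and the classes below are hypothetical and much simpler than Beam's Trigger API; they only illustrate the idea. The element count is already enforced upstream, so the side input should become available as soon as anything arrives rather than wait for `count` more elements. A wrapper such as Repeatedly (assumed here) would just rewrite its inner trigger.

```java
// Hypothetical, heavily simplified trigger model; not Beam's Trigger API.
interface Trigger {
    // The trigger a downstream consumer (e.g. a side input) would use
    // after this trigger has already fired upstream.
    Trigger continuation();
}

// Fires once the pane holds at least `count` elements.
final class ElementCountAtLeast implements Trigger {
    final int count;

    ElementCountAtLeast(int count) {
        this.count = count;
    }

    @Override
    public Trigger continuation() {
        // The counting was done upstream, so downstream fires on any element.
        return new ElementCountAtLeast(1);
    }

    @Override
    public String toString() {
        return "elementCountAtLeast(" + count + ")";
    }
}

// Re-arms the wrapped trigger after every firing.
final class Repeatedly implements Trigger {
    final Trigger inner;

    Repeatedly(Trigger inner) {
        this.inner = inner;
    }

    @Override
    public Trigger continuation() {
        // Keep the repetition, but continue the inner trigger.
        return new Repeatedly(inner.continuation());
    }

    @Override
    public String toString() {
        return "repeatedly(" + inner + ")";
    }
}
```

So a side input produced under repeatedly(elementCountAtLeast(100)) would, in this model, be gated downstream by repeatedly(elementCountAtLeast(1)).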
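For point 4, this is roughly how I read the buffering in StreamingSideInputDoFnRunner, and roughly the shape I had in mind for the component in my first mail. Main-input elements whose side-input window isn't ready yet get pushed back into a buffer and are released once the side input becomes available. `PushbackBuffer` and its methods are made-up names. A plain HashMap stands in for BagState, so this sketch deliberately ignores the key/key-group scoping question above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

// Hypothetical pushback buffer for main-input elements, keyed by side-input window.
// A real runner would keep `buffered` in fault-tolerant (keyed) state, not a HashMap.
final class PushbackBuffer<W, T> {
    private final Map<W, List<T>> buffered = new HashMap<>();
    private final Set<W> readySideInputWindows = new HashSet<>();
    private final BiConsumer<W, T> processFn;

    PushbackBuffer(BiConsumer<W, T> processFn) {
        this.processFn = processFn;
    }

    // Process immediately if the side input for this window is ready,
    // otherwise park the element until it is.
    void onMainInput(W sideInputWindow, T element) {
        if (readySideInputWindows.contains(sideInputWindow)) {
            processFn.accept(sideInputWindow, element);
        } else {
            buffered.computeIfAbsent(sideInputWindow, w -> new ArrayList<>()).add(element);
        }
    }

    // Called when a trigger firing makes the side input for `window` available;
    // flushes everything that was waiting for it, in arrival order.
    void onSideInputReady(W window) {
        readySideInputWindows.add(window);
        List<T> pending = buffered.remove(window);
        if (pending != null) {
            for (T element : pending) {
                processFn.accept(window, element);
            }
        }
    }

    // Number of main-input elements still waiting for a side input.
    int bufferedCount() {
        int n = 0;
        for (List<T> elements : buffered.values()) {
            n += elements.size();
        }
        return n;
    }
}
```

Elements arriving after the side input is ready are processed directly, which also matches assumption 2.5: a later speculative update would only affect elements processed after it.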

These are the ones I can come up with for now. :-)

On Wed, 20 Apr 2016 at 23:25 Davor Bonaci <da...@google.com.invalid> wrote:

> If we come up with a general approach in the context of the Flink runner,
> perhaps that piece can go back to the "runner-core" component and be
> adopted more widely.
>
> On Wed, Apr 20, 2016 at 8:13 AM, Kenneth Knowles <k...@google.com.invalid>
> wrote:
>
> > Hi Aljoscha,
> >
> > Great idea.
> >
> >  - The logic for matching up the windows is WindowFn#getSideInputWindow [1]
> >  - The SDK used to have something along the lines of what you describe [2]
> > but we thought it was too runner-specific, directly referencing Dataflow
> > details, and with a particular model of buffering + timer. Perhaps it is a
> > starting place for your design?
> >
> > Kenn
> >
> > [1]
> > https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java#L131
> >
> > [2]
> > https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java
> >
> > On Wed, Apr 20, 2016 at 4:25 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
> > wrote:
> >
> > > Hi Aljoscha
> > >
> > > AFAIR, the Runner API Proposal document (from Kenneth) contains some
> > > points about side input.
> > >
> > > https://drive.google.com/folderview?id=0B-IhJZh9Ab52OFBVZHpsNjc4eXc&usp=sharing
> > >
> > > I don't think it goes into the details of side inputs and windows, but it
> > > is definitely the document we should extend.
> > >
> > > Regards
> > > JB
> > >
> > >
> > >
> > > On 04/20/2016 11:55 AM, Aljoscha Krettek wrote:
> > >
> > >> Hi,
> > >> for https://issues.apache.org/jira/browse/BEAM-102 we will need to have
> > >> some functionality that deals with side inputs and windows (of both the
> > >> main input and the side inputs) and how they get matched and how we wait
> > >> for windows (blocking). I imagine that we could add some component that
> > >> is similar to ReduceFnRunner but for side inputs: We would just
> > >> instantiate it with a factory for state storage, then push elements into
> > >> it while processing and it would provide a way to get a SideInputReader.
> > >>
> > >> I think this would not be specific to the Flink runner because other
> > >> runner implementors will face similar problems. Are there any
> > >> ideas/design docs about such a thing already? If not, we should probably
> > >> start designing.
> > >>
> > >> What do you think?
> > >>
> > >> Cheers,
> > >> Aljoscha
> > >>
> > >>
> > > --
> > > Jean-Baptiste Onofré
> > > jbono...@apache.org
> > > http://blog.nanthrax.net
> > > Talend - http://www.talend.com
> > >
> >
>
