Bump.

I'm afraid this might have gotten lost during the conferences/summits.

On Thu, 21 Apr 2016 at 13:30 Aljoscha Krettek <aljos...@apache.org> wrote:

> OK, I'll try to start such a design. Before I do, I have a few questions
> about how side inputs actually work. Some of it is in the docs, but some is
> conjecture on my part about how MillWheel/Windmill works. (I'm not sure: is
> the first the system and the second a worker within it?)
>
> I'll phrase the questions as a set of assumptions. Please correct me where
> I'm wrong:
>
> 1. Side inputs are always global/broadcast; they are never scoped to a key.
>
> 2. The mapping of a main-input window to a side-input window is done by the
> side-input WindowFn. If the side input has a larger window than the main
> input, processing of main-input elements blocks until the side input
> becomes available. (Side inputs for a larger side-input window can become
> available early if a speculative trigger is used.)
>
> 2.5. If we update the side input because a speculative trigger fires again,
> we don't reprocess the main-input elements that were already processed.
> Only new elements see the updated side input.
>
> 3. The decision about whether a side input is "available", i.e. complete
> in the case of list/map side inputs, is made by a Trigger. (This also holds
> for updates to a side input caused by a speculative trigger firing again.)
> This uses the continuation-trigger feature, which is easy for time triggers
> and more interesting for the AfterPane.elementCountAtLeast() trigger, which
> changes to AfterPane.elementCountAtLeast(1) on continuation, and for other
> speculative/late triggers.
>
> 4. About the StreamingSideInputDoFnRunner.java
> <https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java>
> posted by Kenneth: it uses StateInternals to buffer the main-input elements
> in a BagState. I was under the impression that state is always scoped to a
> key, but side inputs can also be used on non-keyed streams. In this case, I
> assume the state is scoped to the key group (the unit of granularity used
> for rebalancing when rescaling), and when we access the state we get all
> elements of the key groups that our parallel worker instance is
> responsible for. (This is the part where I am completely unsure about what
> is happening... :-O)
>
> These are the ones I can come up with for now. :-)
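Assumption 3 hinges on continuation triggers. Below is a hypothetical mini-model (not Beam's Trigger API) of how a trigger could be rewritten for use downstream, e.g. for the side-input view. elementCountAtLeast(n) continues as elementCountAtLeast(1) because, as I understand the rationale, each element arriving downstream already represents a complete upstream pane. The end-of-window case continuing as itself is an assumption about how a time trigger carries over.

```java
/**
 * Hypothetical mini-model of continuation triggers (not Beam's Trigger API).
 * The continuation is the trigger to use downstream of a grouping, e.g. when
 * materializing a side-input view.
 */
public final class ContinuationTriggers {

  public interface Trigger {
    Trigger continuation();
  }

  /** Fires once the pane has at least {@code count} elements. */
  public record ElementCountAtLeast(int count) implements Trigger {
    // Downstream, each arriving element already stands for a complete upstream
    // pane, so a single element is enough to fire.
    @Override
    public Trigger continuation() {
      return new ElementCountAtLeast(1);
    }
  }

  /** Fires when the watermark passes the end of the window (assumed to continue as itself). */
  public record AfterEndOfWindow() implements Trigger {
    @Override
    public Trigger continuation() {
      return this;
    }
  }

  /** Fires the inner trigger repeatedly; continues with the inner trigger's continuation. */
  public record Repeatedly(Trigger inner) implements Trigger {
    @Override
    public Trigger continuation() {
      return new Repeatedly(inner.continuation());
    }
  }

  private ContinuationTriggers() {}
}
```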
>
> On Wed, 20 Apr 2016 at 23:25 Davor Bonaci <da...@google.com.invalid>
> wrote:
>
>> If we come up with a general approach in the context of the Flink runner,
>> perhaps that piece can go back to the "runner-core" component and be
>> adopted more widely.
>>
>> On Wed, Apr 20, 2016 at 8:13 AM, Kenneth Knowles <k...@google.com.invalid>
>> wrote:
>>
>> > Hi Aljoscha,
>> >
>> > Great idea.
>> >
>> >  - The logic for matching up the windows is WindowFn#getSideInputWindow [1].
>> >  - The SDK used to have something along the lines of what you describe [2],
>> >    but we thought it was too runner-specific: it directly referenced
>> >    Dataflow details and assumed a particular model of buffering + timers.
>> >    Perhaps it is a starting place for your design?
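A simplified, dependency-free stand-in for WindowFn#getSideInputWindow. It assumes both inputs use epoch-aligned fixed windows and that the side-input window is the one containing the main-input window's last timestamp. I believe the partitioning WindowFns map windows this way, but treat that as an assumption; the classes here are hypothetical, not Beam's.

```java
/**
 * Hypothetical, dependency-free stand-in for WindowFn#getSideInputWindow.
 * Windows are half-open intervals [start, end) in epoch milliseconds.
 */
public final class SideInputWindowMapping {

  public record Window(long start, long end) {
    /** Last timestamp that still belongs to this window. */
    public long maxTimestamp() {
      return end - 1;
    }
  }

  /** Epoch-aligned fixed (tumbling) windows of {@code sizeMillis}. */
  public record FixedWindowFn(long sizeMillis) {
    public Window assignWindow(long timestamp) {
      long start = Math.floorDiv(timestamp, sizeMillis) * sizeMillis;
      return new Window(start, start + sizeMillis);
    }

    /** Side-input window for a main-input window: the one containing its last timestamp. */
    public Window getSideInputWindow(Window mainInputWindow) {
      return assignWindow(mainInputWindow.maxTimestamp());
    }
  }

  private SideInputWindowMapping() {}
}
```

With one-minute main-input windows and one-hour side-input windows, the main window [120000, 180000) maps to the side-input window [0, 3600000), so its elements wait until that hour's side input is available (assumption 2).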
>> >
>> > Kenn
>> >
>> > [1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java#L131
>> > [2] https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java
>> >
>> > On Wed, Apr 20, 2016 at 4:25 AM, Jean-Baptiste Onofré <j...@nanthrax.net>
>> > wrote:
>> >
>> > > Hi Aljoscha
>> > >
>> > > AFAIR, the Runner API Proposal document (from Kenneth) contains some
>> > > points about side inputs.
>> > >
>> > > https://drive.google.com/folderview?id=0B-IhJZh9Ab52OFBVZHpsNjc4eXc&usp=sharing
>> > >
>> > > I don't think it goes into the details of side inputs and windows, but it
>> > > is definitely the document we should extend.
>> > >
>> > > Regards
>> > > JB
>> > >
>> > >
>> > >
>> > > On 04/20/2016 11:55 AM, Aljoscha Krettek wrote:
>> > >
>> > >> Hi,
>> > >> for https://issues.apache.org/jira/browse/BEAM-102 we will need some
>> > >> functionality that deals with side inputs and windows (of both the main
>> > >> input and the side inputs): how they get matched and how we wait for
>> > >> windows (blocking). I imagine that we could add a component that is
>> > >> similar to ReduceFnRunner but for side inputs: we would just instantiate
>> > >> it with a factory for state storage, then push elements into it while
>> > >> processing, and it would provide a way to get a SideInputReader.
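A minimal sketch of such a component, assuming plain HashMaps in place of the state the factory would provide (for example one BagState per window, as in StreamingSideInputDoFnRunner). The class and method names are hypothetical. Elements whose side-input window is not ready are pushed back; when a trigger firing delivers the side input, the buffered elements are released, and later updates only affect new elements, matching assumption 2.5.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

/**
 * Hypothetical side-input buffer in the spirit of ReduceFnRunner (not a Beam class).
 * W = side-input window, T = main-input element, S = side-input value, R = output.
 * HashMaps stand in for runner-provided state such as a BagState per window.
 */
public final class SideInputBufferSketch<W, T, S, R> {
  private final Map<W, S> sideInputs = new HashMap<>();       // latest side-input value per window
  private final Map<W, List<T>> pushedBack = new HashMap<>(); // main-input elements waiting for a window
  private final List<R> output = new ArrayList<>();
  private final BiFunction<T, S, R> fn;                       // user logic: (element, side input) -> result

  public SideInputBufferSketch(BiFunction<T, S, R> fn) {
    this.fn = fn;
  }

  public boolean isReady(W window) {
    return sideInputs.containsKey(window);
  }

  /** Processes the element now if its side-input window is ready, otherwise buffers it. */
  public void processElement(W window, T element) {
    if (isReady(window)) {
      output.add(fn.apply(element, sideInputs.get(window)));
    } else {
      pushedBack.computeIfAbsent(window, w -> new ArrayList<>()).add(element);
    }
  }

  /** Called when a side-input trigger fires for {@code window}; releases buffered elements. */
  public void updateSideInput(W window, S value) {
    sideInputs.put(window, value);
    // Elements that were already processed are not re-run with the new value.
    List<T> buffered = pushedBack.remove(window);
    if (buffered != null) {
      for (T element : buffered) {
        output.add(fn.apply(element, value));
      }
    }
  }

  public List<R> output() {
    return output;
  }
}
```

In this sketch a SideInputReader would just be a read-only view over the side-input map.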
>> > >>
>> > >> I think this would not be specific to the Flink runner, because other
>> > >> runner implementors will face similar problems. Are there any
>> > >> ideas/design docs about such a thing already? If not, we should
>> > >> probably start designing.
>> > >>
>> > >> What do you think?
>> > >>
>> > >> Cheers,
>> > >> Aljoscha
>> > >>
>> > >>
>> > > --
>> > > Jean-Baptiste Onofré
>> > > jbono...@apache.org
>> > > http://blog.nanthrax.net
>> > > Talend - http://www.talend.com
>> > >
>> >
>>
>
