Bump. I'm afraid this might have gotten lost during the conferences/summits.
On Thu, 21 Apr 2016 at 13:30 Aljoscha Krettek <aljos...@apache.org> wrote:

> Ok, I'll try and start such a design. Before I can start, I have a few questions about how the side inputs actually work. Some of it is in the docs, but some is also conjecture on my part about how MillWheel/Windmill works (I don't know; is the first the system and the second a worker in there?).
>
> I'll write the questions as a set of assumptions; please correct me on those where I'm wrong:
>
> 1. Side inputs are always global/broadcast; they are never scoped to a key.
>
> 2. Mapping of main-input window to side-input window is done by the side-input WindowFn. If the side input has a larger window than the main input, processing of main-input elements will block until the side input becomes available. (Side inputs for a larger side-input window can become available early if a speculative trigger is used.)
>
> 2.5 If we update the side input because a speculative trigger fires again, we don't reprocess the main-input elements that were already processed. Only new elements see the updated side input.
>
> 3. The decision about whether side inputs are "available", i.e. complete in the case of list/map side inputs, is made by a Trigger. (This is also true for updates to the side input caused by speculative triggers firing again.) This uses the continuation trigger feature, which is easy for time triggers and interesting for the AfterPane.elementCountAtLeast() trigger, which changes to AfterPane.elementCountAtLeast(1) on continuation, and for other speculative/late triggers.
>
> 4. About the StreamingSideInputDoFnRunner.java <https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java> posted by Kenneth: it uses StateInternals to buffer the elements of the main input in a BagState.
> I was under the assumption that state is always scoped to a key, but side inputs can also be used on non-keyed streams. In this case, the state is scoped to the key group (the unit/granularity that is used to rebalance in case of rescaling), and when we access the state we get all elements for the key groups for which our parallel worker instance is responsible. (This is the part where I am completely unsure about what is happening... :-O)
>
> These are the ones I can come up with for now. :-)
>
> On Wed, 20 Apr 2016 at 23:25 Davor Bonaci <da...@google.com.invalid> wrote:
>
>> If we come up with a general approach in the context of the Flink runner, perhaps that piece can go back to the "runner-core" component and be adopted more widely.
>>
>> On Wed, Apr 20, 2016 at 8:13 AM, Kenneth Knowles <k...@google.com.invalid> wrote:
>>
>> > Hi Aljoscha,
>> >
>> > Great idea.
>> >
>> > - The logic for matching up the windows is WindowFn#getSideInputWindow [1]
>> > - The SDK used to have something along the lines of what you describe [2], but we thought it was too runner-specific, directly referencing Dataflow details, and with a particular model of buffering + timer. Perhaps it is a starting place for your design?
>> >
>> > Kenn
>> >
>> > [1] https://github.com/apache/incubator-beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java#L131
>> >
>> > [2] https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/1e5524a7f5d0d774488cb0206ea6433085461775/sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/StreamingSideInputDoFnRunner.java
>> >
>> > On Wed, Apr 20, 2016 at 4:25 AM, Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>> >
>> > > Hi Aljoscha,
>> > >
>> > > AFAIR, the Runner API Proposal document (from Kenneth) contains some points about side inputs.
>> > >
>> > > https://drive.google.com/folderview?id=0B-IhJZh9Ab52OFBVZHpsNjc4eXc&usp=sharing
>> > >
>> > > I don't think it goes into the details of side inputs and windows, but it is definitely the document we should extend.
>> > >
>> > > Regards
>> > > JB
>> > >
>> > > On 04/20/2016 11:55 AM, Aljoscha Krettek wrote:
>> > >
>> > >> Hi,
>> > >> for https://issues.apache.org/jira/browse/BEAM-102 we will need to have some functionality that deals with side inputs and windows (of both the main input and the side inputs), how they get matched, and how we wait for windows (blocking). I imagine that we could add some component that is similar to ReduceFnRunner but for side inputs: we would just instantiate it with a factory for state storage, then push elements into it while processing, and it would provide a way to get a SideInputReader.
>> > >>
>> > >> I think this would not be specific to the Flink runner because other runner implementors will face similar problems. Are there any ideas/design docs about such a thing already? If not, we should probably start designing.
>> > >>
>> > >> What do you think?
>> > >>
>> > >> Cheers,
>> > >> Aljoscha
>> > >
>> > > --
>> > > Jean-Baptiste Onofré
>> > > jbono...@apache.org
>> > > http://blog.nanthrax.net
>> > > Talend - http://www.talend.com
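To make assumptions 2, 2.5 and 4 concrete, here is a minimal sketch of the kind of component proposed for BEAM-102: something that main-input elements are pushed into and that holds them back until the side input for their mapped window is ready. It is an illustration under stated assumptions, not how Dataflow, Flink, or Beam actually implement this. Everything in it is hypothetical: `SideInputBuffer`, `processElement`, `sideInputReady`, and `bufferedCount` are made-up names; windows are identified only by their start timestamp in milliseconds; a caller-supplied function stands in for WindowFn#getSideInputWindow; and an in-memory map stands in for the BagState used by StreamingSideInputDoFnRunner, with no key or key-group scoping at all.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.BiConsumer;
import java.util.function.LongUnaryOperator;

/**
 * Hypothetical sketch, not a Beam or Flink API: buffers main-input elements
 * until the side input for their mapped side-input window is ready.
 * Windows are identified by their start timestamp (millis) to keep it small.
 */
final class SideInputBuffer<T, S> {
  // Maps a main-input window start to a side-input window start; a stand-in
  // (assumption) for the side-input WindowFn's getSideInputWindow.
  private final LongUnaryOperator sideWindowFor;
  // Invoked with (element, side-input value) once an element can be processed.
  private final BiConsumer<T, S> process;
  // Latest side-input value per side-input window; presence means "ready".
  private final Map<Long, S> sideValues = new HashMap<>();
  // Main-input elements held back per side-input window (the BagState role).
  private final Map<Long, List<T>> pushedBack = new HashMap<>();

  SideInputBuffer(LongUnaryOperator sideWindowFor, BiConsumer<T, S> process) {
    this.sideWindowFor = sideWindowFor;
    this.process = process;
  }

  /** Processes the element now if its side input is ready, otherwise buffers it. */
  void processElement(long mainWindowStart, T element) {
    long sideWindow = sideWindowFor.applyAsLong(mainWindowStart);
    S value = sideValues.get(sideWindow);
    if (value != null) {
      process.accept(element, value);
    } else {
      pushedBack.computeIfAbsent(sideWindow, w -> new ArrayList<>()).add(element);
    }
  }

  /**
   * Called when a trigger fires for the side input. A later firing replaces the
   * value, but elements already processed are not reprocessed (assumption 2.5).
   */
  void sideInputReady(long sideWindowStart, S value) {
    sideValues.put(sideWindowStart, Objects.requireNonNull(value));
    List<T> buffered = pushedBack.remove(sideWindowStart);
    if (buffered != null) {
      for (T element : buffered) {
        process.accept(element, value);
      }
    }
  }

  /** Number of main-input elements still waiting on a side input. */
  int bufferedCount() {
    int n = 0;
    for (List<T> elements : pushedBack.values()) {
      n += elements.size();
    }
    return n;
  }
}
```

For example, with one-minute main windows mapped to the ten-minute side window that contains their end (`start -> Math.floorDiv(start + 60_000 - 1, 600_000) * 600_000`), main-input elements from any window in the first ten minutes wait until `sideInputReady(0, ...)` is called. A second firing for the same side window only affects elements that arrive afterwards, which mirrors assumption 2.5; whether buffered elements should instead be held in keyed or key-group-scoped state (assumption 4) is exactly the open question in the thread and is not addressed by this sketch.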