Ah, this piece of code (the DataflowPipelineRunner) explains why I was talking about the continuation trigger in our earlier discussions on side inputs. If you have code like this:

PCollection<...> input = ...;
PCollectionView<...> view = input
    .apply(Window.into(...))
    .apply(Combine.globally(...).asSingletonView());

PCollection<...> input2 = ...;
input2.apply(ParDo.withSideInputs(view).of(...));

then the continuation trigger of the Trigger specified in Window.into() will decide when the side input is ready, by (implicitly) controlling when the "Combine.globally(Concatenate)" fires. I wasn't aware of this inserted Combine.globally(Concatenate), so I thought this logic was handled directly by the SideInputDoFnRunner. I think, however, that the behavior would be the same if the SideInputDoFnRunner held the Trigger and performed the Concatenate internally.

On Fri, 13 May 2016 at 18:24 Kenneth Knowles wrote:

> I think it may help to unpack the override & expansion of View.asXYZ() in the DataflowPipelineRunner [1] and the InProcessPipelineRunner [2].
>
> Each of these does:
>
> 1. some preparation, perhaps
> 2. concatenate the side input PColl into a single iterable (there's a GBK here; triggering)
> 3. materialize that iterable in a runner-specific way (for you, perhaps this is a write to a BroadcastVariable)
>
> So most of what has been said on this thread is right, but it helps to distinguish the elements of the incoming PCollection being viewed from the elements written to the side input in step (3), because of the triggering in step (2). This is why, in the global window, you don't see the side input window become ready when the first element arrives, but only when the whole window triggers.
>
> This expansion is not something we envision being owned by the SDK in the longer term, but it is probably the best way to go right now.
>
> [1] https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java#L2732
> [2] https://github.com/apache/incubator-beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java#L103
>
> Kenn
>
> On Fri, May 13, 2016 at 8:48 AM, Lukasz Cwik wrote:
>
> > A side input window becomes "ready" as soon as a trigger has fired, producing data within the PCollection that the side input view is over.
> >
> > For example, if the side input is in the global window with an after-watermark trigger, it will fire once, when all the data along the side input path has been processed, since the watermark will go from negative infinity to positive infinity. This is the canonical way to load a static dataset for use as a side input in streaming. Generally, the main input will need to block until at least one pane has been output into the side input PCollection.
> >
> > On Fri, May 13, 2016 at 7:01 AM, Aljoscha Krettek wrote:
> >
> > > Hi,
> > > in streaming, the side input for a window is considered ready as soon as at least one element is ready; this is the same for all kinds of side inputs, i.e. List, Map, and Singleton. This means that successive main-input elements can see a different side input List if more side-input elements keep arriving. Side input is also never scoped to a key but is always global (broadcast): if you have a Map, you get the whole Map<K, V> from your c.sideInput() call.
> > >
> > > At least that's what I gathered from discussions on the ML with Kenneth. And that's why Stephan and I were wondering about the "correctness guarantees" this gives and whether it is enough for the most common use cases.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Fri, 13 May 2016 at 14:44 Maximilian Michels wrote:
> > >
> > > > Hi Stephan,
> > > >
> > > > As far as I understand side inputs, by definition they always need to be "ready" before processing of any kind can start. What is considered ready depends on the type of side input. If you use View.asList() or View.asSingleton(), then the whole side input needs to be materialized. On the other hand, if you use View.asIterable(), processing can start once the first element arrives.
> > > >
> > > > If the side input itself is windowed, then the notion of "ready" only applies to the individual windows. Side input itself can also be scoped by key if you use the View.asMap() or View.asMultimap() side input views.
> > > >
> > > > From a quick look at the InProcessRunner, it appears that processing does not start until the side input of the window is ready. Beam experts, please correct me if I got this wrong.
> > > >
> > > > Cheers,
> > > > Max
> > > >
> > > > On Fri, May 13, 2016 at 1:12 PM, Stephan Ewen wrote:
> > > >
> > > > > Hi!
> > > > >
> > > > > Aljoscha and I have been going through the side inputs quite a bit, and we were wondering about the following:
> > > > >
> > > > > How does one properly join a static data set with a stream?
> > > > >
> > > > > This sounds like a job for a side input, but it would require that the side input materialize the initial static data before the main input can begin processing.
> > > > >
> > > > > Given that the static data set is in a global window, and the Beam side inputs only wait for the first element in the window to be available, the main input would start joining against the side input prematurely.
> > > > >
> > > > > Is this simply considered an uncommon use case, or is there a way to realize this that we overlooked?
> > > > >
> > > > > Greetings,
> > > > > Stephan
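A minimal plain-Java sketch of the expansion discussed in this thread may make the readiness rule concrete. It is a hypothetical model, not Beam or runner code: the class SideInputModel and the trigger constants AFTER_WATERMARK and EVERY_ELEMENT are illustrative names, and the model covers only the global window, ignoring keys, late data, and other windowing. Elements on the side-input path are buffered (the concatenating GBK of step 2), and a snapshot is materialized as a pane (step 3) only when the trigger fires; until then there is nothing for the main input to read.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;

// Hypothetical model of a View expansion in the global window; not Beam API.
final class SideInputModel<T> {
    // Trigger condition: (number of buffered elements, has the watermark reached +infinity?).
    // AFTER_WATERMARK loosely models an after-watermark trigger in the global window.
    static final BiPredicate<Integer, Boolean> AFTER_WATERMARK = (count, atEnd) -> atEnd;
    // EVERY_ELEMENT fires a new pane as soon as anything is buffered.
    static final BiPredicate<Integer, Boolean> EVERY_ELEMENT = (count, atEnd) -> count > 0;

    private final BiPredicate<Integer, Boolean> trigger;
    private final List<T> buffer = new ArrayList<>(); // step 2: concatenation buffer
    private List<T> materialized;                      // step 3: null until the first pane
    private boolean watermarkAtEnd;

    SideInputModel(BiPredicate<Integer, Boolean> trigger) {
        this.trigger = trigger;
    }

    // An element arrives on the side-input path; it is buffered, not yet visible.
    void addElement(T element) {
        buffer.add(element);
        maybeFire();
    }

    // All side-input data has been processed, so the watermark jumps to +infinity.
    void advanceWatermarkToInfinity() {
        watermarkAtEnd = true;
        maybeFire();
    }

    // When the trigger fires, materialize an accumulating pane: an immutable
    // snapshot of everything buffered so far.
    private void maybeFire() {
        if (trigger.test(buffer.size(), watermarkAtEnd)) {
            materialized = Collections.unmodifiableList(new ArrayList<>(buffer));
        }
    }

    // What a main-input element would see; empty means the side input is not ready.
    Optional<List<T>> read() {
        return Optional.ofNullable(materialized);
    }
}
```

With AFTER_WATERMARK, read() stays empty until the watermark reaches positive infinity and then returns the whole static dataset, which matches the pattern Lukasz describes. With EVERY_ELEMENT, each arriving element fires a new accumulating pane, so successive main-input reads can observe a growing list, as Aljoscha describes.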

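Lukasz notes that the main input generally has to block until at least one pane has been output into the side input PCollection, and Max observes the same from the InProcessRunner. One way to picture that wait, as a sketch only, uses java.util.concurrent primitives. BlockingSideInput, publishPane, and awaitSideInput are hypothetical names, and a real runner may instead buffer main-input elements rather than park a thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder for the latest side-input pane; not a Beam or runner class.
final class BlockingSideInput<T> {
    // Latest materialized pane; null until the first trigger firing.
    private final AtomicReference<List<T>> latestPane = new AtomicReference<>();
    // Released exactly once, when the first pane is published.
    private final CountDownLatch firstPane = new CountDownLatch(1);

    // Called on the side-input path each time its trigger fires.
    void publishPane(List<T> pane) {
        latestPane.set(Collections.unmodifiableList(new ArrayList<>(pane)));
        firstPane.countDown(); // no-op after the first call
    }

    // Called for a main-input element: waits up to the timeout for the first
    // pane, then returns the latest one. Empty means "still not ready".
    Optional<List<T>> awaitSideInput(long timeout, TimeUnit unit) {
        try {
            if (firstPane.await(timeout, unit)) {
                return Optional.of(latestPane.get());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return Optional.empty();
    }
}
```

A CountDownLatch fits because readiness is a one-way transition in this model: once the first pane exists, the side input stays ready, and later firings only replace the snapshot held in the AtomicReference.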