Ah, this piece of Code (the DataflowPipelineRunner) explains why I was
talking about the continuation trigger in our earlier discussions on side
inputs. If you have code like this:

PCollection<> input = ...
PCollectionView<> view = input
  .apply(Window.into(...))
  .apply(Combine.Globally(...))
  .asSingletonView()

PCollection<> input2 = ...

input2.
  .apply(ParDo.withSideInputs(view).of(...))

the continuation trigger of the Trigger specified in the Window.into() will
decide when the side input is ready by controlling (implicitly) when the
"Combine.globally(Concatenate)" fires.

I wasn't aware of this inserted Combine.globally(Concatenate) so I thought
this logic was handled by the SideInputDoFnRunner directly. I think,
however, that the behavior would be the same if the SideInputDoFnRunner had
the Trigger and would perform the Concatenate internally.

On Fri, 13 May 2016 at 18:24 Kenneth Knowles <[email protected]> wrote:

> I think it may help to unpack the override & expansion of View.asXYZ() in
> the DataflowPipelineRunner [1] and the InProcessPipelineRunner [2]
>
> Each of these does:
>
> 1. some preparation, perhaps
> 2. concatenate the side input PColl into a single iterable (there's a GBK
> here; triggering)
> 3. materialize that iterable in a runner-specific way (for you, perhaps
> this is a write to a BroadcastVariable)
>
> So most everything said on this thread is right, but it is helpful to
> distinguish elements of the incoming PCollection being viewed from elements
> written to the side input in step (3) because of triggering in step (2).
> This is why in the global window you don't see the side input window ready
> when the first element arrives, but when the whole window triggers.
>
> This expansion is not something we envision being owned by the SDK, longer
> term, but is probably the best way to go right now.
>
> [1]
>
> https://github.com/apache/incubator-beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowPipelineRunner.java#L2732
> [2]
>
> https://github.com/apache/incubator-beam/blob/master/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ViewEvaluatorFactory.java#L103
>
> Kenn
>
> On Fri, May 13, 2016 at 8:48 AM, Lukasz Cwik <[email protected]>
> wrote:
>
> > A side input window becomes "ready" as soon as a trigger has fired
> > producing data within the PCollection which the side input view would be
> > over.
> >
> > For example, if the side input window is in the global window with an
> after
> > watermark trigger, it will fire once when all the data has been processed
> > along the side input path since the watermark will go from negative
> > infinity to positive infinity. This is the canonical way of how to load a
> > static dataset to use as a side input for streaming. Generally, the main
> > input will need to block till at least one pane has been output into the
> > side input PCollection.
> >
> >
> > On Fri, May 13, 2016 at 7:01 AM, Aljoscha Krettek <[email protected]>
> > wrote:
> >
> > > Hi,
> > > in streaming, side input for a window is considered ready as soon as at
> > > least one element is ready, this is the same for all kinds of side
> > inputs,
> > > i.e. List, Map, Singleton. This means that successive main-input
> elements
> > > can see a different side input List if more side-input elements keep
> > > arriving. Side input is also never scoped to a key, but always global
> > > (broadcast), that is if you have a Map you get the whole Map<K, V> from
> > > your c.sideInput() call.
> > >
> > > At least that's what I gathered from discussions on the ML with
> Kenneth.
> > > And that's why Stephan and I where wondering about the "correctness
> > > guarantees" that this gives and whether this is enough for most common
> > use
> > > cases.
> > >
> > > Cheers,
> > > Aljoscha
> > >
> > > On Fri, 13 May 2016 at 14:44 Maximilian Michels <[email protected]>
> wrote:
> > >
> > > > Hi Stephan,
> > > >
> > > > As far as I understand side inputs, by definition, always need to be
> > > > "ready" before processing of any kind can start. What is considered
> > > > ready depends on the type of side input. If you use View.asList() or
> > > > View.asSingleton() then the whole side input needs to be
> materialized.
> > > > On the other hand, if you use View.asIterable(), processing can start
> > > > once the the first element arrives.
> > > >
> > > > If the side input itself is windowed, then the notion of "ready" only
> > > > applies to the individual windows. Side Input itself can also be
> > > > scoped by key if you use the View.asMap() or View.asMultimap() side
> > > > inputs views.
> > > >
> > > > From a quick look at the InProcessRunner it appears that processing
> > > > does not start until the side input of the window is ready. Beam
> > > > experts, please correct me if I got this wrong.
> > > >
> > > > Cheers,
> > > > Max
> > > >
> > > > On Fri, May 13, 2016 at 1:12 PM, Stephan Ewen <[email protected]>
> > wrote:
> > > > > Hi!
> > > > >
> > > > > Aljoscha and me have been going through the side inputs quite a
> bit,
> > > and
> > > > we
> > > > > were wondering about the following:
> > > > >
> > > > > How does one properly join a static data set with a stream?.
> > > > >
> > > > > This sounds like a job for a side input, but would require that the
> > > side
> > > > > input materializes the initial static data before the main input
> can
> > > > begin
> > > > > processing.
> > > > >
> > > > > Given that the static data set is in a global window, and the Beam
> > side
> > > > > inputs only wait for the first element in the window to be
> available,
> > > the
> > > > > main input would start joining against the side input prematurely.
> > > > >
> > > > > Is this simply considered an uncommon use case, or is there a way
> to
> > > > > realize this that we overlooked?
> > > > >
> > > > > Greetings,
> > > > > Stephan
> > > >
> > >
> >
>

Reply via email to