Hi Aljoscha,

Thanks for the analysis. I also agree with handling windowing separately,
and I would be glad to contribute too. Is there any issue that has not been
picked up yet? Feel free to count me in. In our own branch, we have
temporarily removed the restriction that a connected stream cannot have one
side keyed and the other unkeyed, in order to support side inputs.
Looking forward to side inputs being available in the API.
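
To illustrate the kind of buffering workaround discussed in this thread,
here is a minimal sketch in plain Java with no Flink dependencies. The names
(BufferingJoiner, onSideInput, onMainInput) are hypothetical and not part of
any Flink API; the sketch only shows the idea of holding back main-input
elements until a side input has arrived and then flushing them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical sketch: buffers main-input elements until a side input has arrived. */
final class BufferingJoiner<M, S, R> {
    private final BiFunction<M, S, R> joinFn;
    private final List<M> buffer = new ArrayList<>();
    private S side;             // latest side-input value
    private boolean sideReady;  // false until the first side-input element arrives

    BufferingJoiner(BiFunction<M, S, R> joinFn) {
        this.joinFn = joinFn;
    }

    /** Handles a side-input element and returns results for all buffered main elements. */
    List<R> onSideInput(S value) {
        side = value;
        sideReady = true;
        List<R> out = new ArrayList<>();
        for (M m : buffer) {
            out.add(joinFn.apply(m, side));
        }
        buffer.clear();
        return out;
    }

    /** Handles a main-input element: joins it if the side input is ready, else buffers it. */
    List<R> onMainInput(M value) {
        List<R> out = new ArrayList<>();
        if (sideReady) {
            out.add(joinFn.apply(value, side));
        } else {
            buffer.add(value);
        }
        return out;
    }

    /** Number of main-input elements currently waiting for the side input. */
    int buffered() {
        return buffer.size();
    }
}
```

In a real keyed operator the buffer would have to live in checkpointed,
key-grouped state, which is exactly the part that needs runtime support.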


Best regards,
Wenlong

On 22 March 2017 at 01:12, Aljoscha Krettek <aljos...@apache.org> wrote:

> Alright! I created an umbrella Jira issue,
> https://issues.apache.org/jira/browse/FLINK-6131, which has three sub-issues:
>  - https://issues.apache.org/jira/browse/FLINK-4940: Add support for
> broadcast state
>  - https://issues.apache.org/jira/browse/FLINK-6135: Allowing adding
> additional inputs to StreamOperator
>  - https://issues.apache.org/jira/browse/FLINK-6141: Add buffering
> service for stream operators
>
> Turns out that the last one is quite tricky to do (a bit more info on the
> issue itself). The first one should be somewhat straightforward and will
> get us a long way towards having some minimal side-input/join jobs. The
> second issue is good to have but you can get around it by using a
> CoOperator and manually multiplexing multiple inputs into one input.
>
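
For anyone following along, the multiplexing workaround mentioned above could
look roughly like the following. This is only a sketch in plain Java with no
Flink dependencies; TaggedElement and Demultiplexer are hypothetical names,
not Flink classes. Each logical input is wrapped with a tag before all of
them are merged into one physical input, and the operator routes on the tag.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical wrapper: an element plus the index of the logical input it came from. */
final class TaggedElement {
    final int inputIndex;
    final Object value;

    TaggedElement(int inputIndex, Object value) {
        this.inputIndex = inputIndex;
        this.value = value;
    }
}

/** Hypothetical router: dispatches tagged elements from one physical input to per-input handlers. */
final class Demultiplexer {
    interface Handler {
        void handle(Object value);
    }

    private final List<Handler> handlers = new ArrayList<>();

    /** Registers a handler for a new logical input and returns that input's index. */
    int register(Handler handler) {
        handlers.add(handler);
        return handlers.size() - 1;
    }

    /** Routes one element to the handler of the logical input it is tagged with. */
    void process(TaggedElement element) {
        if (element.inputIndex < 0 || element.inputIndex >= handlers.size()) {
            throw new IllegalArgumentException("Unknown input index: " + element.inputIndex);
        }
        handlers.get(element.inputIndex).handle(element.value);
    }
}
```

The cost of this approach is that type information per input is lost in the
wrapper, which is one reason a real n-ary operator would be nicer.
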
> As mentioned in the second issue, I already have some proof-of-concept
> code for that so it makes sense for me to work on this. The first issue
> should be ok to work on while the last one, as I said, is probably a bit
> more long term.
>
> Anyone who wants to pick up those issues, please ask me anything, either on
> the issue or here in the thread, so that we can resolve problems quickly.
>
> Best,
> Aljoscha
>
> P.S. I’ll be on vacation starting Thursday for 1.5 weeks so I’ll be a bit
> slow with responses.
>
> > On 17 Mar 2017, at 22:22, Ventura Del Monte <venturadelmo...@gmail.com> wrote:
> >
> > I agree with your analysis. I think we now have almost everything to start,
> > and I would also be interested in helping you.
> > Please feel free to count me in. Besides, I have a few real use cases which
> > require side input and could help in benchmarking the final implementation.
> >
> > Best,
> > Ventura
> >
> >
> >
> >
> > This message, for the D. Lgs n. 196/2003 (Privacy Code), may contain
> > confidential and/or privileged information. If you are not the addressee or
> > authorized to receive this for the addressee, you must not use, copy,
> > disclose or take any action based on this message or any information
> > herein. If you have received this message in error, please advise the
> > sender immediately by reply e-mail and delete this message. Thank you for
> > your cooperation.
> >
> > On Fri, Mar 17, 2017 at 9:28 PM, Gábor Hermann <m...@gaborhermann.com>
> > wrote:
> >
> >> Thanks for demonstrating the windowed side-input case. I completely agree
> >> that handling windowed side-input separately would simply complicate
> >> the implementation. The triggering mechanism for the upstream window could
> >> define when the windowed input is ready.
> >>
> >> I would gladly contribute to a low-level requirement. If there's a
> >> somewhat well defined JIRA issue, I'm happy to start working on it.
> >>
> >> Cheers,
> >> Gabor
> >>
> >>
> >>
> >> On 2017-03-17 16:03, Aljoscha Krettek wrote:
> >>
> >>> Yes, I agree! The implementation details we talked about so far are only
> >>> visible at the operator level. A user function that uses the (future)
> >>> side-input API would not be aware of whether buffering or blocking is
> >>> used. It would simply know that it is invoked and that side input is ready.
> >>>
> >>> I'll also quickly try to elaborate on my comment about why I think
> >>> windowing/triggering in the side-input operator itself is not necessary.
> >>> I created a figure: http://imgur.com/a/aAlw7 It is enough for the
> >>> side-input operator simply to consider side input for a given window as
> >>> ready when we have seen some data for that window. The WindowOperator
> >>> that is upstream of the side input will take care of
> >>> windowing/triggering.
> >>>
> >>> I'll create Jira issues for implementing the low-level requirements for
> >>> side inputs (n-ary operator, broadcast state and buffering) and update
> >>> this thread. If anyone is interested in working on one of those, we might
> >>> have a chance of getting this ready for Flink 1.3. Time is a bit tight
> >>> for me because I'm going to be on vacation for 1.5 weeks starting next
> >>> week Wednesday and after that we have Flink Forward.
> >>>
> >>> Best,
> >>> Aljoscha
> >>>
> >>> On Thu, Mar 16, 2017, at 23:52, Gábor Hermann wrote:
> >>>
> >>>> Regarding the CoFlatMap workaround,
> >>>> - For keyed streams, do you suggest that having a per-key buffer stored
> >>>> as keyed state would have a large memory overhead? That must be true,
> >>>> although a workaround could be partitioning the data and using a
> >>>> non-keyed stream. Of course that seems hacky, as we have a keyed stream
> >>>> abstraction, so I agree with you.
> >>>> - I agree that keeping a broadcast side-input in the operator state is
> >>>> not optimal. That's a good point I had not thought about. Once we have
> >>>> a separate abstraction for broadcast state, we can optimize e.g.
> >>>> checkpointing it (avoiding checkpointing it at every operator).
> >>>>
> >>>>
> >>>> Regarding blocking/backpressuring inputs, it should not only be useful
> >>>> for static side-input, but also for periodically updated (i.e. slowly
> >>>> changing) side-input. E.g. when a machine learning model is updated and
> >>>> loaded every hour, it makes sense to prioritize loading the model on the
> >>>> side input. But I see the limitations of the underlying runtime.
> >>>>
> >>>> Exposing a buffer could be useful for now. Although, the *API* for
> >>>> blocking could even be implemented by simply buffering. So the buffering
> >>>> could be hidden from the user, and later maybe optimized to not only
> >>>> buffer, but also apply backpressure. What do you think? Again, for the
> >>>> prototype, exposing the buffer should be fine IMHO. API and
> >>>> implementation for blocking inputs could be a separate issue, but let's
> >>>> not forget about it.
> >>>>
> >>>> Cheers,
> >>>> Gabor
> >>>>
> >>>>
> >>>> On 2017-03-15 16:14, Aljoscha Krettek wrote:
> >>>>
> >>>>> Hi,
> >>>>> thanks for your input! :-)
> >>>>>
> >>>>> Regarding 1)
> >>>>> I don't see the benefit of integrating windowing into the side-input
> >>>>> logic. Windowing can happen upstream, and whenever that emits new data
> >>>>> the operator will notice because there is new input. Having windowing
> >>>>> inside the side-input of an operator as well would just make the
> >>>>> implementation more complex without adding benefit, IMHO.
> >>>>>
> >>>>> Regarding 2)
> >>>>> That's a very good observation! I think we are fine, though, because
> >>>>> checkpoint barriers never "overtake" elements. It's only elements that
> >>>>> can overtake checkpoint barriers. If the broadcast state on different
> >>>>> parallel instances differs in a checkpoint then it only differs because
> >>>>> some parallel instances have reflected changes in their state from
> >>>>> elements that they shouldn't have "seen" yet in the exactly-once mode.
> >>>>> If we pick the state of an arbitrary instance as the de-facto state we
> >>>>> don't break guarantees any more than turning on at-least-once mode does.
> >>>>>
> >>>>> Regarding 3)
> >>>>> We need the special buffer support for keyed operations because there we
> >>>>> need to make sure that data is restored on the correct operator that is
> >>>>> responsible for the key of the data while also allowing us to iterate
> >>>>> over all the buffered data (for when we are ready to process the data).
> >>>>> This iteration over elements is not possible when simply storing data in
> >>>>> keyed state.
> >>>>>
> >>>>> What do you think?
> >>>>>
> >>>>> On Wed, Mar 15, 2017, at 09:07, wenlong.lwl wrote:
> >>>>>
> >>>>>> Hi Aljoscha, I just went through your prototype. I like the design of
> >>>>>> the SideInputReader, which makes it flexible to determine when we can
> >>>>>> get the side input.
> >>>>>>
> >>>>>> I agree that side inputs are API sugar on top of the three components
> >>>>>> (n-ary inputs, broadcast state and input buffering). Following are some
> >>>>>> more thoughts about the three components:
> >>>>>>
> >>>>>> 1. Taking both the N-ary input operator and the windowing/trigger
> >>>>>> mechanism into consideration, I think we may need the N-ary input
> >>>>>> operator to support some inputs (side inputs) being windowed while the
> >>>>>> others (main input) are normal streams. For static/slow-evolving data,
> >>>>>> we need to use global windows, and for window-based join data, we need
> >>>>>> to use time windows or custom windows. The window function on the side
> >>>>>> input can be used to collect or merge the data to generate the value of
> >>>>>> the side input (a single value or a list/map). Once a side-input window
> >>>>>> is triggered, the SideInputReader will return the value as available,
> >>>>>> and if a window is triggered more than once, the value of the side
> >>>>>> input will be updated, and maybe the SideInputReader needs an interface
> >>>>>> to notify the user that something changed. Besides, I prefer the option
> >>>>>> to make every input of the N-ary input operator equal, because a user
> >>>>>> may need one side input that depends on another side input.
> >>>>>>
> >>>>>> 2. Regarding broadcast state, my concern is how we can merge the value
> >>>>>> of the state from different subtasks. If the job is running in
> >>>>>> at-least-once mode, the returned value of broadcast state from
> >>>>>> different subtasks will be different. Is there already any design for
> >>>>>> broadcast state?
> >>>>>>
> >>>>>> 3. Regarding input buffering, I think if we use the window/trigger
> >>>>>> mechanism, the data can be stored in the state of the window, which may
> >>>>>> be mostly like what we currently do in KeyedWindow and AllWindow. We
> >>>>>> may need to allow a custom merge strategy on all window state data,
> >>>>>> since in side inputs we may need to choose data according to the
> >>>>>> broadcast state strategy, while in normal windows we can just
> >>>>>> redistribute the window state data.
> >>>>>>
> >>>>>> What do you think?
> >>>>>>
> >>>>>> Best Regards!
> >>>>>>
> >>>>>> Wenlong
> >>>>>>
> >>>>>> On 14 March 2017 at 01:41, Aljoscha Krettek <aljos...@apache.org>
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Ha! This is turning out to be quite the discussion. :-) Also, thanks
> >>>>>>> Kenn, for chiming in with the Beam perspective!
> >>>>>>>
> >>>>>>> I'll try and address some stuff.
> >>>>>>>
> >>>>>>> It seems we have some consensus on using an N-ary operator to implement
> >>>>>>> side inputs. I see two ways forward there:
> >>>>>>>   - Have a "pure" N-ary operator that has zero inputs by default and
> >>>>>>>   all N inputs are equal: this exists side-by-side with the current
> >>>>>>>   one-input operator and two-input operator.
> >>>>>>>   - Extend the existing operators with more inputs: the main input(s)
> >>>>>>>   would be considered different from the N other inputs, internally.
> >>>>>>>   With this, we would not have to rewrite existing operators and could
> >>>>>>>   simply have side inputs as an add-on.
> >>>>>>>
> >>>>>>> There weren't any (many?) comments on using broadcast state for side
> >>>>>>> inputs. I think there is not much to agree on there because it seems
> >>>>>>> pretty straightforward to me that we need this.
> >>>>>>>
> >>>>>>> About buffering: I think we need this as a Flink service because it is
> >>>>>>> right now not (easily) possible to buffer keyed input. For keyed input
> >>>>>>> we need to checkpoint the input buffers with the key-grouped state.
> >>>>>>> Otherwise the data would not be distributed to the correct operator
> >>>>>>> when restoring. This is explained in the FLIP in more detail.
> >>>>>>>
> >>>>>>> If we have these three components (n-ary inputs, broadcast state and
> >>>>>>> input buffering) then side inputs are mostly API sugar on top. I even
> >>>>>>> believe that it might be enough to simply provide these and then users
> >>>>>>> have a very flexible system that allows them to implement different
> >>>>>>> side-input variants. I'm suggesting this because I see there are a lot
> >>>>>>> of different opinions and because the "field" of determining a side
> >>>>>>> input to be finished is still quite open.
> >>>>>>>
> >>>>>>> Now, regarding Gabor's comments which, I think, pretty nicely summed
> >>>>>>> up the ongoing discussion and added some new stuff:
> >>>>>>>
> >>>>>>> About the CoFlatMap for the simple case: I think this is almost
> >>>>>>> possible, except for the buffering in case of a keyed input stream.
> >>>>>>> Also, the side input is not easy to store because we need broadcast
> >>>>>>> state for that (depending, of course, on whether the input(s) are
> >>>>>>> keyed or not). I think with the above-mentioned additions this case
> >>>>>>> would be possible without explicit support for side inputs in the API.
> >>>>>>>
> >>>>>>> Re 1)
> >>>>>>> I would prefer to use windowing/triggers for determining side-input
> >>>>>>> readiness. There are, right now, enough messages flying around the
> >>>>>>> system and introducing yet more doesn't seem too desirable to me right
> >>>>>>> now. We should, of course, revisit this once we have the basic
> >>>>>>> components in place.
> >>>>>>>
> >>>>>>> Re 2)
> >>>>>>> See my comments about buffering in a keyed operator above. Regarding
> >>>>>>> blocking, this is currently not possible because all inputs are
> >>>>>>> consumed by one thread. This could, of course, change in the future but
> >>>>>>> it is a feature (limitation?) of the current implementation. In
> >>>>>>> general, I think blocking an input is only ever feasible while waiting
> >>>>>>> for some bounded inputs to be fully consumed, i.e. when you have some
> >>>>>>> initial loading of data from a static data set.
> >>>>>>>
> >>>>>>> Re 3)
> >>>>>>> Agreed, I think that we should keep the side-input in the (yet to be
> >>>>>>> introduced) broadcast state. Again, once we have the basics in place
> >>>>>>> we can investigate further optimisations here such as not checkpointing
> >>>>>>> side-input data from a static data set because we know that we can
> >>>>>>> easily rebuild it.
> >>>>>>>
> >>>>>>> What do you think?
> >>>>>>>
> >>>>>>> On Fri, Mar 10, 2017, at 20:44, Kenneth Knowles wrote:
> >>>>>>>
> >>>>>>>> Hi all,
> >>>>>>>>
> >>>>>>>> I thought I would briefly join this thread to mention some side input
> >>>>>>>> lessons from Apache Beam. My knowledge of Flink is not deep enough,
> >>>>>>>> technically or philosophically, to make any specific recommendations.
> >>>>>>>> And I might just be repeating things that the docs and threads cover,
> >>>>>>>> but I hope it might be helpful anyhow.
> >>>>>>>>
> >>>>>>>> Side Input Visibility / matching: Beam started with a coupling between
> >>>>>>>> the windowing on a stream and the way that windows are mapped between
> >>>>>>>> main input and side input. This is actually not needed and we'll be
> >>>>>>>> making the mapping explicit (with sensible defaults). In particular,
> >>>>>>>> the mapping determines when you can garbage collect, when you know
> >>>>>>>> that no main input element will ever map to a particular window again
> >>>>>>>> (so opaque mappings need some metadata).
> >>>>>>>>
> >>>>>>>> Side Input Readiness: There is an unpleasant asymmetry between waiting
> >>>>>>>> for the first triggering of a side input but not waiting for any later
> >>>>>>>> triggering. This manifests strongly when a user actually wants to know
> >>>>>>>> something about the relationship between side input update latency and
> >>>>>>>> main input processing. This echoes some of the concern here about
> >>>>>>>> user-defined control over readiness. IMO this is a rather open area.
> >>>>>>>>
> >>>>>>>> Default values for singleton side inputs: A special case of side input
> >>>>>>>> readiness that is related also to windowing. By far the most useful
> >>>>>>>> singleton side input is the result of a global reduction with an
> >>>>>>>> associative and commutative operator. A lot of these operators also
> >>>>>>>> have an identity element. It is nice for this identity element (known
> >>>>>>>> a priori) to be "always available" on the side input, for every
> >>>>>>>> window, if it is expected to be something that is continually updated.
> >>>>>>>> But if the configuration is such that it is a one-time triggering of
> >>>>>>>> bounded data, that behavior is not right. Related, after some amount
> >>>>>>>> of time we conclude that no input will ever be received for a window,
> >>>>>>>> and the side input becomes ready.
> >>>>>>>>
> >>>>>>>> Map Side Inputs with triggers: When new data arrives for a key in
> >>>>>>>> Beam, there's no way to know which value should "win", so you
> >>>>>>>> basically just can't use map side inputs with triggers.
> >>>>>>>>
> >>>>>>>> These are just some quick thoughts at a very high level.
> >>>>>>>>
> >>>>>>>> Kenn
> >>>>>>>>
> >>>>>>>> On Thu, Mar 9, 2017 at 7:59 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>>>
> >>>>>>>>> Hi Jamie,
> >>>>>>>>> actually the approach where the .withSideInput() comes before the
> >>>>>>>>> user function is only required for implementation proposal #1, which
> >>>>>>>>> I like the least. For the other two it can be after the user
> >>>>>>>>> function, which is also what I prefer.
> >>>>>>>>>
> >>>>>>>>> Regarding semantics: yes, we simply wait for anything to be
> >>>>>>>>> available. For GlobalWindows, i.e. side inputs on a normal function
> >>>>>>>>> where we simply don't have windows, this means that we wait for
> >>>>>>>>> anything. For the windowed case, which I'm proposing as a second
> >>>>>>>>> step, we will wait for side input in a window to be available that
> >>>>>>>>> matches the main-input window. For the keyed case we wait for
> >>>>>>>>> something on the same key to be available, for the broadcast case we
> >>>>>>>>> wait for anything.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Aljoscha
> >>>>>>>>>
> >>>>>>>>> On Thu, Mar 9, 2017, at 16:55, Jamie Grier wrote:
> >>>>>>>>>
> >>>>>>>>>> Hi, I think the proposal looks good. The only thing I wasn't clear
> >>>>>>>>>> on was which API is actually being proposed: the one where
> >>>>>>>>>> .withSideInput() comes before the user function or after. I would
> >>>>>>>>>> definitely prefer it come after since that's the normal pattern in
> >>>>>>>>>> the Flink API. I understood that makes the implementation different
> >>>>>>>>>> (maybe harder) but I think it helps keep the API uniform which is
> >>>>>>>>>> really good.
> >>>>>>>>>>
> >>>>>>>>>> Overall I think the API looks good and yes there are some tricky
> >>>>>>>>>> semantics here but in general if, when processing keyed main
> >>>>>>>>>> streams, we always wait until there is a side-input available for
> >>>>>>>>>> that key we're off to a great start and I think that was what you're
> >>>>>>>>>> suggesting in the design doc.
> >>>>>>>>>>
> >>>>>>>>>> -Jamie
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Mar 9, 2017 at 7:27 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Hi,
> >>>>>>>>>>> these are all valuable suggestions and I think that we should
> >>>>>>>>>>> implement them when the time is right. However, I would like to
> >>>>>>>>>>> first get a minimal viable version of this feature into Flink and
> >>>>>>>>>>> then expand on it. I think the last few tries of tackling this
> >>>>>>>>>>> problem fizzled out because we got too deep into discussing special
> >>>>>>>>>>> semantics and features. I think the most important thing to agree
> >>>>>>>>>>> on right now is the basic API and the implementation plan. What do
> >>>>>>>>>>> you think about that?
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding your suggestions, I have in fact a branch [1] from May
> >>>>>>>>>>> 2016 where I implemented a prototype. This has an n-ary operator
> >>>>>>>>>>> and inputs can be either bounded or unbounded and the
> >>>>>>>>>>> implementation actually waits for all bounded inputs to finish
> >>>>>>>>>>> before starting to process the unbounded inputs.
> >>>>>>>>>>>
> >>>>>>>>>>> In general, I think blocking on an input is only possible while
> >>>>>>>>>>> you're waiting for a bounded input to finish. If all inputs are
> >>>>>>>>>>> unbounded you cannot block because you might run into deadlocks (in
> >>>>>>>>>>> the processing graph, due to back pressure) and also because
> >>>>>>>>>>> blocking will also block elements that might have a lower timestamp
> >>>>>>>>>>> and might fall into a different window which is already ready for
> >>>>>>>>>>> processing.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Aljoscha
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>>>>>>>>>> https://github.com/aljoscha/flink/commits/operator-ng-side-input-wrapper
> >>>>>>>>>
> >>>>>>>>>>> On Tue, Mar 7, 2017, at 14:39, wenlong.lwl wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Aljoscha, thank you for the proposal, it is great to hear about
> >>>>>>>>>>>> the progress on side inputs.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Here is my point of view:
> >>>>>>>>>>>> 1. I think there should be an option to block the processing of
> >>>>>>>>>>>> the main input instead of buffering the data in state, because in
> >>>>>>>>>>>> production the throughput of the main input is usually much
> >>>>>>>>>>>> larger, and buffering the data before the side input is ready may
> >>>>>>>>>>>> slow down the preparation of the side input, since I/O and
> >>>>>>>>>>>> computing resources are always limited.
> >>>>>>>>>>>> 2. Another issue that may need to be discussed: how can we do
> >>>>>>>>>>>> checkpointing with side inputs? A static side input may finish
> >>>>>>>>>>>> soon after it starts, which will stop the checkpointing.
> >>>>>>>>>>>> 3. I agree with Gyula that users should be able to determine when
> >>>>>>>>>>>> a side input is ready. Maybe we can take it one step further:
> >>>>>>>>>>>> could users also determine which input an operator with multiple
> >>>>>>>>>>>> inputs processes each time? It would be more flexible.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best Regards!
> >>>>>>>>>>>> Wenlong
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 7 March 2017 at 18:39, Ventura Del Monte <venturadelmo...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thank you for the proposal and for bringing up this discussion
> >>>>>>>>>>>>> again. Regarding the implementation aspect, I would say the first
> >>>>>>>>>>>>> way could be easier/faster to implement but it could add some
> >>>>>>>>>>>>> overhead when dealing with multiple side inputs through the
> >>>>>>>>>>>>> current 2-streams union transform. I tried the second option
> >>>>>>>>>>>>> myself as it has less overhead but then the outcome was something
> >>>>>>>>>>>>> close to an N-ary operator consuming first each side input while
> >>>>>>>>>>>>> buffering the main one.
> >>>>>>>>>>>>> Therefore, I would choose the third option as it is more generic
> >>>>>>>>>>>>> and might help also in other scenarios, although its
> >>>>>>>>>>>>> implementation requires more effort.
> >>>>>>>>>>>>> I also agree with Gyula, I think the user should be allowed to
> >>>>>>>>>>>>> define the condition that determines when a side input is ready,
> >>>>>>>>>>>>> e.g., load the side input first, incrementally update the side
> >>>>>>>>>>>>> input.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Ventura
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Mon, Mar 6, 2017 at 3:50 PM, Gyula Fóra <gyula.f...@gmail.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hi Aljoscha,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Thank you for the nice proposal!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think it would make sense to allow users to affect the
> >>>>>>>>>>>>>> readiness of the side input. I think making it ready when the
> >>>>>>>>>>>>>> first element arrives is only slightly better than making it
> >>>>>>>>>>>>>> always ready from a usability perspective. For instance, if I am
> >>>>>>>>>>>>>> joining against a static data set I want to wait for the whole
> >>>>>>>>>>>>>> set before making it ready. This could be exposed as a
> >>>>>>>>>>>>>> user-defined condition that could maybe also recognize bounded
> >>>>>>>>>>>>>> inputs.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Maybe we could also add an aggregating (merging) side input
> >>>>>>>>>>>>>> type that could work as a broadcast state.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> What do you think?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Gyula
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Mon, 6 Mar 2017, 15:18):
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Hi Folks,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> I would like to finally agree on a plan for implementing side
> >>>>>>>>>>>>>>> inputs in Flink. There has already been an attempt to come to
> >>>>>>>>>>>>>>> consensus [1], which resulted in two design documents. I tried
> >>>>>>>>>>>>>>> to consolidate those two and also added a section about
> >>>>>>>>>>>>>>> implementation plans. This is the resulting FLIP:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-17+Side+Inputs+for+DataStream+API
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In terms of semantics I tried to go with the minimal viable
> >>>>>>>>>>>>>>> solution. The part that needs discussing is how we want to
> >>>>>>>>>>>>>>> implement this. I outlined three possible implementation plans
> >>>>>>>>>>>>>>> in the FLIP but what it boils down to is that we need to
> >>>>>>>>>>>>>>> introduce some way of getting several inputs into an
> >>>>>>>>>>>>>>> operator/task.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Please have a look at the doc and let us know what you think.
> >>>>>>>
> >>>>>>>>
> >>>>>>>>>>>>>>> Best,
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Aljoscha
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>>> https://lists.apache.org/thread.html/797df0ba066151b77c7951fd7d603a8afd7023920d0607a0c6337db3@1462181294@%3Cdev.flink.apache.org%3E
> >>>>>>>
> >>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>>
> >>>>>>>>>> Jamie Grier
> >>>>>>>>>> data Artisans, Director of Applications Engineering
> >>>>>>>>>> @jamiegrier <https://twitter.com/jamiegrier>
> >>>>>>>>>> ja...@data-artisans.com
> >>>>>>>>>>
> >>>>>>>>>
> >>
>
>
