Hi Vino,

+1 for this feature. It's useful for handling data skew, and it could also
reduce the amount of shuffled data.

I have some concerns about the API part. From my side, this feature should
be more like an improvement. I'm afraid the proposal is overkill on the API
side. Many other systems support pre-aggregation as an optimization of
global aggregation; the optimization may be applied automatically or enabled
manually, but through a simple API. The proposal instead introduces a series
of flexible local aggregation APIs that can be used independently of global
aggregation. That doesn't look like an improvement so much as a batch of new
features. I'm not sure whether there is a bigger picture later, but as it
stands the API part looks a little heavy to me.
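
To make the contrast concrete, here is a sketch of the kind of "simple API" I
have in mind, assuming input is a DataStream<Tuple2<String, Long>>.
enableLocalAggregation() is purely hypothetical, just to illustrate the
shape; it is not an existing Flink method:

    // Global aggregation stays the user-facing API; local pre-aggregation
    // is a one-line, opt-in optimization hint (hypothetical).
    DataStream<Tuple2<String, Long>> counts = input
            .keyBy(e -> e.f0)
            .sum(1)
            .enableLocalAggregation(); // hypothetical, not a real Flink API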


vino yang <yanghua1...@gmail.com> wrote on Wed, Jun 5, 2019 at 10:38 AM:

> Hi Litree,
>
> From an implementation level, the localKeyBy API returns a general
> KeyedStream, so you can call all the APIs that KeyedStream provides. We
> did not restrict its usage, although we could have (for example, by
> returning a new stream type named LocalKeyedStream).
>
> However, to achieve the goal of local aggregation, it only makes sense to
> call the window API.
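>
> As a sketch (assuming the proposed localKeyBy API from the design doc and a
> simple Tuple2<String, Long> stream named input), the usage would look
> roughly like:
>
>     input
>         .localKeyBy(e -> e.f0)                          // proposed API
>         .timeWindow(Time.seconds(5))                    // local pre-aggregation
>         .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
>         .keyBy(e -> e.f0)                               // existing API
>         .timeWindow(Time.seconds(5))                    // global aggregation
>         .reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1));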
>
> Best,
> Vino
>
> litree <lyuan...@126.com> wrote on Tue, Jun 4, 2019 at 10:41 PM:
>
> > Hi Vino,
> >
> >
> > I have read your design. Something I want to know is the usage of these
> > new APIs. It looks like when I use localKeyBy, I must then use a window
> > operator to return a DataStream, and then use keyBy and another window
> > operator to get the final result?
> >
> >
> > thanks,
> > Litree
> >
> >
> > On 06/04/2019 17:22, vino yang wrote:
> > Hi Dian,
> >
> > Thanks for your reply.
> >
> > I know what you mean. However, if you think it through, you will find that
> > your implementation needs to provide an operator which looks like a window
> > operator: it needs to use state, receive an aggregation function, and
> > specify the trigger time. It is effectively a lightweight window operator,
> > right?
> >
> > We try to reuse the functions Flink already provides and to reduce
> > complexity. IMO, it is more user-friendly because users are familiar with
> > the window API.
> >
> > Best,
> > Vino
> >
> >
> > Dian Fu <dian0511...@gmail.com> wrote on Tue, Jun 4, 2019 at 4:19 PM:
> >
> > > Hi Vino,
> > >
> > > Thanks a lot for starting this discussion. +1 to this feature as I
> think
> > > it will be very useful.
> > >
> > > Regarding using a window to buffer the input elements, personally I don't
> > > think it's a good solution, for the following reasons:
> > > 1) As we know, WindowOperator stores the accumulated results in state,
> > > which is not necessary for a local aggregate operator.
> > > 2) In WindowOperator, each input element is accumulated into state. This
> > > is also not necessary for a local aggregate operator; keeping the input
> > > elements in memory is enough (see the sketch below).
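> > >
> > > As a rough sketch of what I mean (hypothetical, simplified Java; a real
> > > operator would also need to flush the buffer on checkpoints to keep
> > > exactly-once semantics):
> > >
> > >     import java.util.HashMap;
> > >     import java.util.Map;
> > >     import org.apache.flink.api.common.functions.RichFlatMapFunction;
> > >     import org.apache.flink.api.java.tuple.Tuple2;
> > >     import org.apache.flink.configuration.Configuration;
> > >     import org.apache.flink.util.Collector;
> > >
> > >     // Buffers partial sums in plain memory (no state backend) and emits
> > >     // them downstream every BUNDLE_SIZE input elements.
> > >     public class LocalSum
> > >             extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {
> > >
> > >         private static final int BUNDLE_SIZE = 1000;
> > >         private transient Map<String, Long> bundle;
> > >         private transient int count;
> > >
> > >         @Override
> > >         public void open(Configuration parameters) {
> > >             bundle = new HashMap<>();
> > >         }
> > >
> > >         @Override
> > >         public void flatMap(Tuple2<String, Long> in,
> > >                             Collector<Tuple2<String, Long>> out) {
> > >             bundle.merge(in.f0, in.f1, Long::sum);   // local pre-aggregation
> > >             if (++count >= BUNDLE_SIZE) {            // flush on element count
> > >                 for (Map.Entry<String, Long> e : bundle.entrySet()) {
> > >                     out.collect(Tuple2.of(e.getKey(), e.getValue()));
> > >                 }
> > >                 bundle.clear();
> > >                 count = 0;
> > >             }
> > >         }
> > >     }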
> > >
> > > Thanks,
> > > Dian
> > >
> > > > On Jun 4, 2019, at 10:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> > > >
> > > > Hi Ken,
> > > >
> > > > Thanks for your reply.
> > > >
> > > > As I said before, we try to reuse Flink's state concept (for fault
> > > > tolerance and to guarantee exactly-once semantics), so we did not
> > > > consider a cache.
> > > >
> > > > In addition, if we use Flink's state, OOM-related issues are not a key
> > > > problem we need to consider.
> > > >
> > > > Best,
> > > > Vino
> > > >
> > > > Ken Krugler <kkrugler_li...@transpac.com> wrote on Tue, Jun 4, 2019 at 1:37 AM:
> > > >
> > > >> Hi all,
> > > >>
> > > >> Cascading implemented this “map-side reduce” functionality with an LRU
> > > >> cache.
> > > >>
> > > >> That worked well, as the skewed keys would then always be in the cache.
> > > >>
> > > >> The API let you decide the size of the cache, in terms of the number of
> > > >> entries.
> > > >>
> > > >> Having a memory limit would have been better for many of our use cases,
> > > >> though as far as I recall there’s no good way to estimate the in-memory
> > > >> size of objects.
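> > > >>
> > > >> Roughly like this, as a sketch in Java (emit() is a hypothetical
> > > >> stand-in for sending a partial result downstream):
> > > >>
> > > >>     // Entry-bounded, access-ordered LRU cache (java.util.LinkedHashMap)
> > > >>     // for partial sums: hot (skewed) keys stay resident, cold keys are
> > > >>     // evicted and their partial results flushed downstream.
> > > >>     static final int MAX_ENTRIES = 10_000;
> > > >>
> > > >>     Map<String, Long> cache = new LinkedHashMap<String, Long>(16, 0.75f, true) {
> > > >>         @Override
> > > >>         protected boolean removeEldestEntry(Map.Entry<String, Long> eldest) {
> > > >>             if (size() > MAX_ENTRIES) {
> > > >>                 emit(eldest.getKey(), eldest.getValue()); // flush partial result
> > > >>                 return true;                              // evict the entry
> > > >>             }
> > > >>             return false;
> > > >>         }
> > > >>     };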
> > > >>
> > > >> — Ken
> > > >>
> > > >>> On Jun 3, 2019, at 2:03 AM, vino yang <yanghua1...@gmail.com> wrote:
> > > >>>
> > > >>> Hi Piotr,
> > > >>>
> > > >>> The localKeyBy API returns an instance of KeyedStream (we just added an
> > > >>> inner flag to identify the local mode), which Flink has provided before.
> > > >>> Users can call all the APIs (especially the *window* APIs) that
> > > >>> KeyedStream provides.
> > > >>>
> > > >>> So if users want to use local aggregation, they should call the window
> > > >>> API to build a local window, which means users should (or rather, "can")
> > > >>> specify the window length and other parameters based on their needs.
> > > >>>
> > > >>> I think you described an idea different from ours. We did not try to
> > > >>> react after some predefined threshold is triggered; we tend to give users
> > > >>> the discretion to make these decisions.
> > > >>>
> > > >>> Our design tends to reuse the concepts and functions Flink already
> > > >>> provides, like state and windows (IMO, we do not need to worry about OOM
> > > >>> and the other issues you mentioned).
> > > >>>
> > > >>> Best,
> > > >>> Vino
> > > >>>
> > > >>> Piotr Nowojski <pi...@ververica.com> wrote on Mon, Jun 3, 2019 at 4:30 PM:
> > > >>>
> > > >>>> Hi,
> > > >>>>
> > > >>>> +1 for the idea from my side. I even attempted to add a similar feature
> > > >>>> quite some time ago, but it didn’t get enough traction [1].
> > > >>>>
> > > >>>> I’ve read through your document and I couldn’t find it mentioned
> > > >>>> anywhere: when should the pre-aggregated result be emitted downstream?
> > > >>>> I think that’s one of the most crucial decisions, since a wrong decision
> > > >>>> here can lead to decreased performance or to an explosion of memory/state
> > > >>>> consumption (with both bounded and unbounded data streams). For streaming
> > > >>>> it can also lead to increased latency.
> > > >>>>
> > > >>>> Since this is also a decision that’s impossible to make perfectly
> > > >>>> reliably in an automatic way, first and foremost I would expect this to
> > > >>>> be configurable via the API, with maybe some predefined triggers: on
> > > >>>> watermark (for windowed operations), on checkpoint barrier (to decrease
> > > >>>> state size?), on element count, maybe on memory usage (much easier to
> > > >>>> estimate with known/predefined types, as in SQL)… and with some option to
> > > >>>> implement a custom trigger.
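> > > >>>>
> > > >>>> Such a custom trigger could have a shape roughly like this (purely
> > > >>>> hypothetical Java, not an existing or proposed Flink interface):
> > > >>>>
> > > >>>>     // Decides when locally buffered partial results are flushed.
> > > >>>>     public interface LocalAggregationTrigger<T> {
> > > >>>>         // Per-element hook: flush on count or estimated-size thresholds.
> > > >>>>         boolean onElement(T element, long bufferedElements, long estimatedBytes);
> > > >>>>         // Flush on event-time progress (for windowed operations).
> > > >>>>         boolean onWatermark(long watermark);
> > > >>>>         // Flush before snapshots, to keep checkpointed state small.
> > > >>>>         boolean onCheckpointBarrier(long checkpointId);
> > > >>>>     }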
> > > >>>>
> > > >>>> Also, what would work best would be some form of memory-consumption
> > > >>>> priority. For example, if we are running out of memory for the
> > > >>>> HashJoin/final aggregation, instead of spilling to disk or crashing the
> > > >>>> job with an OOM it would probably be better to prune/dump the pre/local
> > > >>>> aggregation state. But that’s another story.
> > > >>>>
> > > >>>> [1] https://github.com/apache/flink/pull/4626
> > > >>>>
> > > >>>> Piotrek
> > > >>>>
> > > >>>>> On 3 Jun 2019, at 10:16, sf lee <leesf0...@gmail.com> wrote:
> > > >>>>>
> > > >>>>> Excited, and a big +1 for this feature.
> > > >>>>>
> > > >>>>>> SHI Xiaogang <shixiaoga...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:37 PM:
> > > >>>>>
> > > >>>>>> Nice feature.
> > > >>>>>> Looking forward to having it in Flink.
> > > >>>>>>
> > > >>>>>> Regards,
> > > >>>>>> Xiaogang
> > > >>>>>>
> > > >>>>>> vino yang <yanghua1...@gmail.com> wrote on Mon, Jun 3, 2019 at 3:31 PM:
> > > >>>>>>
> > > >>>>>>> Hi all,
> > > >>>>>>>
> > > >>>>>>> As we mentioned at several conferences, such as Flink Forward SF 2019
> > > >>>>>>> and QCon Beijing 2019, our team has implemented "Local aggregation" in
> > > >>>>>>> our internal Flink fork. This feature can effectively alleviate data
> > > >>>>>>> skew.
> > > >>>>>>>
> > > >>>>>>> Currently, keyed streams are widely used to perform aggregating
> > > >>>>>>> operations (e.g., reduce, sum and window) on elements that have the
> > > >>>>>>> same key. At runtime, the elements with the same key are sent to and
> > > >>>>>>> aggregated by the same task.
> > > >>>>>>>
> > > >>>>>>> The performance of these aggregating operations is very sensitive to
> > > >>>>>>> the distribution of keys. In cases where the distribution of keys
> > > >>>>>>> follows a power law, performance degrades significantly. Worse still,
> > > >>>>>>> increasing the degree of parallelism does not help when a task is
> > > >>>>>>> overloaded by a single key.
> > > >>>>>>>
> > > >>>>>>> Local aggregation is a widely adopted method to mitigate the
> > > >>>>>>> performance degradation caused by data skew. We can decompose the
> > > >>>>>>> aggregating operations into two phases. In the first phase, we
> > > >>>>>>> aggregate the elements with the same key at the sender side to obtain
> > > >>>>>>> partial results. In the second phase, these partial results are sent
> > > >>>>>>> to receivers according to their keys and combined to obtain the final
> > > >>>>>>> result. Since the number of partial results received by each receiver
> > > >>>>>>> is bounded by the number of senders, the imbalance among receivers can
> > > >>>>>>> be reduced. Besides, by reducing the amount of transferred data,
> > > >>>>>>> performance can be further improved.
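> > > >>>>>>>
> > > >>>>>>> As a concrete illustration (with hypothetical numbers): suppose key
> > > >>>>>>> "a" appears in 1,000,000 records spread across 100 senders. With
> > > >>>>>>> local aggregation, each sender emits a single partial result for "a"
> > > >>>>>>> per window, so the receiver combines at most 100 partial results
> > > >>>>>>> instead of 1,000,000 raw records.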
> > > >>>>>>>
> > > >>>>>>> The design documentation is here:
> > > >>>>>>>
> > > >>>>>>> https://docs.google.com/document/d/1gizbbFPVtkPZPRS8AIuH8596BmgkfEa7NRwR6n3pQes/edit?usp=sharing
> > > >>>>>>>
> > > >>>>>>> Any comments and feedback are welcome and appreciated.
> > > >>>>>>>
> > > >>>>>>> Best,
> > > >>>>>>> Vino
> > > >>>>>>>
> > > >>>>>>
> > > >>>>
> > > >>>>
> > > >>
> > > >> --------------------------
> > > >> Ken Krugler
> > > >> +1 530-210-6378
> > > >> http://www.scaleunlimited.com
> > > >> Custom big data solutions & training
> > > >> Flink, Solr, Hadoop, Cascading & Cassandra
> > > >>
> > > >>
> > >
> > >
> >
>
