Thanks Yun for bringing up this discussion and very thanks for all the deep
thoughts!

For now, I think this discussion contains two scenarios: one if for
iteration library support and the other is for SQL join support. I think
both of the two scenarios are useful but they seem to have different best
suitable solutions. For making the discussion more clear, I would suggest
to split the discussion into two threads.

And I agree with Piotr that it is very tricky that a keyed stream received
a "broadcast element". So we may add some new interfaces, which could
broadcast or process some special "broadcast event". In that way "broadcast
event" will not be sent with the normal process.

Best,
Guowei


SHI Xiaogang <shixiaoga...@gmail.com> 于2019年8月26日周一 上午9:27写道:

> Hi all,
>
> I also think that multicasting is a necessity in Flink, but more details
> are needed to be considered.
>
> Currently network is tightly coupled with states in Flink to achieve
> automatic scaling. We can only access keyed states in keyed streams and
> operator states in all streams.
> In the concrete example of theta-joins implemented with mutlticasting, the
> following questions exist:
>
>    - In which type of states will the data be stored? Do we need another
>    type of states which is coupled with multicasting streams?
>    - How to ensure the consistency between network and states when jobs
>    scale out or scale in?
>
> Regards,
> Xiaogang
>
> Xingcan Cui <xingc...@gmail.com> 于2019年8月25日周日 上午10:03写道:
>
> > Hi all,
> >
> > Sorry for joining this thread late. Basically, I think enabling multicast
> > pattern could be the right direction, but more detailed implementation
> > policies need to be discussed.
> >
> > Two years ago, I filed an issue [1] about the multicast API. However, due
> > to some reasons, it was laid aside. After that, when I tried to
> cherry-pick
> > the change for experimental use, I found the return type of
> > `selectChannels()` method had changed from `int[]` to `int`, which makes
> > the old implementation not work anymore.
> >
> > From my side, the multicast has always been used for theta-join. As far
> as
> > I know, it’s an essential requirement for some sophisticated joining
> > algorithms. Until now, the Flink non-equi joins can still only be
> executed
> > single-threaded. If we'd like to make some improvements on this, we
> should
> > first take some measures to support multicast pattern.
> >
> > Best,
> > Xingcan
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-6936
> >
> > > On Aug 24, 2019, at 5:54 AM, Zhu Zhu <reed...@gmail.com> wrote:
> > >
> > > Hi Piotr,
> > >
> > > Thanks for the explanation.
> > > Agreed that the broadcastEmit(record) is a better choice for
> broadcasting
> > > for the iterations.
> > > As broadcasting for the iterations is the first motivation, let's
> support
> > > it first.
> > >
> > > Thanks,
> > > Zhu Zhu
> > >
> > > Yun Gao <yungao...@aliyun.com.invalid> 于2019年8月23日周五 下午11:56写道:
> > >
> > >>     Hi Piotr,
> > >>
> > >>      Very thanks for the suggestions!
> > >>
> > >>     Totally agree with that we could first focus on the broadcast
> > >> scenarios and exposing the broadcastEmit method first considering the
> > >> semantics and performance.
> > >>
> > >>     For the keyed stream, I also agree with that broadcasting keyed
> > >> records to all the tasks may be confused considering the semantics of
> > keyed
> > >> partitioner. However, in the iteration case supporting broadcast over
> > keyed
> > >> partitioner should be required since users may create any subgraph for
> > the
> > >> iteration body, including the operators with key. I think a possible
> > >> solution to this issue is to introduce another data type for
> > >> 'broadcastEmit'. For example, for an operator Operator<T>, it may
> > broadcast
> > >> emit another type E instead of T, and the transmitting E will bypass
> the
> > >> partitioner and setting keyed context. This should result in the
> design
> > to
> > >> introduce customized operator event (option 1 in the document). The
> > cost of
> > >> this method is that we need to introduce a new type of StreamElement
> and
> > >> new interface for this type, but it should be suitable for both keyed
> or
> > >> non-keyed partitioner.
> > >>
> > >> Best,
> > >> Yun
> > >>
> > >>
> > >>
> > >> ------------------------------------------------------------------
> > >> From:Piotr Nowojski <pi...@ververica.com>
> > >> Send Time:2019 Aug. 23 (Fri.) 22:29
> > >> To:Zhu Zhu <reed...@gmail.com>
> > >> Cc:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
> > >> Subject:Re: [DISCUSS] Enhance Support for Multicast Communication
> > Pattern
> > >>
> > >> Hi,
> > >>
> > >> If the primary motivation is broadcasting (for the iterations) and we
> > have
> > >> no immediate need for multicast (cross join), I would prefer to first
> > >> expose broadcast via the DataStream API and only later, once we
> finally
> > >> need it, support multicast. As I wrote, multicast would be more
> > challenging
> > >> to implement, with more complicated runtime and API. And re-using
> > multicast
> > >> just to support broadcast doesn’t have much sense:
> > >>
> > >> 1. It’s a bit obfuscated. It’s easier to understand
> > >> collectBroadcast(record) or broadcastEmit(record) compared to some
> > >> multicast channel selector that just happens to return all of the
> > channels.
> > >> 2. There are performance benefits of explicitly calling
> > >> `RecordWriter#broadcastEmit`.
> > >>
> > >>
> > >> On a different note, what would be the semantic of such broadcast emit
> > on
> > >> KeyedStream? Would it be supported? Or would we limit support only to
> > the
> > >> non-keyed streams?
> > >>
> > >> Piotrek
> > >>
> > >>> On 23 Aug 2019, at 12:48, Zhu Zhu <reed...@gmail.com> wrote:
> > >>>
> > >>> Thanks Piotr,
> > >>>
> > >>> Users asked for this feature sometimes ago when they migrating batch
> > >> jobs to Flink(Blink).
> > >>> It's not very urgent as they have taken some workarounds to solve
> > >> it.(like partitioning data set to different job vertices)
> > >>> So it's fine to not make it top priority.
> > >>>
> > >>> Anyway, as a commonly known scenario, I think users can benefit from
> > >> cross join sooner or later.
> > >>>
> > >>> Thanks,
> > >>> Zhu Zhu
> > >>>
> > >>> Piotr Nowojski <pi...@ververica.com <mailto:pi...@ververica.com>>
> > >> 于2019年8月23日周五 下午6:19写道:
> > >>> Hi,
> > >>>
> > >>> Thanks for the answers :) Ok I understand the full picture now. +1
> from
> > >> my side on solving this issue somehow. But before we start discussing
> > how
> > >> to solve it one last control question:
> > >>>
> > >>> I guess this multicast is intended to be used in blink planner,
> right?
> > >> Assuming that we implement the multicast support now, when would it be
> > used
> > >> by the blink? I would like to avoid a scenario, where we implement an
> > >> unused feature and we keep maintaining it for a long period of time.
> > >>>
> > >>> Piotrek
> > >>>
> > >>> PS, try to include motivating examples, including concrete ones in
> the
> > >> proposals/design docs, for example in the very first paragraph.
> > Especially
> > >> if it’s a commonly known feature like cross join :)
> > >>>
> > >>>> On 23 Aug 2019, at 11:38, Yun Gao <yungao...@aliyun.com.INVALID>
> > >> wrote:
> > >>>>
> > >>>>    Hi Piotr,
> > >>>>
> > >>>>       Thanks a lot for sharing the thoughts!
> > >>>>
> > >>>>       For the iteration, agree with that multicasting is not
> > >> necessary. Exploring the broadcast interface to Output of the
> operators
> > in
> > >> some way should also solve this issue, and I think it should be even
> > more
> > >> convenient to have the broadcast method for the iteration.
> > >>>>
> > >>>>       Also thanks Zhu Zhu for the cross join case!
> > >>>> Best,
> > >>>>  Yun
> > >>>>
> > >>>>
> > >>>>
> > >>>> ------------------------------------------------------------------
> > >>>> From:Zhu Zhu <reed...@gmail.com <mailto:reed...@gmail.com>>
> > >>>> Send Time:2019 Aug. 23 (Fri.) 17:25
> > >>>> To:dev <dev@flink.apache.org <mailto:dev@flink.apache.org>>
> > >>>> Cc:Yun Gao <yungao...@aliyun.com <mailto:yungao...@aliyun.com>>
> > >>>> Subject:Re: [DISCUSS] Enhance Support for Multicast Communication
> > >> Pattern
> > >>>>
> > >>>> Hi Piotr,
> > >>>>
> > >>>> Yes you are right it's a distributed cross join requirement.
> > >>>> Broadcast join can help with cross join cases. But users cannot use
> it
> > >> if the data set to join is too large to fit into one subtask.
> > >>>>
> > >>>> Sorry for left some details behind.
> > >>>>
> > >>>> Thanks,
> > >>>> Zhu Zhu
> > >>>> Piotr Nowojski <pi...@ververica.com <mailto:pi...@ververica.com>>
> > >> 于2019年8月23日周五 下午4:57写道:
> > >>>> Hi Yun and Zhu Zhu,
> > >>>>
> > >>>> Thanks for the more detailed example Zhu Zhu.
> > >>>>
> > >>>> As far as I understand for the iterations example we do not need
> > >> multicasting. Regarding the Join example, I don’t fully understand it.
> > The
> > >> example that Zhu Zhu presented has a drawback of sending both tables
> to
> > >> multiple nodes. What’s the benefit of using broadcast join over a hash
> > join
> > >> in such case? As far as I know, the biggest benefit of using broadcast
> > join
> > >> instead of hash join is that we can avoid sending the larger table
> over
> > the
> > >> network, because we can perform the join locally. In this example we
> are
> > >> sending both of the tables to multiple nodes, which should defeat the
> > >> purpose.
> > >>>>
> > >>>> Is it about implementing cross join or near cross joins in a
> > >> distributed fashion?
> > >>>>
> > >>>>> if we introduce a new MulticastRecordWriter
> > >>>>
> > >>>> That’s one of the solutions. It might have a drawback of 3 class
> > >> virtualisation problem (We have RecordWriter and BroadcastRecordWriter
> > >> already). With up to two implementations, JVM is able to devirtualise
> > the
> > >> calls.
> > >>>>
> > >>>> Previously I was also thinking about just providing two different
> > >> ChannelSelector interfaces. One with `int[]` and
> `SingleChannelSelector`
> > >> with plain `int` and based on that, RecordWriter could perform some
> > magic
> > >> (worst case scenario `instaceof` checks).
> > >>>>
> > >>>> Another solution might be to change `ChannelSelector` interface into
> > >> an iterator.
> > >>>>
> > >>>> But let's discuss the details after we agree on implementing this.
> > >>>>
> > >>>> Piotrek
> > >>>>
> > >>>>> On 23 Aug 2019, at 10:20, Yun Gao <yungao...@aliyun.com <mailto:
> > >> yungao...@aliyun.com>> wrote:
> > >>>>>
> > >>>>>  Hi Piotr,
> > >>>>>
> > >>>>>       Thanks a lot for the suggestions!
> > >>>>>
> > >>>>>       The core motivation of this discussion is to implement a new
> > >> iteration library on the DataStream, and it requires to insert special
> > >> records in the stream to notify the progress of the iteration. The
> > >> mechanism of such records is very similar to the current Watermark,
> and
> > we
> > >> meet the problem of sending normal records according to the partition
> > >> (Rebalance, etc..) and also be able to broadcast the inserted progress
> > >> records to all the connected records. I have read the notes in the
> > google
> > >> doc and I totally agree with that exploring the broadcast interface in
> > >> RecordWriter in some way is able to solve this issue.
> > >>>>>
> > >>>>>      Regarding to `int[] ChannelSelector#selectChannels()`, I'm
> > >> wondering if we introduce a new MulticastRecordWriter and left the
> > current
> > >> RecordWriter untouched, could we avoid the performance degradation ?
> > Since
> > >> with such a modification the normal RecordWriter does not need to
> > iterate
> > >> the return array by ChannelSelector, and the only difference will be
> > >> returning an array instead of an integer, and accessing the first
> > element
> > >> of the returned array instead of reading the integer directly.
> > >>>>>
> > >>>>> Best,
> > >>>>> Yun
> > >>>>>
> > >>>>> ------------------------------------------------------------------
> > >>>>> From:Piotr Nowojski <pi...@ververica.com <mailto:
> pi...@ververica.com
> > >>>>
> > >>>>> Send Time:2019 Aug. 23 (Fri.) 15:20
> > >>>>> To:dev <dev@flink.apache.org <mailto:dev@flink.apache.org>>
> > >>>>> Cc:Yun Gao <yungao...@aliyun.com <mailto:yungao...@aliyun.com>>
> > >>>>> Subject:Re: [DISCUSS] Enhance Support for Multicast Communication
> > >> Pattern
> > >>>>>
> > >>>>> Hi,
> > >>>>>
> > >>>>> Yun:
> > >>>>>
> > >>>>> Thanks for proposing the idea. I have checked the document and left
> > >> couple of questions there, but it might be better to answer them here.
> > >>>>>
> > >>>>> What is the exact motivation and what problems do you want to
> solve?
> > >> We have dropped multicast support from the network stack [1] for two
> > >> reasons:
> > >>>>> 1. Performance
> > >>>>> 2. Code simplicity
> > >>>>>
> > >>>>> The proposal to re introduce `int[]
> ChannelSelector#selectChannels()`
> > >> would revert those changes. At that time we were thinking about a way
> > how
> > >> to keep the multicast support on the network level, while keeping the
> > >> performance and simplicity for non multicast cases and there are ways
> to
> > >> achieve that. However they would add extra complexity to Flink, which
> it
> > >> would be better to avoid.
> > >>>>>
> > >>>>> On the other hand, supporting dual pattern: standard partitioning
> or
> > >> broadcasting is easy to do, as LatencyMarkers are doing exactly that.
> It
> > >> would be just a matter of exposing this to the user in some way. So
> > before
> > >> we go any further, can you describe your use cases/motivation? Isn’t
> > mix of
> > >> standard partitioning and broadcasting enough? Do we need
> multicasting?
> > >>>>>
> > >>>>> Zhu:
> > >>>>>
> > >>>>> Could you rephrase your example? I didn’t quite understand it.
> > >>>>>
> > >>>>> Piotrek
> > >>>>>
> > >>>>> [1] https://issues.apache.org/jira/browse/FLINK-10662 <
> > >> https://issues.apache.org/jira/browse/FLINK-10662> <
> > >> https://issues.apache.org/jira/browse/FLINK-10662 <
> > >> https://issues.apache.org/jira/browse/FLINK-10662>>
> > >>>>>
> > >>>>> On 23 Aug 2019, at 09:17, Zhu Zhu <reed...@gmail.com <mailto:
> > >> reed...@gmail.com> <mailto:reed...@gmail.com <mailto:
> reed...@gmail.com
> > >>>
> > >> wrote:
> > >>>>>
> > >>>>> Thanks Yun for starting this discussion.
> > >>>>> I think the multicasting can be very helpful in certain cases.
> > >>>>>
> > >>>>> I have received requirements from users that they want to do
> > broadcast
> > >>>>> join, while the data set to broadcast is too large to fit in one
> > task.
> > >>>>> Thus the requirement turned out to be to support cartesian product
> of
> > >> 2
> > >>>>> data set(one of which can be infinite stream).
> > >>>>> For example, A(parallelism=2) broadcast join B(parallelism=2) in
> > >> JobVertex
> > >>>>> C.
> > >>>>> The idea to is have 4 C subtasks to deal with different
> combinations
> > >> of A/B
> > >>>>> partitions, like C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
> > >>>>> This requires one record to be sent to multiple downstream
> subtasks,
> > >> but
> > >>>>> not to all subtasks.
> > >>>>>
> > >>>>> With current interface this is not supported, as one record can
> only
> > >> be
> > >>>>> sent to one subtask, or to all subtasks of a JobVertex.
> > >>>>> And the user had to split the broadcast data set manually to
> several
> > >>>>> different JobVertices, which is hard to maintain and extend.
> > >>>>>
> > >>>>> Thanks,
> > >>>>> Zhu Zhu
> > >>>>>
> > >>>>> Yun Gao <yungao...@aliyun.com.invalid <mailto:
> > >> yungao...@aliyun.com.invalid <mailto:yungao...@aliyun.com.invalid>>>
> > >> 于2019年8月22日周四 下午8:42写道:
> > >>>>>
> > >>>>> Hi everyone,
> > >>>>>    In some scenarios we met a requirement that some operators want
> to
> > >>>>> send records to theirs downstream operators with an multicast
> > >> communication
> > >>>>> pattern. In detail, for some records, the operators want to send
> them
> > >>>>> according to the partitioner (for example, Rebalance), and for some
> > >> other
> > >>>>> records, the operators want to send them to all the connected
> > >> operators and
> > >>>>> tasks. Such a communication pattern could be viewed as a kind of
> > >> multicast:
> > >>>>> it does not broadcast every record, but some record will indeed be
> > >> sent to
> > >>>>> multiple downstream operators.
> > >>>>>
> > >>>>> However, we found that this kind of communication pattern seems
> could
> > >> not
> > >>>>> be implemented rightly if the operators have multiple consumers
> with
> > >>>>> different parallelism, using the customized partitioner. To solve
> the
> > >> above
> > >>>>> problem, we propose to enhance the support for such kind of
> irregular
> > >>>>> communication pattern. We think there may be two options:
> > >>>>>
> > >>>>>   1. Support a kind of customized operator events, which share much
> > >>>>> similarity with Watermark, and these events can be broadcasted to
> the
> > >>>>> downstream operators separately.
> > >>>>>   2. Let the channel selector supports multicast, and also add the
> > >>>>> separate RecordWriter implementation to avoid impacting the
> > >> performance of
> > >>>>> the channel selector that does not need multicast.
> > >>>>>
> > >>>>> The problem and options are detailed in
> > >>>>>
> > >>
> >
> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
> > >> <
> > >>
> >
> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
> > >
> > >> <
> > >>
> >
> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
> > >> <
> > >>
> >
> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
> > >>>>
> > >>>>>
> > >>>>> We are also wondering if there are other methods to implement this
> > >>>>> requirement with or without changing Runtime. Very thanks for any
> > >> feedbacks
> > >>>>> !
> > >>>>>
> > >>>>>
> > >>>>> Best,
> > >>>>> Yun
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >>
> >
> >
>

Reply via email to