Hi all,

Sorry for joining this thread late. I think enabling the multicast pattern 
is the right direction, but the detailed implementation policy needs more 
discussion.

Two years ago, I filed an issue [1] about the multicast API, but for 
various reasons it was set aside. Later, when I tried to cherry-pick the 
change for experimental use, I found that the return type of the 
`selectChannels()` method had changed from `int[]` to `int`, so the old 
implementation no longer works.
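
For reference, the change looks roughly like this (reconstructed from 
memory, so the exact signatures may differ slightly):

    // Before FLINK-10662: one record could be routed to several channels.
    public interface ChannelSelector<T> {
        int[] selectChannels(T record, int numberOfChannels);
    }

    // Now: exactly one channel per record, which rules out multicast.
    public interface ChannelSelector<T> {
        void setup(int numberOfChannels);
        int selectChannel(T record);
    }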

On my side, multicast has mainly been used for theta-joins. As far as I 
know, it is an essential requirement for some sophisticated join 
algorithms. To date, non-equi joins in Flink can still only be executed 
single-threaded. If we want to improve on this, we first need to support 
the multicast pattern.
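
To make the theta-join case concrete, consider a band join 
(a.ts BETWEEN b.ts - 10 AND b.ts + 10) over range-partitioned input. Each 
record only matches rows in neighbouring ranges, so it should be sent to a 
few channels rather than broadcast to all of them. A hypothetical selector 
in the style of the old int[] interface (all names made up):

    // Route a record to every channel whose value range overlaps
    // [ts - 10, ts + 10], so that all potential matches meet in the same
    // downstream subtask.
    public class BandJoinChannelSelector {
        private static final long BAND_WIDTH = 10;
        private final long rangePerChannel;

        public BandJoinChannelSelector(long rangePerChannel) {
            this.rangePerChannel = rangePerChannel;
        }

        public int[] selectChannels(long ts, int numberOfChannels) {
            int first = (int) Math.max(0, (ts - BAND_WIDTH) / rangePerChannel);
            int last = (int) Math.min(numberOfChannels - 1,
                                      (ts + BAND_WIDTH) / rangePerChannel);
            int[] channels = new int[last - first + 1];
            for (int i = 0; i < channels.length; i++) {
                channels[i] = first + i;
            }
            return channels;
        }
    }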

Best,
Xingcan

[1] https://issues.apache.org/jira/browse/FLINK-6936

> On Aug 24, 2019, at 5:54 AM, Zhu Zhu <reed...@gmail.com> wrote:
> 
> Hi Piotr,
> 
> Thanks for the explanation.
> Agreed that broadcastEmit(record) is the better choice for broadcasting
> in the iterations.
> Since broadcasting for the iterations is the primary motivation, let's
> support it first.
> 
> Thanks,
> Zhu Zhu
> 
> Yun Gao <yungao...@aliyun.com.invalid> wrote on Fri, Aug 23, 2019 at 11:56 PM:
> 
>>     Hi Piotr,
>> 
>>      Many thanks for the suggestions!
>> 
>>     I totally agree that we could focus on the broadcast scenario first
>> and expose the broadcastEmit method, considering both the semantics and
>> the performance.
>> 
>>     For the keyed stream, I also agree that broadcasting keyed records
>> to all the tasks may be confusing given the semantics of the keyed
>> partitioner. However, in the iteration case, supporting broadcast over a
>> keyed partitioner is required, since users may create any subgraph for
>> the iteration body, including operators with keys. I think a possible
>> solution is to introduce another data type for 'broadcastEmit': for an
>> operator Operator<T>, it may broadcast-emit another type E instead of T,
>> and records of type E would bypass the partitioner and the setting of
>> the keyed context. This leads to the design of introducing customized
>> operator events (option 1 in the document). The cost of this method is
>> that we need to introduce a new type of StreamElement and a new
>> interface for it, but it would be suitable for both keyed and non-keyed
>> partitioners.
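>> 
>> As a rough sketch of the idea (all names here are hypothetical):
>> 
>>     // Hypothetical: the operator output gains a second emit path for a
>>     // dedicated event type E that bypasses both the partitioner and the
>>     // setting of the keyed context, so it works the same way for keyed
>>     // and non-keyed streams.
>>     public interface EventBroadcastingOutput<T, E> {
>>         void collect(T record);           // normal path, goes through the partitioner
>>         void broadcastEmitEvent(E event); // broadcast to all downstream channels
>>     }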
>> 
>> Best,
>> Yun
>> 
>> 
>> 
>> ------------------------------------------------------------------
>> From:Piotr Nowojski <pi...@ververica.com>
>> Send Time:2019 Aug. 23 (Fri.) 22:29
>> To:Zhu Zhu <reed...@gmail.com>
>> Cc:dev <dev@flink.apache.org>; Yun Gao <yungao...@aliyun.com>
>> Subject:Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
>> 
>> Hi,
>> 
>> If the primary motivation is broadcasting (for the iterations) and we have
>> no immediate need for multicast (cross join), I would prefer to first
>> expose broadcast via the DataStream API and only later, once we finally
>> need it, support multicast. As I wrote, multicast would be more challenging
>> to implement, with a more complicated runtime and API. And re-using
>> multicast just to support broadcast doesn't make much sense:
>> 
>> 1. It's a bit obfuscated: it's easier to understand
>> collectBroadcast(record) or broadcastEmit(record) than some multicast
>> channel selector that just happens to return all of the channels.
>> 2. There are performance benefits to explicitly calling
>> `RecordWriter#broadcastEmit` (see the sketch below).
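>> 
>> To illustrate point 2 (simplified, writeToChannel is just a stand-in for
>> the real serialisation path):
>> 
>>     // Broadcast via a multicast selector: per record we materialise all
>>     // channel ids and write the record once per channel.
>>     for (int channel : selector.selectChannels(record, numberOfChannels)) {
>>         writeToChannel(record, channel);
>>     }
>> 
>>     // Explicit broadcast: the record can be serialised once and the same
>>     // buffer handed to every channel, skipping the selector entirely.
>>     recordWriter.broadcastEmit(record);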
>> 
>> 
>> On a different note, what would be the semantic of such broadcast emit on
>> KeyedStream? Would it be supported? Or would we limit support only to the
>> non-keyed streams?
>> 
>> Piotrek
>> 
>>> On 23 Aug 2019, at 12:48, Zhu Zhu <reed...@gmail.com> wrote:
>>> 
>>> Thanks Piotr,
>>> 
>>> Users asked for this feature some time ago when they were migrating
>>> batch jobs to Flink (Blink).
>>> It's not very urgent, as they have worked around it (e.g., by
>>> partitioning the data set across different job vertices).
>>> So it's fine not to make it a top priority.
>>> 
>>> Anyway, as a commonly known scenario, I think users can benefit from
>>> cross join sooner or later.
>>> 
>>> Thanks,
>>> Zhu Zhu
>>> 
>>> Piotr Nowojski <pi...@ververica.com> wrote on Fri, Aug 23, 2019 at 6:19 PM:
>>> Hi,
>>> 
>>> Thanks for the answers :) OK, I understand the full picture now. +1 from
>>> my side on solving this issue somehow. But before we start discussing how
>>> to solve it, one last control question:
>>> 
>>> I guess this multicast is intended to be used in the Blink planner,
>>> right? Assuming that we implement the multicast support now, when would
>>> it be used by Blink? I would like to avoid a scenario where we implement
>>> an unused feature and keep maintaining it for a long period of time.
>>> 
>>> Piotrek
>>> 
>>> PS: try to include motivating examples, including concrete ones, in the
>>> proposals/design docs, for example in the very first paragraph,
>>> especially if it's a commonly known feature like cross join :)
>>> 
>>>> On 23 Aug 2019, at 11:38, Yun Gao <yungao...@aliyun.com.INVALID>
>> wrote:
>>>> 
>>>>    Hi Piotr,
>>>> 
>>>>       Thanks a lot for sharing the thoughts!
>>>> 
>>>>       For the iteration, I agree that multicasting is not necessary.
>>>> Exposing the broadcast interface on the Output of the operators in some
>>>> way should also solve this issue, and I think having the broadcast
>>>> method would be even more convenient for the iteration.
>>>> 
>>>>       Also thanks Zhu Zhu for the cross join case!
>>>> Best,
>>>>  Yun
>>>> 
>>>> 
>>>> 
>>>> ------------------------------------------------------------------
>>>> From:Zhu Zhu <reed...@gmail.com>
>>>> Send Time:2019 Aug. 23 (Fri.) 17:25
>>>> To:dev <dev@flink.apache.org>
>>>> Cc:Yun Gao <yungao...@aliyun.com>
>>>> Subject:Re: [DISCUSS] Enhance Support for Multicast Communication
>> Pattern
>>>> 
>>>> Hi Piotr,
>>>> 
>>>> Yes, you are right, it's a distributed cross join requirement.
>>>> Broadcast join can help with cross join cases, but users cannot use it
>>>> if the data set to join is too large to fit into one subtask.
>>>> 
>>>> Sorry for leaving some details out.
>>>> 
>>>> Thanks,
>>>> Zhu Zhu
>>>> Piotr Nowojski <pi...@ververica.com> wrote on Fri, Aug 23, 2019 at 4:57 PM:
>>>> Hi Yun and Zhu Zhu,
>>>> 
>>>> Thanks for the more detailed example Zhu Zhu.
>>>> 
>>>> As far as I understand for the iterations example we do not need
>> multicasting. Regarding the Join example, I don’t fully understand it. The
>> example that Zhu Zhu presented has a drawback of sending both tables to
>> multiple nodes. What’s the benefit of using broadcast join over a hash join
>> in such a case? As far as I know, the biggest benefit of using broadcast join
>> instead of hash join is that we can avoid sending the larger table over the
>> network, because we can perform the join locally. In this example we are
>> sending both of the tables to multiple nodes, which should defeat the
>> purpose.
>>>> 
>>>> Is it about implementing cross join or near cross joins in a
>> distributed fashion?
>>>> 
>>>>> if we introduce a new MulticastRecordWriter
>>>> 
>>>> That's one of the solutions. It might run into the three-class
>>>> devirtualisation problem (we already have RecordWriter and
>>>> BroadcastRecordWriter): with up to two implementations, the JVM is able
>>>> to devirtualise the calls.
>>>> 
>>>> Previously I was also thinking about just providing two different
>>>> ChannelSelector interfaces: one with `int[]`, and `SingleChannelSelector`
>>>> with a plain `int`. Based on that, RecordWriter could perform some magic
>>>> (worst case scenario, `instanceof` checks).
>>>> 
>>>> Another solution might be to change the `ChannelSelector` interface
>>>> into an iterator.
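>>>> 
>>>> Roughly like this (just to show the shape, nothing here is final):
>>>> 
>>>>     // Multicast variant: may return several channels per record.
>>>>     public interface ChannelSelector<T> {
>>>>         int[] selectChannels(T record, int numberOfChannels);
>>>>     }
>>>> 
>>>>     // Single-channel variant: keeps the current fast path.
>>>>     public interface SingleChannelSelector<T> {
>>>>         int selectChannel(T record);
>>>>     }
>>>> 
>>>>     // RecordWriter would branch once per selector (worst case an
>>>>     // instanceof check) and keep the single-channel hot path intact.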
>>>> 
>>>> But let's discuss the details after we agree on implementing this.
>>>> 
>>>> Piotrek
>>>> 
>>>>> On 23 Aug 2019, at 10:20, Yun Gao <yungao...@aliyun.com> wrote:
>>>>> 
>>>>>  Hi Piotr,
>>>>> 
>>>>>       Thanks a lot for the suggestions!
>>>>> 
>>>>>       The core motivation of this discussion is to implement a new
>>>>> iteration library on the DataStream, which requires inserting special
>>>>> records into the stream to notify the progress of the iteration. The
>>>>> mechanism of such records is very similar to the current Watermark, and
>>>>> we face the problem of sending normal records according to the
>>>>> partitioner (Rebalance, etc.) while also being able to broadcast the
>>>>> inserted progress records to all the connected tasks. I have read the
>>>>> notes in the google doc and I totally agree that exposing the broadcast
>>>>> interface of the RecordWriter in some way is able to solve this issue.
>>>>> 
>>>>>      Regarding `int[] ChannelSelector#selectChannels()`, I'm wondering:
>>>>> if we introduce a new MulticastRecordWriter and leave the current
>>>>> RecordWriter untouched, could we avoid the performance degradation? With
>>>>> such a modification, the normal RecordWriter would not need to iterate
>>>>> over the array returned by the ChannelSelector; the only difference
>>>>> would be returning an array instead of an integer, and accessing the
>>>>> first element of the returned array instead of reading the integer
>>>>> directly.
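>>>>> 
>>>>>      A sketch of what I have in mind (hypothetical, just to show that
>>>>> the normal path stays untouched):
>>>>> 
>>>>>     // Only this new writer iterates over the returned array; the
>>>>>     // existing RecordWriter keeps reading a single int as today.
>>>>>     public class MulticastRecordWriter<T> {
>>>>>         private final MulticastChannelSelector<T> selector; // hypothetical
>>>>>         private final int numberOfChannels;
>>>>> 
>>>>>         MulticastRecordWriter(MulticastChannelSelector<T> selector,
>>>>>                               int numberOfChannels) {
>>>>>             this.selector = selector;
>>>>>             this.numberOfChannels = numberOfChannels;
>>>>>         }
>>>>> 
>>>>>         public void emit(T record) {
>>>>>             for (int channel : selector.selectChannels(record, numberOfChannels)) {
>>>>>                 writeToChannel(record, channel);
>>>>>             }
>>>>>         }
>>>>> 
>>>>>         private void writeToChannel(T record, int channel) {
>>>>>             // serialisation and buffer handling elided in this sketch
>>>>>         }
>>>>>     }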
>>>>> 
>>>>> Best,
>>>>> Yun
>>>>> 
>>>>> ------------------------------------------------------------------
>>>>> From:Piotr Nowojski <pi...@ververica.com>
>>>>> Send Time:2019 Aug. 23 (Fri.) 15:20
>>>>> To:dev <dev@flink.apache.org>
>>>>> Cc:Yun Gao <yungao...@aliyun.com>
>>>>> Subject:Re: [DISCUSS] Enhance Support for Multicast Communication
>> Pattern
>>>>> 
>>>>> Hi,
>>>>> 
>>>>> Yun:
>>>>> 
>>>>> Thanks for proposing the idea. I have checked the document and left a
>>>>> couple of questions there, but it might be better to answer them here.
>>>>> 
>>>>> What is the exact motivation and what problems do you want to solve?
>> We have dropped multicast support from the network stack [1] for two
>> reasons:
>>>>> 1. Performance
>>>>> 2. Code simplicity
>>>>> 
>>>>> The proposal to reintroduce `int[] ChannelSelector#selectChannels()`
>>>>> would revert those changes. At that time we were thinking about ways to
>>>>> keep the multicast support at the network level while keeping the
>>>>> performance and simplicity for non-multicast cases, and there are ways
>>>>> to achieve that. However, they would add extra complexity to Flink,
>>>>> which would be better avoided.
>>>>> 
>>>>> On the other hand, supporting a dual pattern, standard partitioning or
>>>>> broadcasting, is easy to do, as LatencyMarkers are doing exactly that.
>>>>> It would just be a matter of exposing this to the user in some way. So
>>>>> before we go any further, can you describe your use cases/motivation?
>>>>> Isn't a mix of standard partitioning and broadcasting enough? Do we
>>>>> need multicasting?
>>>>> 
>>>>> Zhu:
>>>>> 
>>>>> Could you rephrase your example? I didn’t quite understand it.
>>>>> 
>>>>> Piotrek
>>>>> 
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-10662
>>>>> 
>>>>> On 23 Aug 2019, at 09:17, Zhu Zhu <reed...@gmail.com> wrote:
>>>>> 
>>>>> Thanks Yun for starting this discussion.
>>>>> I think the multicasting can be very helpful in certain cases.
>>>>> 
>>>>> I have received requirements from users who want to do a broadcast
>>>>> join where the data set to broadcast is too large to fit in one task.
>>>>> Thus the requirement turned out to be supporting the cartesian product
>>>>> of 2 data sets (one of which can be an infinite stream).
>>>>> For example, A (parallelism=2) broadcast-joins B (parallelism=2) in
>>>>> JobVertex C.
>>>>> The idea is to have 4 C subtasks deal with different combinations of
>>>>> A/B partitions, like C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
>>>>> This requires one record to be sent to multiple downstream subtasks,
>>>>> but not to all subtasks (see the sketch below).
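>>>>> 
>>>>> The selection logic would roughly be (hypothetical sketch, 0-indexed
>>>>> channels, C's parallelism = |A| * |B|):
>>>>> 
>>>>>     // A record from A partition a goes to the "row" of C subtasks,
>>>>>     // a record from B partition b to the "column". With 2 x 2:
>>>>>     // A1 -> {C1, C2}, A2 -> {C3, C4}, B1 -> {C1, C3}, B2 -> {C2, C4}.
>>>>>     static int[] channelsForA(int a, int bParallelism) {
>>>>>         int[] channels = new int[bParallelism];
>>>>>         for (int b = 0; b < bParallelism; b++) {
>>>>>             channels[b] = a * bParallelism + b;
>>>>>         }
>>>>>         return channels;
>>>>>     }
>>>>> 
>>>>>     static int[] channelsForB(int b, int aParallelism, int bParallelism) {
>>>>>         int[] channels = new int[aParallelism];
>>>>>         for (int a = 0; a < aParallelism; a++) {
>>>>>             channels[a] = a * bParallelism + b;
>>>>>         }
>>>>>         return channels;
>>>>>     }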
>>>>> 
>>>>> With the current interface this is not supported, as one record can
>>>>> only be sent to one subtask, or to all subtasks of a JobVertex.
>>>>> So users have had to split the broadcast data set manually across
>>>>> several different JobVertices, which is hard to maintain and extend.
>>>>> 
>>>>> Thanks,
>>>>> Zhu Zhu
>>>>> 
>>>>> Yun Gao <yungao...@aliyun.com.invalid> wrote on Thu, Aug 22, 2019 at 8:42 PM:
>>>>> 
>>>>> Hi everyone,
>>>>>    In some scenarios we have met a requirement that operators want to
>>>>> send records to their downstream operators with a multicast
>>>>> communication pattern. In detail, for some records the operators want
>>>>> to send them according to the partitioner (for example, Rebalance),
>>>>> while for some other records they want to send them to all the
>>>>> connected operators and tasks. Such a communication pattern could be
>>>>> viewed as a kind of multicast: it does not broadcast every record, but
>>>>> some records will indeed be sent to multiple downstream operators.
>>>>> 
>>>>> However, we found that this kind of communication pattern seemingly
>>>>> cannot be implemented correctly with a customized partitioner if the
>>>>> operators have multiple consumers with different parallelism. To solve
>>>>> this problem, we propose to enhance the support for such irregular
>>>>> communication patterns. We think there may be two options:
>>>>> 
>>>>>   1. Support a kind of customized operator event, which shares much
>>>>> similarity with Watermark, and these events can be broadcast to the
>>>>> downstream operators separately.
>>>>>   2. Let the channel selector support multicast, and add a separate
>>>>> RecordWriter implementation to avoid impacting the performance of the
>>>>> channel selectors that do not need multicast.
>>>>> 
>>>>> The problem and options are detailed in
>>>>> 
>> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
>>>>> 
>>>>> We are also wondering if there are other methods to implement this
>>>>> requirement, with or without changing the runtime. Many thanks for any
>>>>> feedback!
>>>>> 
>>>>> 
>>>>> Best,
>>>>> Yun
>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>>> 
>>> 
>> 
>> 
>> 
