Hi,

Thanks for the answers :) Ok I understand the full picture now. +1 from my side 
on solving this issue somehow. But before we start discussing how to solve it 
one last control question:

I guess this multicast is intended to be used in the Blink planner, right? 
Assuming that we implement the multicast support now, when would it be used by 
Blink? I would like to avoid a scenario where we implement an unused feature 
and keep maintaining it for a long period of time.

Piotrek

PS: try to include motivating examples, including concrete ones, in the 
proposals/design docs, for example in the very first paragraph. Especially if 
it’s a commonly known feature like cross join :)

> On 23 Aug 2019, at 11:38, Yun Gao <yungao...@aliyun.com.INVALID> wrote:
> 
>     Hi Piotr,
> 
>        Thanks a lot for sharing the thoughts! 
> 
>        For the iteration, I agree that multicasting is not necessary. 
> Exposing the broadcast interface to the Output of the operators in some way 
> should also solve this issue, and I think it would be even more convenient 
> to have the broadcast method for the iteration. 
> 
>        Also thanks Zhu Zhu for the cross join case!
>  Best, 
>   Yun
> 
> 
> 
> ------------------------------------------------------------------
> From:Zhu Zhu <reed...@gmail.com>
> Send Time:2019 Aug. 23 (Fri.) 17:25
> To:dev <dev@flink.apache.org>
> Cc:Yun Gao <yungao...@aliyun.com>
> Subject:Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
> 
> Hi Piotr,
> 
> Yes, you are right, it's a distributed cross join requirement.
> Broadcast join can help with cross join cases, but users cannot use it if the 
> data set to join is too large to fit into one subtask.
> 
> Sorry for leaving out some details.
> 
> Thanks,
> Zhu Zhu
> On Fri, 23 Aug 2019 at 16:57, Piotr Nowojski <pi...@ververica.com> wrote:
> Hi Yun and Zhu Zhu,
> 
> Thanks for the more detailed example Zhu Zhu.
> 
> As far as I understand for the iterations example we do not need 
> multicasting. Regarding the Join example, I don’t fully understand it. The 
> example that Zhu Zhu presented has a drawback of sending both tables to 
> multiple nodes. What’s the benefit of using a broadcast join over a hash join 
> in such a case? As far as I know, the biggest benefit of using a broadcast join 
> instead of hash join is that we can avoid sending the larger table over the 
> network, because we can perform the join locally. In this example we are 
> sending both of the tables to multiple nodes, which should defeat the purpose.
> 
> Is it about implementing cross join or near cross joins in a distributed 
> fashion? 
> 
>> if we introduce a new MulticastRecordWriter
> 
> That’s one of the solutions. It might suffer from the three-class 
> virtualisation problem (we already have RecordWriter and 
> BroadcastRecordWriter). With up to two implementations, the JVM is able to 
> devirtualise the calls.
> 
> Previously I was also thinking about just providing two different 
> ChannelSelector interfaces: one returning `int[]`, and a 
> `SingleChannelSelector` returning a plain `int`. Based on that, RecordWriter 
> could perform some magic (in the worst case, `instanceof` checks).
> 
> Another solution might be to change the `ChannelSelector` interface into an 
> iterator.
> 
> But let's discuss the details after we agree on implementing this.
> 
> Piotrek
> 
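A minimal Java sketch of the two-selector idea mentioned above, for illustration only. Every type here (`SelectorSketch`, `Selector`, `SingleChannelSelector`, `MultiChannelSelector`, `Writer`) is a hypothetical stand-in and not Flink's actual `ChannelSelector` or `RecordWriter`; the "channels" are plain in-memory lists. It only shows how a writer might dispatch with `instanceof` so that the single-channel path avoids allocating or iterating an array.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these types are Flink's real classes. */
final class SelectorSketch {

    /** Common supertype so that a writer can accept either selector kind. */
    interface Selector<T> {}

    /** Common path: exactly one target channel per record. */
    interface SingleChannelSelector<T> extends Selector<T> {
        int selectChannel(T record);
    }

    /** Multicast path: any subset of the channels per record. */
    interface MultiChannelSelector<T> extends Selector<T> {
        int[] selectChannels(T record);
    }

    /** Toy writer: each "channel" is just an in-memory list. */
    static final class Writer<T> {
        private final Selector<T> selector;
        private final List<List<T>> channels = new ArrayList<>();

        Writer(Selector<T> selector, int numberOfChannels) {
            this.selector = selector;
            for (int i = 0; i < numberOfChannels; i++) {
                channels.add(new ArrayList<>());
            }
        }

        void emit(T record) {
            if (selector instanceof SingleChannelSelector) {
                // No array allocation and no loop on the single-channel path.
                int target = ((SingleChannelSelector<T>) selector).selectChannel(record);
                channels.get(target).add(record);
            } else if (selector instanceof MultiChannelSelector) {
                // Multicast: copy the record to every selected channel.
                for (int target : ((MultiChannelSelector<T>) selector).selectChannels(record)) {
                    channels.get(target).add(record);
                }
            } else {
                throw new IllegalStateException("Unknown selector type");
            }
        }

        List<T> channel(int index) {
            return channels.get(index);
        }
    }

    public static void main(String[] args) {
        Writer<String> single =
                new Writer<>((SingleChannelSelector<String>) r -> r.length() % 3, 3);
        single.emit("ab");
        Writer<String> multi =
                new Writer<>((MultiChannelSelector<String>) r -> new int[] {0, 2}, 3);
        multi.emit("x");
        System.out.println(single.channel(2) + " " + multi.channel(0) + " " + multi.channel(2));
    }
}
```

Whether such a type check is cheaper in practice than a third RecordWriter subclass would have to be measured; the sketch makes no claim about that.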
>> On 23 Aug 2019, at 10:20, Yun Gao <yungao...@aliyun.com> wrote:
>> 
>>   Hi Piotr,
>> 
>>        Thanks a lot for the suggestions!
>> 
>>        The core motivation of this discussion is to implement a new 
>> iteration library on the DataStream, and it requires inserting special 
>> records into the stream to notify the progress of the iteration. The 
>> mechanism of such records is very similar to the current Watermark, and we 
>> face the problem of sending normal records according to the partitioner 
>> (Rebalance, etc.) while also being able to broadcast the inserted progress 
>> records to all the connected tasks. I have read the notes in the Google doc 
>> and I totally agree that exposing the broadcast interface of RecordWriter in 
>> some way would solve this issue. 
>> 
>>       Regarding `int[] ChannelSelector#selectChannels()`, I'm wondering 
>> whether we could avoid the performance degradation if we introduce a new 
>> MulticastRecordWriter and leave the current RecordWriter untouched. With 
>> such a modification the normal RecordWriter does not need to iterate over 
>> the array returned by ChannelSelector; the only difference would be 
>> returning an array instead of an integer, and accessing the first element of 
>> the returned array instead of reading the integer directly.
>> 
>> Best,
>> Yun
>> 
>> ------------------------------------------------------------------
>> From:Piotr Nowojski <pi...@ververica.com>
>> Send Time:2019 Aug. 23 (Fri.) 15:20
>> To:dev <dev@flink.apache.org>
>> Cc:Yun Gao <yungao...@aliyun.com>
>> Subject:Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
>> 
>> Hi,
>> 
>> Yun:
>> 
>> Thanks for proposing the idea. I have checked the document and left a couple 
>> of questions there, but it might be better to answer them here.
>> 
>> What is the exact motivation and what problems do you want to solve? We have 
>> dropped multicast support from the network stack [1] for two reasons:
>> 1. Performance 
>> 2. Code simplicity 
>> 
>> The proposal to reintroduce `int[] ChannelSelector#selectChannels()` would 
>> revert those changes. At that time we were thinking about how to keep 
>> the multicast support on the network level while keeping the performance 
>> and simplicity for non-multicast cases, and there are ways to achieve that. 
>> However, they would add extra complexity to Flink, which it would be better 
>> to avoid.
>> 
>> On the other hand, supporting a dual pattern (standard partitioning or 
>> broadcasting) is easy to do, as LatencyMarkers are doing exactly that. It 
>> would be just a matter of exposing this to the user in some way. So before 
>> we go any further, can you describe your use cases/motivation? Isn’t a mix of 
>> standard partitioning and broadcasting enough? Do we need multicasting?
>> 
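The dual pattern described above (partition normal records, broadcast special ones) can be sketched in a few lines of Java. This is an illustrative toy under stated assumptions: `PartitionOrBroadcastWriter`, its `emit` and `broadcastEmit` methods, and the list-backed channels are all hypothetical and do not reproduce Flink's RecordWriter or the LatencyMarker code path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

/** Hypothetical toy writer: partition normal records, broadcast special ones. */
final class PartitionOrBroadcastWriter<T> {
    private final List<List<T>> channels = new ArrayList<>();
    private final ToIntFunction<T> partitioner;

    PartitionOrBroadcastWriter(int numberOfChannels, ToIntFunction<T> partitioner) {
        this.partitioner = partitioner;
        for (int i = 0; i < numberOfChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    /** Normal record: goes to the single channel chosen by the partitioner. */
    void emit(T record) {
        channels.get(partitioner.applyAsInt(record)).add(record);
    }

    /** Special record (for example an iteration progress marker): goes to every channel. */
    void broadcastEmit(T record) {
        for (List<T> channel : channels) {
            channel.add(record);
        }
    }

    List<T> channel(int index) {
        return channels.get(index);
    }

    public static void main(String[] args) {
        PartitionOrBroadcastWriter<String> writer =
                new PartitionOrBroadcastWriter<String>(3, r -> r.length() % 3);
        writer.emit("a");               // partitioned to one channel
        writer.broadcastEmit("MARK");   // delivered to all channels
        for (int i = 0; i < 3; i++) {
            System.out.println("channel " + i + ": " + writer.channel(i));
        }
    }
}
```

No per-record target array is involved here, which is why this pattern does not need the multicast `int[]` selector.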
>> Zhu:
>> 
>> Could you rephrase your example? I didn’t quite understand it.
>> 
>> Piotrek
>> 
>> [1] https://issues.apache.org/jira/browse/FLINK-10662
>> 
>> On 23 Aug 2019, at 09:17, Zhu Zhu <reed...@gmail.com> wrote:
>> 
>> Thanks Yun for starting this discussion.
>> I think the multicasting can be very helpful in certain cases.
>> 
>> I have received requirements from users who want to do a broadcast
>> join, while the data set to broadcast is too large to fit in one task.
>> Thus the requirement turned out to be to support the cartesian product of 2
>> data sets (one of which can be an infinite stream).
>> For example, A(parallelism=2) broadcast join B(parallelism=2) in JobVertex
>> C.
>> The idea is to have 4 C subtasks to deal with different combinations of A/B
>> partitions, like C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
>> This requires one record to be sent to multiple downstream subtasks, but
>> not to all subtasks.
>> 
>> With the current interface this is not supported, as one record can only be
>> sent to one subtask, or to all subtasks of a JobVertex.
>> So the user had to split the broadcast data set manually into several
>> different JobVertices, which is hard to maintain and extend.
>> 
>> Thanks,
>> Zhu Zhu
>> 
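The routing in the cross join example above can be written down as a small index computation. The following Java sketch is an assumption-laden illustration: `CrossJoinRouting` and its methods are hypothetical names, indices are zero-based (so C1..C4 in the example become 0..3), and the C subtask for the pair (a, b) is assumed to have index a * parallelismB + b. It shows that each record goes to several C subtasks, but not to all of them.

```java
import java.util.Arrays;

/** Hypothetical helper: the C subtask for the pair (a, b) has index a * parallelismB + b. */
final class CrossJoinRouting {

    /** C subtasks that need a record from partition a of A: one per partition of B. */
    static int[] targetsForA(int a, int parallelismB) {
        int[] targets = new int[parallelismB];
        for (int b = 0; b < parallelismB; b++) {
            targets[b] = a * parallelismB + b;
        }
        return targets;
    }

    /** C subtasks that need a record from partition b of B: one per partition of A. */
    static int[] targetsForB(int b, int parallelismA, int parallelismB) {
        int[] targets = new int[parallelismA];
        for (int a = 0; a < parallelismA; a++) {
            targets[a] = a * parallelismB + b;
        }
        return targets;
    }

    public static void main(String[] args) {
        // A(parallelism=2) joined with B(parallelism=2) in a vertex C with 4 subtasks.
        System.out.println("A partition 0 -> " + Arrays.toString(targetsForA(0, 2)));
        System.out.println("A partition 1 -> " + Arrays.toString(targetsForA(1, 2)));
        System.out.println("B partition 0 -> " + Arrays.toString(targetsForB(0, 2, 2)));
        System.out.println("B partition 1 -> " + Arrays.toString(targetsForB(1, 2, 2)));
    }
}
```

With parallelism 2 on both sides, every record reaches 2 of the 4 C subtasks, which is exactly the "multiple but not all" case that a single-channel or broadcast-only interface cannot express.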
>> On Thu, 22 Aug 2019 at 20:42, Yun Gao <yungao...@aliyun.com.invalid> wrote:
>> 
>> Hi everyone,
>>     In some scenarios we met a requirement that some operators want to
>> send records to their downstream operators with a multicast communication
>> pattern. In detail, for some records, the operators want to send them
>> according to the partitioner (for example, Rebalance), and for some other
>> records, the operators want to send them to all the connected operators and
>> tasks. Such a communication pattern could be viewed as a kind of multicast:
>> it does not broadcast every record, but some records will indeed be sent to
>> multiple downstream operators.
>> 
>> However, we found that it seems this kind of communication pattern cannot
>> be implemented correctly with a customized partitioner if the operators have
>> multiple consumers with different parallelism. To solve the above
>> problem, we propose to enhance the support for this kind of irregular
>> communication pattern. We think there may be two options:
>> 
>>    1. Support a kind of customized operator event, which shares much
>> similarity with Watermark; these events can be broadcast to the
>> downstream operators separately.
>>    2. Let the channel selector support multicast, and also add a
>> separate RecordWriter implementation to avoid impacting the performance of
>> channel selectors that do not need multicast.
>> 
>> The problem and options are detailed in
>> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
>> 
>> We are also wondering if there are other methods to implement this
>> requirement with or without changing the Runtime. Many thanks for any
>> feedback!
>> 
>> 
>> Best,
>> Yun
>> 
>> 
>> 
>> 
> 
> 
