Hi,

Thanks for the answers :) OK, I understand the full picture now. +1 from my side on solving this issue somehow. But before we start discussing how to solve it, one last question:
I guess this multicast is intended to be used in the Blink planner, right? Assuming that we implement multicast support now, when would it be used by Blink? I would like to avoid a scenario where we implement an unused feature and keep maintaining it for a long period of time.

Piotrek

PS: try to include motivating examples, including concrete ones, in the proposals/design docs, for example in the very first paragraph. Especially if it's a commonly known feature like cross join :)

> On 23 Aug 2019, at 11:38, Yun Gao <yungao...@aliyun.com.INVALID> wrote:
>
> Hi Piotr,
>
> Thanks a lot for sharing the thoughts!
>
> For the iteration, I agree that multicasting is not necessary. Exposing the broadcast interface through the operators' Output in some way should also solve this issue, and I think it would be even more convenient to have the broadcast method for the iteration.
>
> Also thanks to Zhu Zhu for the cross join case!
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> From: Zhu Zhu <reed...@gmail.com>
> Send Time: 2019 Aug. 23 (Fri.) 17:25
> To: dev <dev@flink.apache.org>
> Cc: Yun Gao <yungao...@aliyun.com>
> Subject: Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
>
> Hi Piotr,
>
> Yes, you are right, it's a distributed cross join requirement. Broadcast join can help with cross join cases, but users cannot use it if the data set to join is too large to fit into one subtask.
>
> Sorry for leaving some details out.
>
> Thanks,
> Zhu Zhu
>
> Piotr Nowojski <pi...@ververica.com> wrote on Fri, 23 Aug 2019 at 16:57:
>
> Hi Yun and Zhu Zhu,
>
> Thanks for the more detailed example, Zhu Zhu.
>
> As far as I understand, for the iterations example we do not need multicasting. Regarding the join example, I don't fully understand it. The example that Zhu Zhu presented has the drawback of sending both tables to multiple nodes. What's the benefit of using a broadcast join over a hash join in such a case?
> As far as I know, the biggest benefit of using a broadcast join instead of a hash join is that we can avoid sending the larger table over the network, because we can perform the join locally. In this example we are sending both of the tables to multiple nodes, which would seem to defeat the purpose.
>
> Is it about implementing cross joins or near-cross joins in a distributed fashion?
>
>> if we introduce a new MulticastRecordWriter
>
> That's one of the solutions. It might suffer from a three-class devirtualisation problem (we already have RecordWriter and BroadcastRecordWriter). With up to two implementations, the JVM is able to devirtualise the calls.
>
> Previously I was also thinking about just providing two different ChannelSelector interfaces: one with `int[]`, and a `SingleChannelSelector` with a plain `int`. Based on that, RecordWriter could perform some magic (worst-case scenario, `instanceof` checks).
>
> Another solution might be to change the `ChannelSelector` interface into an iterator.
>
> But let's discuss the details after we agree on implementing this.
>
> Piotrek
>
>> On 23 Aug 2019, at 10:20, Yun Gao <yungao...@aliyun.com> wrote:
>>
>> Hi Piotr,
>>
>> Thanks a lot for the suggestions!
>>
>> The core motivation of this discussion is to implement a new iteration library on the DataStream, and it requires inserting special records into the stream to notify the progress of the iteration. The mechanism of such records is very similar to the current Watermark, and we face the problem of sending normal records according to the partitioner (Rebalance, etc.) while also being able to broadcast the inserted progress records to all the connected tasks. I have read the notes in the Google doc and I totally agree that exposing the broadcast interface in RecordWriter in some way is able to solve this issue.
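The dual-interface idea mentioned above can be sketched roughly as follows. All names here (`SingleChannelSelector`, `MultiChannelSelector`, `channelsFor`) are hypothetical illustrations, not existing Flink classes, and the `instanceof` dispatch shows the "worst-case scenario" fallback Piotr refers to:

```java
public class SelectorSketch {

    /** Single-channel selection: the common, fast path. */
    interface SingleChannelSelector<T> {
        int selectChannel(T record, int numChannels);
    }

    /** Multi-channel selection: only multicast-aware paths pay for the array. */
    interface MultiChannelSelector<T> {
        int[] selectChannels(T record, int numChannels);
    }

    /** Round-robin rebalance expressed through the single-channel interface. */
    static class Rebalance implements SingleChannelSelector<String> {
        private int next = -1;

        public int selectChannel(String record, int numChannels) {
            next = (next + 1) % numChannels;
            return next;
        }
    }

    /** A selector targeting every channel, expressed through the multi-channel interface. */
    static class ToAll implements MultiChannelSelector<String> {
        public int[] selectChannels(String record, int numChannels) {
            int[] all = new int[numChannels];
            for (int i = 0; i < numChannels; i++) {
                all[i] = i;
            }
            return all;
        }
    }

    /** Worst-case dispatch in the writer: an instanceof check per record. */
    static <T> int[] channelsFor(Object selector, T record, int numChannels) {
        if (selector instanceof SingleChannelSelector) {
            @SuppressWarnings("unchecked")
            SingleChannelSelector<T> s = (SingleChannelSelector<T>) selector;
            return new int[] { s.selectChannel(record, numChannels) };
        }
        @SuppressWarnings("unchecked")
        MultiChannelSelector<T> m = (MultiChannelSelector<T>) selector;
        return m.selectChannels(record, numChannels);
    }
}
```

The point of the split is that a writer wired to a `SingleChannelSelector` never allocates or iterates an array on the hot path; only multicast-capable writers would touch the `int[]` variant.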
>>
>> Regarding `int[] ChannelSelector#selectChannels()`, I'm wondering whether, if we introduced a new MulticastRecordWriter and left the current RecordWriter untouched, we could avoid the performance degradation. With such a modification the normal RecordWriter would not need to iterate over the array returned by the ChannelSelector; the only differences would be returning an array instead of an integer, and accessing the first element of the returned array instead of reading the integer directly.
>>
>> Best,
>> Yun
>>
>> ------------------------------------------------------------------
>> From: Piotr Nowojski <pi...@ververica.com>
>> Send Time: 2019 Aug. 23 (Fri.) 15:20
>> To: dev <dev@flink.apache.org>
>> Cc: Yun Gao <yungao...@aliyun.com>
>> Subject: Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
>>
>> Hi,
>>
>> Yun:
>>
>> Thanks for proposing the idea. I have checked the document and left a couple of questions there, but it might be better to answer them here.
>>
>> What is the exact motivation and what problems do you want to solve? We dropped multicast support from the network stack [1] for two reasons:
>> 1. Performance
>> 2. Code simplicity
>>
>> The proposal to re-introduce `int[] ChannelSelector#selectChannels()` would revert those changes. At that time we were thinking about ways to keep multicast support at the network level while keeping the performance and simplicity for non-multicast cases, and there are ways to achieve that. However, they would add extra complexity to Flink, which would be better to avoid.
>>
>> On the other hand, supporting a dual pattern (standard partitioning or broadcasting) is easy to do, as LatencyMarkers are doing exactly that. It would just be a matter of exposing this to the user in some way. So before we go any further, can you describe your use cases/motivation? Isn't a mix of standard partitioning and broadcasting enough? Do we need multicasting?
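The "dual pattern" described above — normal records follow the channel selector while special marker records are copied to every channel, the way LatencyMarkers are handled — can be sketched like this. `DualPatternWriter` and its in-memory channel lists are invented for illustration only and stand in for the real RecordWriter machinery:

```java
import java.util.ArrayList;
import java.util.List;

public class DualPatternWriter {
    // Each output channel modeled as a simple list of records.
    private final List<List<String>> channels = new ArrayList<>();
    private int next = -1;

    public DualPatternWriter(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    /** Normal records go to exactly one channel chosen by the selector (round-robin here). */
    public void emit(String record) {
        next = (next + 1) % channels.size();
        channels.get(next).add(record);
    }

    /** Special progress/marker records are copied to every channel, LatencyMarker-style. */
    public void broadcastEmit(String marker) {
        for (List<String> channel : channels) {
            channel.add(marker);
        }
    }

    public List<String> channel(int i) {
        return channels.get(i);
    }
}
```

With such a writer, the iteration use case needs no `int[]` selector at all: ordinary records are partitioned one channel at a time, and a progress event reaches all downstream subtasks via `broadcastEmit`.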
>>
>> Zhu:
>>
>> Could you rephrase your example? I didn't quite understand it.
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-10662
>>
>> On 23 Aug 2019, at 09:17, Zhu Zhu <reed...@gmail.com> wrote:
>>
>> Thanks Yun for starting this discussion. I think multicasting can be very helpful in certain cases.
>>
>> I have received requirements from users who want to do a broadcast join where the data set to broadcast is too large to fit in one task. Thus the requirement turned out to be supporting the cartesian product of two data sets (one of which can be an infinite stream). For example, A (parallelism=2) broadcast join B (parallelism=2) in JobVertex C. The idea is to have 4 C subtasks deal with the different combinations of A/B partitions, like C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2). This requires one record to be sent to multiple downstream subtasks, but not to all subtasks.
>>
>> With the current interface this is not supported, as one record can only be sent to one subtask, or to all subtasks of a JobVertex. So the user had to manually split the broadcast data set across several different JobVertices, which is hard to maintain and extend.
>>
>> Thanks,
>> Zhu Zhu
>>
>> Yun Gao <yungao...@aliyun.com.invalid> wrote on Thu, 22 Aug 2019 at 20:42:
>>
>> Hi everyone,
>>
>> In some scenarios we have met a requirement that some operators want to send records to their downstream operators with a multicast communication pattern. In detail, for some records the operators want to send them according to the partitioner (for example, Rebalance), and for some other records the operators want to send them to all the connected operators and tasks.
>> Such a communication pattern could be viewed as a kind of multicast: it does not broadcast every record, but some records will indeed be sent to multiple downstream operators.
>>
>> However, we found that this kind of communication pattern seemingly cannot be implemented correctly with a customized partitioner if the operators have multiple consumers with different parallelisms. To solve this problem, we propose to enhance the support for such irregular communication patterns. We think there may be two options:
>>
>> 1. Support a kind of customized operator event, which shares much similarity with the Watermark, and such events can be broadcast to the downstream operators separately.
>> 2. Let the channel selector support multicast, and add a separate RecordWriter implementation to avoid impacting the performance of channel selectors that do not need multicast.
>>
>> The problem and options are detailed in
>> https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
>>
>> We are also wondering if there are other methods to implement this requirement, with or without changing the runtime. Many thanks for any feedback!
>>
>> Best,
>> Yun
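Zhu Zhu's cartesian-product example in the thread above can be made concrete with a small routing sketch. Assuming (purely for illustration) that C's subtask index is laid out as `a * bParallelism + b` for the combination (A_{a+1}, B_{b+1}), each record must reach a strict subset of C's subtasks — more than one, but fewer than all — which is exactly the multicast pattern neither plain partitioning nor broadcast covers:

```java
public class CrossJoinRouting {

    /** Channels of C that a record from A subtask a must reach: one per B partition. */
    static int[] targetsFromA(int a, int bParallelism) {
        int[] targets = new int[bParallelism];
        for (int b = 0; b < bParallelism; b++) {
            targets[b] = a * bParallelism + b;
        }
        return targets;
    }

    /** Channels of C that a record from B subtask b must reach: one per A partition. */
    static int[] targetsFromB(int b, int aParallelism, int bParallelism) {
        int[] targets = new int[aParallelism];
        for (int a = 0; a < aParallelism; a++) {
            targets[a] = a * bParallelism + b;
        }
        return targets;
    }
}
```

With A and B both at parallelism 2, a record from A1 reaches C1 and C2 (channels 0 and 1), and a record from B1 reaches C1 and C3 (channels 0 and 2) — two of the four subtasks each, which is why a single-channel selector or an all-channels broadcast cannot express this routing.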