Hi Piotr,
Thanks a lot for the suggestions!
The core motivation of this discussion is to implement a new iteration
library on the DataStream, which requires inserting special records into the
stream to signal the progress of the iteration. The mechanism of such records
is very similar to the current Watermark. The problem we face is sending
normal records according to the partitioner (Rebalance, etc.) while also being
able to broadcast the inserted progress records to all connected tasks. I have
read the notes in the Google doc, and I fully agree that exposing the
broadcast interface in RecordWriter in some way would solve this issue.
Regarding `int[] ChannelSelector#selectChannels()`, I'm wondering whether
introducing a new MulticastRecordWriter and leaving the current RecordWriter
untouched would avoid the performance degradation. With such a
modification the normal RecordWriter would not need to iterate over the array
returned by ChannelSelector; the only difference would be returning an array
instead of an integer, and accessing the first element of the returned array
instead of reading the integer directly.
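To make the idea above concrete, here is a minimal Java sketch. It is not Flink's actual code: `SelectorSketch`, `Writer`, `PlainWriter` and `MulticastWriter` are hypothetical names, the `ChannelSelector` interface here is a simplified stand-in, and channels are modeled as in-memory lists. It only shows that the plain writer reads the first array element without looping, while the loop lives in a separate multicast writer.

```java
import java.util.ArrayList;
import java.util.List;

final class SelectorSketch {
    /** Simplified stand-in for a selector that always returns an array of channels. */
    interface ChannelSelector<T> {
        int[] selectChannels(T record);
    }

    /** Hypothetical base writer; each channel is modeled as an in-memory list. */
    abstract static class Writer<T> {
        final List<List<T>> channels = new ArrayList<>();
        final ChannelSelector<T> selector;

        Writer(int numChannels, ChannelSelector<T> selector) {
            this.selector = selector;
            for (int i = 0; i < numChannels; i++) {
                channels.add(new ArrayList<>());
            }
        }

        abstract void emit(T record);
    }

    /** Non-multicast path: no loop, only the first element of the array is used. */
    static final class PlainWriter<T> extends Writer<T> {
        PlainWriter(int numChannels, ChannelSelector<T> selector) {
            super(numChannels, selector);
        }

        @Override
        void emit(T record) {
            channels.get(selector.selectChannels(record)[0]).add(record);
        }
    }

    /** Multicast path: iterates over every channel the selector returns. */
    static final class MulticastWriter<T> extends Writer<T> {
        MulticastWriter(int numChannels, ChannelSelector<T> selector) {
            super(numChannels, selector);
        }

        @Override
        void emit(T record) {
            for (int channel : selector.selectChannels(record)) {
                channels.get(channel).add(record);
            }
        }
    }

    public static void main(String[] args) {
        // Assumed convention for this demo: records starting with "progress" go everywhere.
        ChannelSelector<String> selector =
                r -> r.startsWith("progress") ? new int[] {0, 1, 2} : new int[] {r.length() % 3};
        MulticastWriter<String> writer = new MulticastWriter<>(3, selector);
        writer.emit("a");
        writer.emit("progress-1");
        System.out.println(writer.channels);
    }
}
```

Whether the array allocation and the extra indirection in the plain path are really negligible would still need to be measured with a benchmark.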
Best,
Yun
------------------------------------------------------------------
From:Piotr Nowojski <[email protected]>
Send Time:2019 Aug. 23 (Fri.) 15:20
To:dev <[email protected]>
Cc:Yun Gao <[email protected]>
Subject:Re: [DISCUSS] Enhance Support for Multicast Communication Pattern
Hi,
Yun:
Thanks for proposing the idea. I have checked the document and left a couple of
questions there, but it might be better to answer them here.
What is the exact motivation and what problems do you want to solve? We have
dropped multicast support from the network stack [1] for two reasons:
1. Performance
2. Code simplicity
The proposal to reintroduce `int[] ChannelSelector#selectChannels()` would
revert those changes. At that time we were thinking about a way to keep the
multicast support on the network level, while keeping the performance and
simplicity for non-multicast cases, and there are ways to achieve that. However,
they would add extra complexity to Flink, which it would be better to avoid.
On the other hand, supporting a dual pattern (standard partitioning or
broadcasting) is easy to do, as LatencyMarkers are doing exactly that. It would
be just a matter of exposing this to the user in some way. So before we go any
further, can you describe your use cases/motivation? Isn’t a mix of standard
partitioning and broadcasting enough? Do we need multicasting?
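As a rough illustration of the dual pattern, a minimal Java sketch follows. It is not Flink's RecordWriter: `DualPatternSketch` and `DualWriter` are hypothetical, the method names `emit` and `broadcastEmit` are used for illustration only, channels are modeled as in-memory lists, and round-robin is assumed as the partitioning strategy. The point is that each record takes exactly one of two paths, so no array-returning selector is needed.

```java
import java.util.ArrayList;
import java.util.List;

final class DualPatternSketch {
    /** Hypothetical writer with two entry points: partitioned emit and broadcast emit. */
    static final class DualWriter<T> {
        final List<List<T>> channels = new ArrayList<>();
        private int next = 0; // round-robin cursor (assumed rebalance-style partitioning)

        DualWriter(int numChannels) {
            for (int i = 0; i < numChannels; i++) {
                channels.add(new ArrayList<>());
            }
        }

        /** Normal record: goes to exactly one channel chosen by the partitioning. */
        void emit(T record) {
            channels.get(next).add(record);
            next = (next + 1) % channels.size();
        }

        /** Special record (e.g. a progress marker): goes to every channel. */
        void broadcastEmit(T record) {
            for (List<T> channel : channels) {
                channel.add(record);
            }
        }
    }

    public static void main(String[] args) {
        DualWriter<String> writer = new DualWriter<>(3);
        writer.emit("r1");
        writer.emit("r2");
        writer.broadcastEmit("marker");
        System.out.println(writer.channels);
    }
}
```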
Zhu:
Could you rephrase your example? I didn’t quite understand it.
Piotrek
[1] https://issues.apache.org/jira/browse/FLINK-10662
On 23 Aug 2019, at 09:17, Zhu Zhu <[email protected]> wrote:
Thanks Yun for starting this discussion.
I think multicasting can be very helpful in certain cases.
I have received requirements from users who want to do a broadcast
join where the data set to broadcast is too large to fit in one task.
Thus the requirement turned out to be supporting the Cartesian product of two
data sets (one of which can be an infinite stream).
For example, A(parallelism=2) broadcast join B(parallelism=2) in JobVertex
C.
The idea is to have 4 C subtasks to deal with different combinations of A/B
partitions, like C1(A1,B1), C2(A1,B2), C3(A2,B1), C4(A2,B2).
This requires one record to be sent to multiple downstream subtasks, but
not to all subtasks.
With the current interface this is not supported, as one record can only be
sent to one subtask, or to all subtasks of a JobVertex.
So the user had to split the broadcast data set manually into several
different JobVertices, which is hard to maintain and extend.
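The routing in this example can be sketched in Java as follows. This is only an illustration under assumptions: `CartesianRouting` and its methods are hypothetical, indices are zero-based (so A1 is index 0 and C1 is index 0), and C subtasks are assumed to be laid out as `aIndex * parallelismB + bIndex`. Each record is sent to several C subtasks, but never to all of them.

```java
import java.util.Arrays;

final class CartesianRouting {
    /** C subtasks that must receive a record from partition aIndex of A. */
    static int[] targetsForA(int aIndex, int parallelismB) {
        int[] targets = new int[parallelismB];
        for (int b = 0; b < parallelismB; b++) {
            targets[b] = aIndex * parallelismB + b;
        }
        return targets;
    }

    /** C subtasks that must receive a record from partition bIndex of B. */
    static int[] targetsForB(int bIndex, int parallelismA, int parallelismB) {
        int[] targets = new int[parallelismA];
        for (int a = 0; a < parallelismA; a++) {
            targets[a] = a * parallelismB + bIndex;
        }
        return targets;
    }

    public static void main(String[] args) {
        // A(parallelism=2) x B(parallelism=2) -> 4 C subtasks, indices 0..3.
        System.out.println(Arrays.toString(targetsForA(0, 2)));    // [0, 1] -> C1, C2
        System.out.println(Arrays.toString(targetsForA(1, 2)));    // [2, 3] -> C3, C4
        System.out.println(Arrays.toString(targetsForB(0, 2, 2))); // [0, 2] -> C1, C3
        System.out.println(Arrays.toString(targetsForB(1, 2, 2))); // [1, 3] -> C2, C4
    }
}
```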
Thanks,
Zhu Zhu
On Thu, 22 Aug 2019 at 20:42, Yun Gao <[email protected]> wrote:
Hi everyone,
In some scenarios we have a requirement that some operators want to
send records to their downstream operators with a multicast communication
pattern. In detail, for some records, the operators want to send them
according to the partitioner (for example, Rebalance), and for some other
records, the operators want to send them to all the connected operators and
tasks. Such a communication pattern could be viewed as a kind of multicast:
it does not broadcast every record, but some records will indeed be sent to
multiple downstream operators.
However, we found that this kind of communication pattern cannot be
implemented correctly with a customized partitioner if the operators have
multiple consumers with different parallelism. To solve the above
problem, we propose to enhance the support for this kind of irregular
communication pattern. We think there may be two options:
1. Support a kind of customized operator event, which shares much
similarity with Watermark; these events can be broadcast to the
downstream operators separately.
2. Let the channel selector support multicast, and also add a
separate RecordWriter implementation to avoid impacting the performance of
channel selectors that do not need multicast.
The problem and options are detailed in
https://docs.google.com/document/d/1npi5c_SeP68KuT2lNdKd8G7toGR_lxQCGOnZm_hVMks/edit?usp=sharing
We are also wondering if there are other methods to implement this
requirement with or without changing the Runtime. Many thanks for any
feedback!
Best,
Yun