Hi,

I noticed that the previous design doc [1] also discussed introducing a new KeyedStreamOperatorNG. I wonder whether that is a must-do for introducing the N-ary stream operator?
[1] https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI

Best,
Yun Tang
________________________________
From: Piotr Nowojski <pi...@ververica.com>
Sent: Thursday, January 9, 2020 23:27
To: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] Add N-Ary Stream Operator

Hi,

I have started a vote on this topic [1], please cast your +1 or -1 there :)

Also, I have assigned the FLIP-92 number to this design doc.

Piotrek

[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-92-Add-N-Ary-Stream-Operator-in-Flink-td36539.html

> On 10 Dec 2019, at 07:10, Jingsong Li <jingsongl...@gmail.com> wrote:
>
> Hi Piotr,
>
> Sorry for the misunderstanding: chaining does work with multiple outputs
> right now. What I meant is that it's also a very important feature, and it
> should work with N-ary selectable input operators.
> We all think that providing an N-ary selectable input operator is a very
> important thing: it makes TwoInputOperator chaining possible in the upper
> layer, and it makes things simpler.
>
> Looking forward to it very much.
>
> Best,
> Jingsong Lee
>
> On Thu, Dec 5, 2019 at 6:01 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> Thanks for the clarifications Jingsong. Indeed, if chaining doesn't work
>> with multiple outputs right now (doesn't it?), that's also a good future
>> story.
>>
>> Re Kurt:
>> I think this pattern could be easily handled if those two joins are
>> implemented as a single 3-input operator that is internally composed of
>> those three operators.
>> 1. You can set the initial InputSelection to Build1 and Build2.
>> 2. When Build1 receives `endOfInput`, InputSelection switches to Probe1
>>    and Build2.
>> 3. When Probe1 receives `endOfInput`, you do not forward the `endOfInput`
>>    to the internal `HashAgg` operator.
>> 4. When Build2 finally receives `endOfInput`, you can finally forward the
>>    `endOfInput` to the internal `HashAgg`.
>>
>> Exactly for reasons like that, I wanted to at least postpone handling
>> tree-like operator chains in Flink. Logic like that is difficult to
>> express generically, since it requires knowledge about the operators'
>> behaviour. When it is hardcoded for a specific project (Blink in this
>> case) and encapsulated behind an N-ary selectable input operator, it's
>> very easy for the runtime to handle. Sure, that comes at the expense of a
>> bit more complexity, since the user is forced to compose operators; that's
>> why I'm not saying that we will never want to handle this in the future,
>> just not in the first version.
>>
>> Piotrek
>>
>>> On 5 Dec 2019, at 10:11, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>> Kurt mentioned a very interesting thing.
>>>
>>> If we want better performance by reading the inputs of this pattern
>>> simultaneously, we need to control not only the read order of the
>>> inputs, but also when endInput is emitted.
>>> In this case, HashAggregate can only call its real endInput after the
>>> Build2 input is finished, so the endInput of an operator is not
>>> necessarily determined by its own input alone, but also by other
>>> associated inputs.
>>> I think we have the ability to do this in the N-input operator.
>>>
>>> Note that these behaviors should be determined at compile time.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Dec 5, 2019 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> While implementing the N-ary input operator in the table layer, please
>>>> keep this pattern in mind:
>>>>
>>>> Build1 ---+
>>>>           |
>>>>           +---> HashJoin1 ---> HashAgg ---+
>>>>           |                               |
>>>> Probe1 ---+                               +---> HashJoin2
>>>>                                           |
>>>>                                 Build2 ---+
>>>>
>>>> It's quite interesting that `Build1`, `Build2` and `Probe1` can all
>>>> be read simultaneously. But we need to hold back `HashAgg`'s output
>>>> until `Build2` has finished. I don't have a clear solution for now, but
>>>> it's a common pattern we will face.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Dec 5, 2019 at 4:37 PM Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>
>>>>> Hi Piotr,
>>>>>
>>>>>> a) two input operator X -> one input operator Y -> one input operator Z
>>>>>> (ALLOWED)
>>>>>> b) n input operator X -> one input operator Y -> one input operator Z
>>>>>> (ALLOWED)
>>>>>> c) two input operator X -> one input operator Y -> two input operator Z
>>>>>> (NOT ALLOWED as a single chain)
>>>>>
>>>>> NOT ALLOWED for c) sounds good to me. I understand that it is very
>>>>> difficult to propose general, high-performance support for chaining
>>>>> arbitrary input-selectable two-input operators.
>>>>> And it is not necessary for the table layer either. b) already excites us.
>>>>>
>>>>> Actually, we already support N-output chains too:
>>>>> d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
>>>>>                               -> one op A2 -> one op B2 -> one op C2
>>>>> d) is a very useful feature too.
>>>>>
>>>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
>>>>>> could be easily handled by a single N-Ary Stream Operator, so this would
>>>>>> be covered by steps 1. and 2. from my plan from my previous e-mail? That
>>>>>> would be really nice (avoiding the input selection chaining).
>>>>>
>>>>> Yes, because in the table layer the typical scenarios currently only
>>>>> have a static order. (We don't consider MergeJoin here, because it's too
>>>>> complex to optimize and not worth optimizing at present.)
>>>>> For example, the current TwoInputOperators, HashJoin and NestedLoopJoin,
>>>>> all have a static reading order: we must read the build input before we
>>>>> can read the probe input.
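Piotr's four-step answer to Kurt's pattern can be modeled as a small state machine inside one composed 3-input operator. The sketch below is plain Java and deliberately does not use Flink's `InputSelection`/`InputSelectable` classes; `ThreeInputSelectionSketch`, its `Input` enum and the recorded event strings are hypothetical names used only for illustration. It also assumes a slight generalization of step 4: `HashAgg` receives `endOfInput` as soon as both Probe1 and Build2 have finished, whichever of them finishes last.

```java
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;

/**
 * Hypothetical model of one 3-input operator that internally composes
 * HashJoin1 (Build1 + Probe1), HashAgg and HashJoin2 (HashAgg output + Build2).
 * Plain Java only; this is not Flink's InputSelection API.
 */
final class ThreeInputSelectionSketch {

    enum Input { BUILD1, PROBE1, BUILD2 }

    private final EnumSet<Input> finished = EnumSet.noneOf(Input.class);
    private final List<String> events = new ArrayList<>();
    private boolean aggEndInputForwarded = false;

    /** Inputs the operator is currently willing to read. */
    EnumSet<Input> currentSelection() {
        EnumSet<Input> selection = EnumSet.noneOf(Input.class);
        if (!finished.contains(Input.BUILD1)) {
            selection.add(Input.BUILD1);   // step 1: build HashJoin1's table first
        } else if (!finished.contains(Input.PROBE1)) {
            selection.add(Input.PROBE1);   // step 2: then probe it
        }
        if (!finished.contains(Input.BUILD2)) {
            selection.add(Input.BUILD2);   // Build2 can be read in parallel throughout
        }
        return selection;
    }

    /** Called by the (hypothetical) runtime when one external input is exhausted. */
    void endOfInput(Input input) {
        if (!finished.add(input)) {
            return;                        // ignore duplicate notifications
        }
        events.add("endOfInput(" + input + ")");
        // Steps 3 and 4: Probe1 ending alone is not enough; HashAgg may only emit
        // its result once HashJoin2's build side (Build2) is complete as well.
        if (!aggEndInputForwarded
                && finished.contains(Input.PROBE1)
                && finished.contains(Input.BUILD2)) {
            aggEndInputForwarded = true;
            events.add("HashAgg.endInput()");
        }
    }

    boolean isAggEndInputForwarded() {
        return aggEndInputForwarded;
    }

    List<String> events() {
        return events;
    }
}
```

Starting from a fresh instance, `currentSelection()` returns `{BUILD1, BUILD2}`; after `endOfInput(BUILD1)` it returns `{PROBE1, BUILD2}`, and `HashAgg.endInput()` is recorded only after both `PROBE1` and `BUILD2` have ended. Keeping this logic inside one operator is what lets the runtime treat the whole tree as a single input-selectable operator.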
>>>>>
>>>>> So after analyzing the chain, we put all the operators that can be
>>>>> chained into one N-input operator. We can then analyze the static order
>>>>> required by this operator and divide the reading order into several
>>>>> levels:
>>>>> - first level: input4, input5, input1
>>>>> - second level: input2, input6
>>>>> - third level: input1, input7
>>>>> Note that this analysis happens at compile time on the client.
>>>>> At runtime, we just need to read in a fixed order.
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Wed, Dec 4, 2019 at 10:15 PM Piotr Nowojski <pi...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jingsong,
>>>>>>
>>>>>> Thanks for the feedback :)
>>>>>>
>>>>>> Could you clarify a little bit what you mean by the use cases you wish for?
>>>>>>
>>>>>>> There are a large number of jobs (in production environments) whose
>>>>>>> TwoInputOperators could be chained. So far we could only watch the last
>>>>>>> ten tasks transmit data through disk and network, work which could have
>>>>>>> been done in one task.
>>>>>>> If we can chain them, performance improves by 30%+ on average, and by
>>>>>>> an order of magnitude in extreme cases.
>>>>>>
>>>>>> As I mentioned at the end, I would like to avoid/postpone chaining of
>>>>>> multiple/two input operators one after another because of the complexity
>>>>>> of input selection. For the first version I would like to aim only to
>>>>>> allow chaining single input operators with something (the 2 or N input
>>>>>> operator must always be the head of the chain). For example chains:
>>>>>>
>>>>>> a) two input operator X -> one input operator Y -> one input operator Z
>>>>>> (ALLOWED)
>>>>>> b) n input operator X -> one input operator Y -> one input operator Z
>>>>>> (ALLOWED)
>>>>>> c) two input operator X -> one input operator Y -> two input operator Z
>>>>>> (NOT ALLOWED as a single chain)
>>>>>>
>>>>>> The example above sounds to me like c).
>>>>>>
>>>>>> I think as a follow up, we could allow c) by extending chaining with a
>>>>>> simple rule: there can only be a single input selectable operator in
>>>>>> the chain (again, it's the chaining of multiple input selectable
>>>>>> operators that's causing some problems).
>>>>>>
>>>>>>> The table layer has many special features, which give us the chance to
>>>>>>> optimize it, but also make it hard for the underlying layer to provide
>>>>>>> an abstract mechanism that implements them. For example:
>>>>>>> - HashJoin must read all the data on one side (build side) and then
>>>>>>>   read the other side (probe side).
>>>>>>> - HashJoin only emits data while reading the probe side.
>>>>>>> - SortMergeJoin reads its inputs in arbitrary order, but if we chain a
>>>>>>>   SortMergeJoin with another MergeJoin (re-using the sort attribute),
>>>>>>>   things get complicated.
>>>>>>> - HashAggregate/Sort only emit data in endInput.
>>>>>>>
>>>>>>> Providing an N-Ary stream operator makes everything possible: the upper
>>>>>>> layer can do anything. These can be specific optimizations, which is
>>>>>>> much more natural than doing them in the lower layer.
>>>>>>
>>>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
>>>>>> could be easily handled by a single N-Ary Stream Operator, so this would
>>>>>> be covered by steps 1. and 2. from my plan from my previous e-mail? That
>>>>>> would be really nice (avoiding the input selection chaining).
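The chaining rules Piotr lists can be written as two small predicates over a chain, head first. This is a minimal sketch, not Flink's chaining logic: `ChainedOp` is a hypothetical descriptor holding only a name, an input count and an input-selectable flag, and `ChainRules` encodes the first-version rule (only the head may have more than one input) and the proposed follow-up rule (at most one input-selectable operator per chain).

```java
import java.util.List;

/** Hypothetical descriptor of one operator in a chain (not a Flink class). */
final class ChainedOp {
    final String name;
    final int numberOfInputs;
    final boolean inputSelectable;

    ChainedOp(String name, int numberOfInputs, boolean inputSelectable) {
        this.name = name;
        this.numberOfInputs = numberOfInputs;
        this.inputSelectable = inputSelectable;
    }
}

final class ChainRules {

    /** First-version rule: a 2- or N-input operator may only be the head of the chain. */
    static boolean allowedInFirstVersion(List<ChainedOp> chain) {
        for (int i = 1; i < chain.size(); i++) {
            if (chain.get(i).numberOfInputs > 1) {
                return false;
            }
        }
        return true;
    }

    /** Proposed follow-up rule: at most one input-selectable operator anywhere in the chain. */
    static boolean allowedInFollowUp(List<ChainedOp> chain) {
        int selectable = 0;
        for (ChainedOp op : chain) {
            if (op.inputSelectable) {
                selectable++;
            }
        }
        return selectable <= 1;
    }
}
```

Under the follow-up rule, chain c) is accepted as long as only one of X and Z is input-selectable; a chain in which both are selectable is still rejected.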
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>>> On 4 Dec 2019, at 14:29, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Piotr,
>>>>>>>
>>>>>>> Huge +1 for the N-Ary Stream Operator.
>>>>>>> And I love this Golden Shovel award very much!
>>>>>>>
>>>>>>> There are a large number of jobs (in production environments) whose
>>>>>>> TwoInputOperators could be chained. So far we could only watch the last
>>>>>>> ten tasks transmit data through disk and network, work which could have
>>>>>>> been done in one task.
>>>>>>> If we can chain them, performance improves by 30%+ on average, and by
>>>>>>> an order of magnitude in extreme cases.
>>>>>>>
>>>>>>> The table layer has many special features, which give us the chance to
>>>>>>> optimize it, but also make it hard for the underlying layer to provide
>>>>>>> an abstract mechanism that implements them. For example:
>>>>>>> - HashJoin must read all the data on one side (build side) and then
>>>>>>>   read the other side (probe side).
>>>>>>> - HashJoin only emits data while reading the probe side.
>>>>>>> - SortMergeJoin reads its inputs in arbitrary order, but if we chain a
>>>>>>>   SortMergeJoin with another MergeJoin (re-using the sort attribute),
>>>>>>>   things get complicated.
>>>>>>> - HashAggregate/Sort only emit data in endInput.
>>>>>>>
>>>>>>> Providing an N-Ary stream operator makes everything possible: the upper
>>>>>>> layer can do anything. These can be specific optimizations, which is
>>>>>>> much more natural than doing them in the lower layer.
>>>>>>>
>>>>>>> In addition to the two optimizations you mentioned, it also gives more
>>>>>>> room to eliminate virtual function calls, because with this approach
>>>>>>> the table layer has to take care of the operator chain itself.
>>>>>>> And in the future, we can turn a whole N-Ary stream operator into a
>>>>>>> JIT-friendly operator. Without virtual function calls, the JIT can
>>>>>>> show its real strength.
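The build-before-probe constraint above, together with the reading "levels" Jingsong describes in his Dec 5 reply, boils down to a reading schedule that is fixed before the job runs. A minimal sketch, assuming the levels have already been computed on the client and that each input can be modeled as an in-memory queue; `StaticReadOrder.read` is a hypothetical helper, and the round-robin interleaving within a level is an arbitrary choice of this sketch. A single HashJoin is the two-level case: level 0 holds the build input and level 1 the probe input.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical runtime side of a statically ordered N-input operator: the
 * levels are computed ahead of time, and at runtime inputs are read level by level.
 */
final class StaticReadOrder {

    /**
     * Drains every input of level k before touching level k + 1. Inside one
     * level the inputs are interleaved round-robin, since their relative order
     * does not matter for correctness.
     *
     * @param levels input ids per level, in reading order
     * @param inputs buffered records per input id (stand-in for real inputs)
     * @return records in the order they were consumed
     */
    static List<String> read(List<List<Integer>> levels, Map<Integer, Deque<String>> inputs) {
        List<String> consumed = new ArrayList<>();
        for (List<Integer> level : levels) {
            boolean progress = true;
            while (progress) {
                progress = false;
                for (int inputId : level) {
                    Deque<String> queue = inputs.get(inputId);
                    if (queue != null && !queue.isEmpty()) {
                        consumed.add(queue.pollFirst());
                        progress = true;
                    }
                }
            }
        }
        return consumed;
    }
}
```

For example, with levels `[[0, 2], [1]]`, input 0 holding `b1, b2`, input 2 holding `c1` and input 1 holding `p1`, the consumed order is `b1, c1, b2, p1`: input 1 (the probe side) is untouched until both level-0 inputs are exhausted.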
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong Lee
>>>>>>>
>>>>>>> On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <pi...@ververica.com> wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> First and foremost I would like to nominate myself for the Golden Shovel
>>>>>>> award for digging out this topic.
>>>>>>>
>>>>>>> Secondly, I would like to discuss coming back to this particular idea of
>>>>>>> implementing an N-Ary Stream Operator. This time the motivation doesn't
>>>>>>> come from Side Inputs, but from efficiently supporting multi joins in
>>>>>>> SQL without extra network exchanges. I've reviewed the design doc
>>>>>>> proposed by Aljoscha, I quite like it and I think we could start from
>>>>>>> that.
>>>>>>>
>>>>>>> Specifically, the end-goal is to allow, for example, Blink to:
>>>>>>>
>>>>>>> I. Implement A* multi broadcast join: have a single operator chain
>>>>>>> where the probe table (source) is read locally (inside the task that is
>>>>>>> actually doing the join) and then joined with multiple other broadcasted
>>>>>>> tables.
>>>>>>> II. Another example might be when we have 2 or more sources,
>>>>>>> pre-partitioned on the same key. In that case we should also be able to
>>>>>>> perform all of the table reading and the join inside a single Task.
>>>>>>>
>>>>>>> In order to achieve that, I would propose the following plan:
>>>>>>>
>>>>>>> 1. Implement the N-Ary Stream Operator as proposed in the design doc
>>>>>>>    below, however with added support for input selection [1].
>>>>>>>    - initially it can be exposed just via the `StreamTransformation`,
>>>>>>>      without direct access from the `DataStream API`
>>>>>>>
>>>>>>> 2. Allow it to be chained with sources (implemented using the FLIP-27
>>>>>>>    SourceReader [2]).
>>>>>>>
>>>>>>> 3. Think about whether we need to support more complex chaining. Without
>>>>>>>    this point, the motivating examples (I and II) could be implemented if
>>>>>>>    all of the joins/filtering/mappings are compiled/composed into a
>>>>>>>    single N-Ary Stream Operator (which could be chained with some other
>>>>>>>    single input operators at the tail). We could also think about
>>>>>>>    supporting chaining a tree of, for example, TwoInputStreamOperators
>>>>>>>    inside a single Task. However, I'm leaving this as a follow up, since
>>>>>>>    in that case it's not so easy to handle the `InputSelection` of
>>>>>>>    multiple operators inside the tree.
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>
>>>>>>>>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> yes, I see operators of this style as very much an internal thing. You
>>>>>>>>> are probably talking about use cases for OneInputOperator and
>>>>>>>>> TwoInputOperator where users have a very specific need and require
>>>>>>>>> access to the low-level details such as watermarks, state and timers
>>>>>>>>> to get stuff done. Maybe we could have a wrapper for these so that
>>>>>>>>> users can still use them but internally we wrap them in an N-Ary
>>>>>>>>> Operator.
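Aljoscha's idea of keeping single-input operators usable while the runtime only knows about N-ary operators amounts to an adapter. A minimal sketch under stated assumptions: `SingleInputOp`, `NAryOp` and `SingleInputAsNAry` are hypothetical interfaces and classes (not Flink's `OneInputStreamOperator` or any real N-ary operator API), elements arrive tagged with the index of their input, and watermarks, state and timers are left out entirely.

```java
import java.util.function.Consumer;

/** Hypothetical single-input operator contract (not Flink's OneInputStreamOperator). */
interface SingleInputOp<IN, OUT> {
    void processElement(IN value, Consumer<OUT> out);
}

/** Hypothetical N-ary contract: each element arrives tagged with its input index. */
interface NAryOp<OUT> {
    int numberOfInputs();

    void processElement(int inputIndex, Object value, Consumer<OUT> out);
}

/** Lets user code keep a single-input operator while the runtime only handles N-ary ones. */
final class SingleInputAsNAry<IN, OUT> implements NAryOp<OUT> {
    private final SingleInputOp<IN, OUT> delegate;
    private final Class<IN> inputType;

    SingleInputAsNAry(SingleInputOp<IN, OUT> delegate, Class<IN> inputType) {
        this.delegate = delegate;
        this.inputType = inputType;
    }

    @Override
    public int numberOfInputs() {
        return 1;
    }

    @Override
    public void processElement(int inputIndex, Object value, Consumer<OUT> out) {
        if (inputIndex != 0) {
            throw new IllegalArgumentException("single-input operator got input " + inputIndex);
        }
        // The cast restores the element type that the N-ary contract erased to Object.
        delegate.processElement(inputType.cast(value), out);
    }
}
```

A two-input adapter would follow the same pattern, dispatching input 0 and input 1 to the two methods of the wrapped operator.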
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyf...@apache.org> wrote:
>>>>>>>>> Hey,
>>>>>>>>>
>>>>>>>>> Some initial feedback from my side:
>>>>>>>>>
>>>>>>>>> I think this is a very important problem to deal with, as a lot of
>>>>>>>>> applications depend on it. I like the proposed runtime model; it is
>>>>>>>>> probably a good way to handle this task, and it makes it very clear
>>>>>>>>> what is happening.
>>>>>>>>>
>>>>>>>>> My main concern is how to handle this from the API and UDFs. What you
>>>>>>>>> proposed seems like a very internal thing from the API perspective, and
>>>>>>>>> I would be against exposing it in the way you wrote in your example. We
>>>>>>>>> should make every effort to streamline this with the functional-style
>>>>>>>>> operators in some way (in that sense, the way broadcast sets are
>>>>>>>>> handled is pretty nice). Maybe we could extend ds.connect() to many
>>>>>>>>> streams.
>>>>>>>>>
>>>>>>>>> But in any case this is an awesome initiative :)
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, 21 Apr 2016 at 15:56, Aljoscha Krettek <aljos...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Team,
>>>>>>>>>> I'm currently thinking about how we can bring the broadcast
>>>>>>>>>> set/broadcast input feature from the DataSet API to the DataStream
>>>>>>>>>> API. I think this would be a valuable addition since it would enable
>>>>>>>>>> use cases that join streams with constant (or slowly changing) side
>>>>>>>>>> information.
>>>>>>>>>>
>>>>>>>>>> For this purpose, I think that we need to change the way we handle
>>>>>>>>>> stream operators. The new model would have one unified operator that
>>>>>>>>>> handles all cases and allows adding inputs after the operator has been
>>>>>>>>>> constructed, thus allowing the specification of broadcast inputs.
>>>>>>>>>>
>>>>>>>>>> I wrote up this preliminary document detailing why we need such a new
>>>>>>>>>> operator for broadcast inputs and also what the API of such an
>>>>>>>>>> operator would be. It also briefly touches on the required changes to
>>>>>>>>>> existing per-operation stream operators such as StreamMap:
>>>>>>>>>>
>>>>>>>>>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>>>>>>>>>>
>>>>>>>>>> Please have a look if you're interested. Feedback/insights are very
>>>>>>>>>> welcome. :-)
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>
>
> --
> Best, Jingsong Lee