Hi

I noticed that the previous design doc [1] also discussed introducing a new
KeyedStreamOperatorNG. Is that a prerequisite for introducing the
N-ary stream operator?


[1] 
https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI

Best
Yun Tang
________________________________
From: Piotr Nowojski <pi...@ververica.com>
Sent: Thursday, January 9, 2020 23:27
To: dev <dev@flink.apache.org>
Subject: Re: [DISCUSS] Add N-Ary Stream Operator

Hi,

I have started a vote on this topic [1], please cast your +1 or -1 there :)

I also assigned the number FLIP-92 to this design doc.

Piotrek

[1] 
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-FLIP-92-Add-N-Ary-Stream-Operator-in-Flink-td36539.html
 

> On 10 Dec 2019, at 07:10, Jingsong Li <jingsongl...@gmail.com> wrote:
>
> Hi Piotr,
>
> Sorry for the misunderstanding. Chaining does work with multiple outputs
> right now; what I meant is that it is also a very important feature, and it
> should work with N-ary selectable input operators.
> We all think that providing an N-ary selectable input operator is very
> important: it makes TwoInputOperator chaining possible in the upper
> layer, and it makes things simpler.
>
> Looking forward to it very much.
>
> Best,
> Jingsong Lee
>
> On Thu, Dec 5, 2019 at 6:01 PM Piotr Nowojski <pi...@ververica.com> wrote:
>
>> Hi,
>>
>> Thanks for the clarifications, Jingsong. Indeed, if chaining doesn’t work
>> with multiple outputs right now (doesn’t it?), that’s also a good future
>> story.
>>
>> Re Kurt:
>> I think this pattern could be easily handled if those two joins are
>> implemented as a single 3-input operator that is internally composed of
>> those three operators.
>> 1. You can set the initial InputSelection to Build1 and Build2.
>> 2. When Build1 receives `endOfInput`, InputSelection switches to Probe1
>> and Build2.
>> 3. When Probe1 receives `endOfInput`, you do not forward the `endOfInput`
>> to the internal `HashAgg` operator
>> 4. When Build2 finally receives `endOfInput`, you can finally forward the
>> `endOfInput` to the internal `HashAgg`
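
The four steps above can be sketched as a small state machine. This is only an illustration in plain Python, not Flink code: `Input`, `ThreeInputJoinSketch`, `selection` and `hash_agg_ended` are hypothetical names, and the sketch assumes that `endOfInput` is forwarded to the internal `HashAgg` as soon as both Probe1 and Build2 have ended, whichever finishes last.

```python
from enum import Enum, auto


class Input(Enum):
    BUILD1 = auto()
    PROBE1 = auto()
    BUILD2 = auto()


class ThreeInputJoinSketch:
    """Hypothetical stand-in for a 3-input operator wrapping
    HashJoin1 -> HashAgg -> HashJoin2. Not a Flink API."""

    def __init__(self):
        # Step 1: start by reading both build sides.
        self.selection = {Input.BUILD1, Input.BUILD2}
        self.ended = set()
        # True once endOfInput has been forwarded to the internal HashAgg.
        self.hash_agg_ended = False

    def end_of_input(self, which):
        """Record that one input ended and return the new input selection."""
        self.ended.add(which)
        self.selection.discard(which)
        # Step 2: once Build1 is done, start reading Probe1.
        if which is Input.BUILD1 and Input.PROBE1 not in self.ended:
            self.selection.add(Input.PROBE1)
        # Steps 3 and 4: HashAgg only sees endOfInput when Probe1 AND Build2
        # have both ended (assumption: the order of the two does not matter).
        if {Input.PROBE1, Input.BUILD2} <= self.ended:
            self.hash_agg_ended = True
        return set(self.selection)
```

The point of the sketch is that the internal `HashAgg` is held back by an input (Build2) that does not feed it directly, which is exactly the knowledge a generic chaining mechanism would not have.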
>>
>> Exactly for reasons like that, I wanted to at least postpone handling
>> tree-like operator chains in Flink. Logic like that is difficult to
>> express generically, since it requires knowledge about the operators’
>> behaviour. When it is hardcoded for a specific project (Blink in this
>> case) and encapsulated behind an N-ary selectable input operator, it’s very
>> easy for the runtime to handle. Sure, that comes at the expense of a bit
>> more complexity in forcing the user to compose operators, which is why I’m
>> not saying that we do not want to handle this at some point in the future,
>> but at least not in the first version.
>>
>> Piotrek
>>
>>> On 5 Dec 2019, at 10:11, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>
>>> Kurt mentioned a very interesting thing,
>>>
>>> If we want better performance by reading simultaneously, then for this
>>> pattern we need to control not only the read order of the inputs, but also
>>> the outputs of endInput.
>>> In this case, HashAggregate can only call its real endInput after the input
>>> of build2 is finished, so the endInput of an operator is determined not
>>> only by its own input, but also by other associated inputs.
>>> I think we have the ability to do this in the n-input operator.
>>>
>>> Note that these behaviors should be determined at compile time.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Thu, Dec 5, 2019 at 4:42 PM Kurt Young <ykt...@gmail.com> wrote:
>>>
>>>> When implementing the n-ary input operator in table, please keep
>>>> this pattern in mind:
>>>>
>>>> Build1 ---+
>>>>           |
>>>>           +---> HashJoin1 ---> HashAgg ---+
>>>>           |                               |
>>>> Probe1 ---+                               +---> HashJoin2
>>>>                                           |
>>>>                                 Build2 ---+
>>>>
>>>> It's quite interesting that all of `Build1`, `Build2` and `Probe1` can
>>>> be read simultaneously. But we need to hold back `HashAgg`'s output
>>>> until `Build2` has finished. I don't have a clear solution for now, but
>>>> it's a common pattern we will face.
>>>>
>>>> Best,
>>>> Kurt
>>>>
>>>>
>>>> On Thu, Dec 5, 2019 at 4:37 PM Jingsong Li <jingsongl...@gmail.com>
>> wrote:
>>>>
>>>>> Hi Piotr,
>>>>>
>>>>>> a) two input operator X -> one input operator Y -> one input operator Z
>>>>>> (ALLOWED)
>>>>>> b) n input operator X -> one input operator Y -> one input operator Z
>>>>>> (ALLOWED)
>>>>>> c) two input operator X -> one input operator Y -> two input operator Z
>>>>>> (NOT ALLOWED as a single chain)
>>>>>
>>>>> Not allowing c) sounds good to me. I understand that it is very difficult
>>>>> to provide general, high-performance support for chaining arbitrary
>>>>> input-selectable two-input operators.
>>>>> It is not necessary for the table layer either; b) already excites us.
>>>>>
>>>>> Actually, we already support chains with n outputs too:
>>>>> d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
>>>>>                               -> one op A2 -> one op B2 -> one op C2
>>>>> d) is a very useful feature too.
>>>>>
>>>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
>>>>>> could be easily handled by a single N-Ary Stream Operator, so this would
>>>>>> be covered by steps 1. and 2. from my plan from my previous e-mail? That
>>>>>> would be real nice (avoiding the input selection chaining).
>>>>>
>>>>> Yes, because in the table layer, the typical scenarios currently only have
>>>>> a static order. (We don't consider MergeJoin here, because it's too complex
>>>>> to optimize, and not worth optimizing at present.)
>>>>> For example, the current TwoInputOperators HashJoin and NestedLoopJoin
>>>>> both have a static reading order: we must read the build input before we
>>>>> can read the probe input.
>>>>> So after we analyze the chain, we put all the operators that can be chained
>>>>> into an N-input operator. We can analyze the static order required by this
>>>>> operator, and divide the reading order into several levels:
>>>>> - first level: input4, input5, input1
>>>>> - second level: input2, input6
>>>>> - third level: input1, input7
>>>>> Note that this analysis happens at compile time on the client.
>>>>> At runtime, we just need to read in a fixed order.
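
A minimal sketch of such level-by-level reading, in plain Python rather than Flink code. The function `read_in_levels` and the input names in the usage note are hypothetical. For simplicity the sketch drains the inputs of one level one after another, whereas a real operator could read all inputs of the same level concurrently.

```python
def read_in_levels(levels, inputs):
    """Yield (input_name, record) pairs in a fixed, precomputed order.

    levels: list of lists of input names, decided at compile time.
    inputs: dict mapping each input name to an iterable of records.
    Every input of one level is fully consumed before the next level starts.
    """
    for level in levels:
        for name in level:
            for record in inputs[name]:
                yield name, record
```

For example, with `levels = [["build_a", "build_b"], ["probe"]]` both build inputs are exhausted before the first probe record is read, which is the ordering a HashJoin needs.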
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>> On Wed, Dec 4, 2019 at 10:15 PM Piotr Nowojski <pi...@ververica.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Jingsong,
>>>>>>
>>>>>> Thanks for the feedback :)
>>>>>>
>>>>>> Could you clarify a little what you mean by your desired use cases?
>>>>>>
>>>>>>> There are a large number of jobs (in production environments) whose
>>>>>>> TwoInputOperators could be chained. We used to just watch the last
>>>>>>> ten tasks transmit data through disk and network, when it could all
>>>>>>> have been done in one task.
>>>>>>> For performance, if we can chain them, the average improvement is 30%+,
>>>>>>> and an order of magnitude in extreme cases.
>>>>>>
>>>>>> As I mentioned at the end, I would like to avoid/postpone chaining of
>>>>>> multiple/two input operators one after another because of the complexity
>>>>>> of input selection. For the first version I would like to aim only to
>>>>>> allow chaining the single input operators with something (a 2 or N input
>>>>>> operator must always be the head of the chain). For example chains:
>>>>>>
>>>>>> a) two input operator X -> one input operator Y -> one input operator Z
>>>>>> (ALLOWED)
>>>>>> b) n input operator X -> one input operator Y -> one input operator Z
>>>>>> (ALLOWED)
>>>>>> c) two input operator X -> one input operator Y -> two input operator Z
>>>>>> (NOT ALLOWED as a single chain)
>>>>>>
>>>>>> The example above sounds to me like c)
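
The first-version rule behind cases a) to c) can be written as a one-line check. This is a sketch in plain Python, not Flink code; `is_chainable` is a hypothetical name and a chain is modelled simply as the number of inputs of each operator, head first.

```python
def is_chainable(input_counts):
    """Return True if the chain obeys the first-version rule:
    only the head may have 2 or N inputs, every later operator has one.

    input_counts: number of inputs of each operator, head of the chain first.
    """
    return bool(input_counts) and all(n == 1 for n in input_counts[1:])
```

With this model, a) is `is_chainable([2, 1, 1])`, b) is for instance `is_chainable([5, 1, 1])`, and c) is `is_chainable([2, 1, 2])`, which is rejected because a two-input operator appears behind the head.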
>>>>>>
>>>>>> I think as a follow up, we could allow c) by extending chaining with a
>>>>>> simple rule: there can only be a single input selectable operator in the
>>>>>> chain (again, it’s the chaining of multiple input selectable operators
>>>>>> that’s causing some problems).
>>>>>>
>>>>>>> The table layer has many special features, which give us the chance to
>>>>>>> optimize it, but also make it hard for the underlying layer to provide
>>>>>>> an abstract mechanism to implement it. For example:
>>>>>>> - HashJoin must read all the data on one side (build side) and then read
>>>>>>> the other side (probe side).
>>>>>>> - HashJoin only emits data while reading the probe side.
>>>>>>> - SortMergeJoin reads in random order, but if a SortMergeJoin is chained
>>>>>>> with another MergeJoin (sort attribute re-use), that makes things
>>>>>>> complicated.
>>>>>>> - HashAggregate/Sort only emit data in endInput.
>>>>>>>
>>>>>>> Providing an N-Ary stream operator makes everything possible. The upper
>>>>>>> layer can do anything. These can be specific optimizations, which is
>>>>>>> much more natural there than in the lower layer.
>>>>>>
>>>>>> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
>>>>>> could be easily handled by a single N-Ary Stream Operator, so this would
>>>>>> be covered by steps 1. and 2. from my plan from my previous e-mail? That
>>>>>> would be real nice (avoiding the input selection chaining).
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>>> On 4 Dec 2019, at 14:29, Jingsong Li <jingsongl...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Piotr,
>>>>>>>
>>>>>>> Huge +1 for N-Ary Stream Operator.
>>>>>>> And I love this Golden Shovel award very much!
>>>>>>>
>>>>>>> There are a large number of jobs (in production environments) whose
>>>>>>> TwoInputOperators could be chained. We used to just watch the last
>>>>>>> ten tasks transmit data through disk and network, when it could all
>>>>>>> have been done in one task.
>>>>>>> For performance, if we can chain them, the average improvement is 30%+,
>>>>>>> and an order of magnitude in extreme cases.
>>>>>>>
>>>>>>> The table layer has many special features, which give us the chance to
>>>>>>> optimize it, but also make it hard for the underlying layer to provide
>>>>>>> an abstract mechanism to implement it. For example:
>>>>>>> - HashJoin must read all the data on one side (build side) and then read
>>>>>>> the other side (probe side).
>>>>>>> - HashJoin only emits data while reading the probe side.
>>>>>>> - SortMergeJoin reads in random order, but if a SortMergeJoin is chained
>>>>>>> with another MergeJoin (sort attribute re-use), that makes things
>>>>>>> complicated.
>>>>>>> - HashAggregate/Sort only emit data in endInput.
>>>>>>>
>>>>>>> Providing an N-Ary stream operator makes everything possible. The upper
>>>>>>> layer can do anything. These can be specific optimizations, which is
>>>>>>> much more natural there than in the lower layer.
>>>>>>>
>>>>>>> In addition to the two optimizations you mentioned, it also gives more
>>>>>>> room to eliminate virtual function calls, because with this approach the
>>>>>>> table layer has to consider the operator chain itself.
>>>>>>> In the future, we can optimize a whole N-Ary stream operator into a
>>>>>>> JIT-friendly operator. Without virtual function calls, the JIT can show
>>>>>>> its real strength.
>>>>>>>
>>>>>>> Best,
>>>>>>> Jingsong Lee
>>>>>>>
>>>>>>> On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <pi...@ververica.com
>>>>>> <mailto:pi...@ververica.com>> wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> First and foremost, I would like to nominate myself for the Golden Shovel
>>>>>>> award for digging out this topic:
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Secondly, I would like to discuss coming back to this particular idea of
>>>>>>> implementing an N-Ary Stream Operator. This time the motivation doesn’t
>>>>>>> come from Side Inputs, but from efficiently supporting multi joins in SQL
>>>>>>> without extra network exchanges. I’ve reviewed the design doc proposed by
>>>>>>> Aljoscha, I quite like it and I think we could start from that.
>>>>>>>
>>>>>>> Specifically, the end goal is to allow, for example, Blink to:
>>>>>>>
>>>>>>> I. Implement A* multi broadcast join - to have a single operator chain,
>>>>>>> where the probe table (source) is read locally (inside the task that is
>>>>>>> actually doing the join), then joined with multiple other broadcast
>>>>>>> tables.
>>>>>>> II. Another example might be when we have 2 or more sources,
>>>>>>> pre-partitioned on the same key. In that case we should also be able to
>>>>>>> perform all of the table reading and the join inside a single Task.
>>>>>>>
>>>>>>> In order to achieve that, I would propose the following plan:
>>>>>>>
>>>>>>> 1. Implement the N-Ary Stream Operator as proposed in the design doc
>>>>>>> below, however with added support for input selection [1].
>>>>>>> - initially it can be just exposed via the `StreamTransformation`,
>>>>>>> without direct access from the `DataStream API`
>>>>>>>
>>>>>>> 2. Allow it to be chained with sources (implemented using the FLIP-27
>>>>>>> SourceReader [2])
>>>>>>>
>>>>>>> 3. Think about whether we need to support more complex chaining. Without
>>>>>>> this point, the motivating examples (I and II) could be implemented if
>>>>>>> all of the joins/filtering/mappings are compiled/composed into a single
>>>>>>> N-Ary Stream Operator (which could be chained with some other single
>>>>>>> input operators at the tail). We could also think about supporting
>>>>>>> chaining a tree of, for example, TwoInputStreamOperators inside a single
>>>>>>> Task. However, I’m leaving this as a follow up, since in that case it’s
>>>>>>> not so easy to handle the `InputSelection` of multiple operators inside
>>>>>>> the tree.
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
>>>>>>>
>>>>>>> [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>
>>>>>>>>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljos...@apache.org
>>>>>> <mailto:aljos...@apache.org>> wrote:
>>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> yes, I see operators of this style as very much an internal thing. You
>>>>>>>>> are probably talking about use cases for OneInputOperator and
>>>>>>>>> TwoInputOperator where users have a very specific need and require
>>>>>>>>> access to the low-level details such as watermarks, state and timers to
>>>>>>>>> get stuff done. Maybe we could have a wrapper for these so that users
>>>>>>>>> can still use them but internally we wrap them in an N-Ary Operator.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Aljoscha
>>>>>>>>>
>>>>>>>>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyf...@apache.org
>>>> <mailto:
>>>>>> gyf...@apache.org>> wrote:
>>>>>>>>> Hey,
>>>>>>>>>
>>>>>>>>> Some initial feedback from my side:
>>>>>>>>>
>>>>>>>>> I think this is a very important problem to deal with, as a lot of
>>>>>>>>> applications depend on it. I like the proposed runtime model and that is
>>>>>>>>> probably a good way to handle this task; it is very clear what is
>>>>>>>>> happening.
>>>>>>>>>
>>>>>>>>> My main concern is how to handle this from the API and UDFs. What you
>>>>>>>>> proposed seems like a very internal thing from the API perspective and I
>>>>>>>>> would be against exposing it in the way you wrote in your example. We
>>>>>>>>> should make every effort to streamline this with the functional style
>>>>>>>>> operators in some way (so in that sense the way broadcast sets are
>>>>>>>>> handled is pretty nice). Maybe we could extend ds.connect() to many
>>>>>>>>> streams.
>>>>>>>>>
>>>>>>>>> But in any case this is awesome initiative :)
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Gyula
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, 21 Apr 2016 at
>>>>>>>>> 15:56):
>>>>>>>>>
>>>>>>>>>> Hi Team,
>>>>>>>>>> I'm currently thinking about how we can bring the broadcast
>>>>>>>>>> set/broadcast input feature from the DataSet API to the DataStream API.
>>>>>>>>>> I think this would be a valuable addition since it would enable use
>>>>>>>>>> cases that join streams with constant (or slowly changing) side
>>>>>>>>>> information.
>>>>>>>>>>
>>>>>>>>>> For this purpose, I think that we need to change the way we handle
>>>>>>>>>> stream operators. The new model would have one unified operator that
>>>>>>>>>> handles all cases and allows adding inputs after the operator was
>>>>>>>>>> constructed, thus allowing the specification of broadcast inputs.
>>>>>>>>>>
>>>>>>>>>> I wrote up this preliminary document detailing the reason why we need
>>>>>>>>>> such a new operator for broadcast inputs and also what the API of such
>>>>>>>>>> an operator would be. It also quickly touches on the required changes
>>>>>>>>>> of existing per-operation stream operations such as StreamMap:
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
>>>>>>>>>>
>>>>>>>>>> Please have a look if you're interested. Feedback/insights are very
>>>>>>>>>> welcome. :-)
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Best, Jingsong Lee
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Best, Jingsong Lee
>>>>>
>>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>
>>
>
> --
> Best, Jingsong Lee
