Hi Piotr,

> a) two input operator X -> one input operator Y -> one input operator Z (ALLOWED)
> b) n input operator X -> one input operator Y -> one input operator Z (ALLOWED)
> c) two input operator X -> one input operator Y -> two input operator Z (NOT ALLOWED as a single chain)

Marking c) as NOT ALLOWED sounds good to me. I understand that it is very
difficult to provide general, high-performance support for chaining
arbitrary input-selectable two-input operators.
It is not necessary for the table layer either. b) is already exciting for us.

Actually, we already support n-output chains too:
d) one/two/n op X -> one op Y -> one op A1 -> one op B1 -> one op C1
                              -> one op A2 -> one op B2 -> one op C2
d) is a very useful feature too.

> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
> could be easily handled by a single N-Ary Stream Operator, so this would be
> covered by steps 1. and 2. from my plan from my previous e-mail? That would
> be real nice (avoiding the input selection chaining).

Yes, because in the table layer, the typical scenarios currently only have
a static reading order. (We don't consider MergeJoin here, because it is too
complex to optimize and not worth optimizing at present.)
For example, take the current TwoInputOperators: HashJoin and NestedLoopJoin.
Both have a static reading order: we must read the build input before we
can read the probe input.
So after we analyze the chain, we put all the operators that can be chained
into one N-input operator. We can then analyze the static order required by
this operator and divide the reading order into several levels:
- first level: input4, input5, input1
- second level: input2, input6
- third level: input1, input7
Note that this analysis happens at compile time on the client.
At runtime, we just need to read in a fixed order.
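
To make the idea of levels a bit more concrete, here is a minimal sketch in
plain Java. It is not Flink API: the names LeveledInputReader and
readInLevelOrder are hypothetical, inputs are modelled as simple iterators,
and the round-robin inside a level is only an assumption. The one property
it is meant to show is that a level is fully drained before the next level
is touched.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch (not Flink API): read N inputs in a precomputed level order. */
final class LeveledInputReader {

    /**
     * Drains the inputs level by level. Returns "inputId:record" entries in the
     * order in which the records were read.
     */
    static List<String> readInLevelOrder(
            List<List<String>> levels, Map<String, Iterator<String>> inputs) {
        List<String> consumed = new ArrayList<>();
        for (List<String> level : levels) {
            boolean progressed = true;
            // Assumption: inputs inside one level have no mutual ordering
            // constraint, so we round-robin over them until all are exhausted.
            while (progressed) {
                progressed = false;
                for (String id : level) {
                    Iterator<String> input = inputs.get(id);
                    if (input != null && input.hasNext()) {
                        consumed.add(id + ":" + input.next());
                        progressed = true;
                    }
                }
            }
            // Only now do we move on to the next level (e.g. build before probe).
        }
        return consumed;
    }

    public static void main(String[] args) {
        Map<String, Iterator<String>> inputs = new HashMap<>();
        inputs.put("build1", Arrays.asList("a", "b").iterator());
        inputs.put("build2", Arrays.asList("c").iterator());
        inputs.put("probe", Arrays.asList("x", "y").iterator());

        // The level order would be computed once, at compile time on the client.
        List<List<String>> levels =
                Arrays.asList(Arrays.asList("build1", "build2"), Arrays.asList("probe"));

        // prints [build1:a, build2:c, build1:b, probe:x, probe:y]
        System.out.println(readInLevelOrder(levels, inputs));
    }
}
```

An input that is listed in more than one level is simply found exhausted the
second time, so it costs nothing extra in this sketch.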

Best,
Jingsong Lee

On Wed, Dec 4, 2019 at 10:15 PM Piotr Nowojski <pi...@ververica.com> wrote:

> Hi Jingsong,
>
> Thanks for the feedback :)
>
> Could you clarify a bit what you mean by your desired use cases?
>
> > There are a large number of jobs (in production environments) whose
> > TwoInputOperators could be chained. We used to just watch the last
> > ten tasks transmit data through disk and network, when it could have been
> > done in one task.
> > In terms of performance, if we can chain them, the average improvement is
> > 30%+, and up to an order of magnitude in extreme cases.
>
> As I mentioned at the end, I would like to avoid/postpone chaining of
> multiple/two input operators one after another because of the complexity of
> input selection. For the first version I would like to aim only to allow
> chaining the single input operators with something (the 2- or N-input
> operator must always be the head of the chain). For example, chains:
>
> a) two input operator X -> one input operator Y -> one input operator Z
> (ALLOWED)
> b) n input operator X -> one input operator Y -> one input operator Z
> (ALLOWED)
> c) two input operator X -> one input operator Y -> two input operator Z
> (NOT ALLOWED as a single chain)
>
> The example above sounds to me like c)
>
> I think as a follow up, we could allow c) by extending chaining with a
> simple rule: there can only be a single input selectable operator in the
> chain (again, it’s the chaining of multiple input selectable operators
> that’s causing some problems).
>
> > The table layer has many special characteristics, which give us the
> > chance to optimize it, but which also make it hard for the underlying
> > layer to provide an abstract mechanism to implement it. For example:
> > - HashJoin must read all the data on one side (build side) and then read
> > the other side (probe side).
> > - HashJoin only emits data when reading the probe side.
> > - SortMergeJoin reads its inputs in no fixed order, but if we have a
> > SortMergeJoin chained with another MergeJoin (sort attribute re-use),
> > that makes things complicated.
> > - HashAggregate/Sort only emit data in endInput.
> >
> > Providing an N-Ary stream operator makes everything possible: the upper
> > layer can do anything. These things can be specific optimizations, which
> > is much more natural to do there than in the lower layer.
>
> Do you mean that those Table API/SQL use cases (HashJoin/SortMergeJoin)
> could be easily handled by a single N-Ary Stream Operator, so this would be
> covered by steps 1. and 2. from my plan from my previous e-mail? That would
> be real nice (avoiding the input selection chaining).
>
> Piotrek
>
> > On 4 Dec 2019, at 14:29, Jingsong Li <jingsongl...@gmail.com> wrote:
> >
> > Hi Piotr,
> >
> > Huge +1 for N-Ary Stream Operator.
> > And I love this Golden Shovel award very much!
> >
> > There are a large number of jobs (in production environments) whose
> > TwoInputOperators could be chained. We used to just watch the last
> > ten tasks transmit data through disk and network, when it could have been
> > done in one task.
> > In terms of performance, if we can chain them, the average improvement is
> > 30%+, and up to an order of magnitude in extreme cases.
> >
> > The table layer has many special characteristics, which give us the
> > chance to optimize it, but which also make it hard for the underlying
> > layer to provide an abstract mechanism to implement it. For example:
> > - HashJoin must read all the data on one side (build side) and then read
> > the other side (probe side).
> > - HashJoin only emits data when reading the probe side.
> > - SortMergeJoin reads its inputs in no fixed order, but if we have a
> > SortMergeJoin chained with another MergeJoin (sort attribute re-use),
> > that makes things complicated.
> > - HashAggregate/Sort only emit data in endInput.
> >
> > Providing an N-Ary stream operator makes everything possible: the upper
> > layer can do anything. These things can be specific optimizations, which
> > is much more natural to do there than in the lower layer.
> >
> > In addition to the two optimizations you mentioned, it also gives more
> > room to eliminate virtual function calls, because with this approach the
> > table layer has to consider the operator chain itself.
> > In the future, we can optimize a whole N-Ary stream operator into a
> > JIT-friendly operator. Without virtual function calls, the JIT can show
> > its real strength.
> >
> > Best,
> > Jingsong Lee
> >
> > On Wed, Dec 4, 2019 at 5:24 PM Piotr Nowojski <pi...@ververica.com> wrote:
> > Hi,
> >
> > First and foremost I would like to nominate myself for the Golden Shovel
> > award for digging out this topic:
> >
> >
> >
> > Secondly, I would like to discuss coming back to this particular idea of
> > implementing an N-Ary Stream Operator. This time the motivation comes not
> > from the Side Inputs, but from the need to efficiently support multi joins
> > in SQL without extra network exchanges. I’ve reviewed the design doc
> > proposed by Aljoscha, I quite like it and I think we could start from that.
> >
> > Specifically, the end goal is to allow, for example, Blink to:
> >
> > I. Implement A* multi broadcast join - to have a single operator chain,
> > where the probe table (source) is read locally (inside the task that is
> > actually doing the join), then joined with multiple other broadcasted
> > tables.
> > II. Another example might be when we have 2 or more sources,
> > pre-partitioned on the same key. In that case we should also be able to
> > perform all of the table reading and the join inside a single Task.
> >
> > In order to achieve that, I would propose the following plan:
> >
> > 1. Implement the N-Ary Stream Operator as proposed in the design doc
> > below, however with added support for the input selection [1].
> >   - initially it can be just exposed via the `StreamTransformation`,
> >     without direct access from the `DataStream API`
> >
> > 2. Allow it to be chained with sources (implemented using the FLIP-27
> > SourceReader [2])
> >
> > 3. Think about whether we need to support more complex chaining. Without
> > this point, the motivating examples (I and II) could be implemented if all
> > of the joins/filtering/mappings are compiled/composed into a single N-Ary
> > Stream Operator (which could be chained with some other single input
> > operators at the tail). We could also think about supporting the chaining
> > of a tree of, for example, TwoInputStreamOperators inside a single Task.
> > However I’m leaving this as a follow up, since in that case it’s not so
> > easy to handle the `InputSelection` of multiple operators inside the tree.
> >
> > Piotrek
> >
> > [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/api/java/org/apache/flink/streaming/api/operators/InputSelectable.html
> > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >
> >>> On 21. Apr 2016, at 17:09, Aljoscha Krettek <aljos...@apache.org> wrote:
> >>>
> >>> Hi,
> >>> yes, I see operators of this style as very much an internal thing. You
> >>> are probably talking about use cases for OneInputOperator and
> >>> TwoInputOperator where users have a very specific need and require access
> >>> to the low-level details such as watermarks, state and timers to get
> >>> stuff done. Maybe we could have a wrapper for these so that users can
> >>> still use them but internally we wrap them in an N-Ary Operator.
> >>>
> >>> Cheers,
> >>> Aljoscha
> >>>
> >>> On Thu, 21 Apr 2016 at 16:31 Gyula Fóra <gyf...@apache.org> wrote:
> >>> Hey,
> >>>
> >>> Some initial feedback from my side:
> >>>
> >>> I think this is a very important problem to deal with, as a lot of
> >>> applications depend on it. I like the proposed runtime model and that is
> >>> probably a good way to handle this task; it is very clear what is
> >>> happening.
> >>>
> >>> My main concern is how to handle this from the API and UDFs. What you
> >>> proposed seems like a very internal thing from the API perspective and I
> >>> would be against exposing it in the way you wrote in your example. We
> >>> should make every effort to streamline this with the functional style
> >>> operators in some way (so in that sense the way broadcast sets are handled
> >>> is pretty nice). Maybe we could extend ds.connect() to many streams.
> >>>
> >>> But in any case this is an awesome initiative :)
> >>>
> >>> Cheers,
> >>> Gyula
> >>>
> >>>
> >>> Aljoscha Krettek <aljos...@apache.org> wrote (on Thu, 21 Apr 2016, 15:56):
> >>>
> >>>> Hi Team,
> >>>> I'm currently thinking about how we can bring the broadcast
> >>>> set/broadcast input feature from the DataSet API to the DataStream API.
> >>>> I think this would be a valuable addition since it would enable use
> >>>> cases that join streams with constant (or slowly changing) side
> >>>> information.
> >>>>
> >>>> For this purpose, I think that we need to change the way we handle
> >>>> stream operators. The new model would have one unified operator that
> >>>> handles all cases and allows adding inputs after the operator was
> >>>> constructed, thus allowing the specification of broadcast inputs.
> >>>>
> >>>> I wrote up this preliminary document detailing the reason why we need
> >>>> such a new operator for broadcast inputs and also what the API of such
> >>>> an operator would be. It also quickly touches on the required changes of
> >>>> existing per-operation stream operations such as StreamMap:
> >>>>
> >>>>
> >>>>
> >>>> https://docs.google.com/document/d/1ZFzL_0xGuUEnBsFyEiHwWcmCcjhd9ArWsmhrgnt05RI/edit?usp=sharing
> >>>>
> >>>> Please have a look if you're interested. Feedback/insights are very
> >>>> welcome. :-)
> >>>>
> >>>> Cheers,
> >>>> Aljoscha
> >>>>
> >>
> >
> >
> >
> > --
> > Best, Jingsong Lee
>
>

-- 
Best, Jingsong Lee
