> The observation is that parent operators sometimes do not know the exact
> traits they will have for the given child traits. Several examples:
> 1. PhysicalAggregate1 may request both HASH[b,c] or HASH[c,b]. Contrary to
> the default Apache Calcite implementation, in many systems, these are two
> different distributions - which one should I request? To make things worse,
> some operators may have strict requirements to the order (Join, Union),
> whilst others do not care about the order (Aggregate, Window).
select .... group by b,c,a,g,z,d; if you have StreamAgg in non-distributed 
system, what collation(s) do you request? 
Just request either one. I already stated in the email [1], but seems like you 
missed the 5th paragraph.

> 2. In some systems, the distribution may also define the distribution
> function, e.g., a number of shards. A UNION DISTINCT of two tables with the
> same sharding key, but the different numbers of shards must yield an
> exchange. The parent operator cannot know the number of shards of the input
> in advance and cannot define the proper trait set in the "passThrough"
> method.
The parent operator doesn't need to know what number of shards to request, just 
request hash distribution with shard number 0 or -1 or what ever to indicate 
shard number not decided yet. Later the child operator will tell parent 
operator the exact distribution through "derive".

In Alibaba MaxCompute, we have customized hash distribution, which contains 
number of buckets, hash function, null collation, we also support range 
distribution, which contains range bucket boundaries. All of these can work 
under current framework. With all that being said, distribution is nothing 
special than collation, it all depends on whether you design the operator 
"passthrough" and "derive" strategy correctly.

[1] 
https://lists.apache.org/thread.html/r36b25cbe4ca05fb1262c432ad9103f4126b654698481fca0d2a01fe7%40%3Cdev.calcite.apache.org%3E

Thanks,
Haisheng Yuan

On 2021/06/14 08:26:31, Vladimir Ozerov <ppoze...@gmail.com> wrote: 
> Hi Haisheng,
> 
> The observation is that parent operators sometimes do not know the exact
> traits they will have for the given child traits. Several examples:
> 1. PhysicalAggregate1 may request both HASH[b,c] or HASH[c,b]. Contrary to
> the default Apache Calcite implementation, in many systems, these are two
> different distributions - which one should I request? To make things worse,
> some operators may have strict requirements to the order (Join, Union),
> whilst others do not care about the order (Aggregate, Window).
> 2. In some systems, the distribution may also define the distribution
> function, e.g., a number of shards. A UNION DISTINCT of two tables with the
> same sharding key, but the different numbers of shards must yield an
> exchange. The parent operator cannot know the number of shards of the input
> in advance and cannot define the proper trait set in the "passThrough"
> method.
> 
> We will miss the optimization opportunity in all these cases unless we can
> clarify the real traits in the "derive" phase. But to do this, we need to
> know the original optimization request.
> 
> Regards,
> Vladimir.
> 
> 
> вс, 13 июн. 2021 г. в 22:17, Haisheng Yuan <hy...@apache.org>:
> 
> > How does it relate with "derive" to get the desired plan?
> >
> > Initially PhysicalAggregate1 requests HASH[b,c], PhysicalAggregate2
> > requests HASH[a,b,c]. PhysicalAggregate2 is called on "passthrough" by
> > passing HASH[b,c], then generate another PhysicalAggregate2 with trait
> > HASH[b,c]. You don't need the involvement of "derive".
> >
> > Haisheng Yuan
> >
> > On 2021/06/13 16:58:53, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> > > Hi,
> > >
> > > I tried to apply different approaches, but eventually, I failed to
> > achieve
> > > my goals. It seems that the current implementation cannot handle the
> > > required scenario, as explained below.
> > >
> > > Consider the following tree:
> > > LogicalAggregate1[group=[b,c]]
> > >   LogicalAggregate2[group=[a,b,c]]
> > >     LogicalInput
> > >
> > > I want to find the plan to do these two aggregations without an exchange
> > in
> > > between because they may have compatible distributions. Example:
> > > PhysicalAggregate1[group=[b,c]]     // SHARDED[b,c]
> > >   PhysicalAggregate2[group=[a,b,c]] // SHARDED[b,c]
> > >     Exchange                        // SHARDED[b,c]
> > >       PhysicalInput                 // SHARDED[?]
> > >
> > > The fundamental problem is that it is impossible to save the optimization
> > > request and resolve traits in the "derive" phase afterward. What we need
> > is
> > > to send the optimization request "SHARDED by [b,c] in any order" to
> > > PhysicalAggregate2, and use it in the derive phase so that the new
> > > PhysicalAggregate2 is created with [b,c] or [c,b], but strictly without
> > > [a]. Unfortunately, this doesn't work because the nodes emitted from the
> > > pass-through do not participate in the "derive" phase.
> > >
> > > This could be fixed with a trivial change - to allow certain nodes
> > emitted
> > > from the "passThrough" to participate in "derive". We can do that using a
> > > marker interface or an extension to a PhysicalRel interface. For example:
> > > interface PhysicalRel {
> > >     boolean enforceDerive();
> > > }
> > >
> > > When set to "true", the node would not be added to the pass-through
> > cache.
> > > This way, we may use this node as *storage* for the optimization request.
> > > When the "derive" is called later, we know both the parent requirements
> > and
> > > the child traits. This would be sufficient to solve my problem. I already
> > > tried to do this by disabling the pass-through cache completely and
> > > confirmed that the required plan is found.
> > >
> > > Do you have any objections to such a change?
> > >
> > > Regards,
> > > Vladimir.
> > >
> > > сб, 29 мая 2021 г. в 11:59, Vladimir Ozerov <ppoze...@gmail.com>:
> > >
> > > > Hi Haisheng, Jinpeng
> > > >
> > > > I think we are more or less on the same page:
> > > >
> > > >    1. The current implementation of Apache Calcite may generate
> > wasteful
> > > >    alternatives because rules lack the optimization context.
> > > >    2. But the actual impact on efficiency is not clear.
> > > >
> > > > The (2) is essential to understand whether my efforts make any
> > practical
> > > > sense. And so far, I have only a vague common sense and some simple
> > > > examples in mind, which is not sufficient to make any claims.
> > > >
> > > > Nevertheless, I've checked the source code of the original Columbia
> > > > optimizer. I was wrong in my original claim that Columbia doesn't pass
> > > > optimization context to rules. It does [1]. The context consists of
> > > > required traits and cost budget. In Apache Calcite terms, the context
> > is
> > > > passed to both "RelRule.matches" and "RelRule.onMatch", so that the
> > rule
> > > > may decide on the optimization strategy based on parent request. This
> > is
> > > > exactly what I was trying to achieve in my system with some hacks
> > around
> > > > derive/passThrough.
> > > >
> > > > Regarding the example with join, my proposal is not likely to make any
> > > > difference because the tables are not co-located on the join key, and
> > hence
> > > > join may emit several distributions. Consider the different situation -
> > > > data is already collocated. Without the context, I will emit both
> > 1-phase
> > > > and 2-phase aggregates because I do not know which distributions are
> > > > available below. With the context available, I can collect propagate
> > > > promising optimization requests from Aggregate rules (1-phase,
> > 2-phase).
> > > > Then wait for input optimization and check what is returned. If only
> > > > [dist=a] is returned, I can skip the 2-phase aggregate completely.
> > > > Aggregate[group=a]
> > > >   Join[foo.a=bar.b]
> > > >     Input(foo, dist=a)
> > > >     Input(bar, dist=b)
> > > >
> > > > Another possible use case is join on several keys. By issuing a
> > > > context-aware optimization request [dist a1] from Aggregate to Join, we
> > > > can establish tight cost bounds on Aggregate and Join equivalence
> > groups
> > > > very early so that all other options (broadcasts, sharding in [a1,a2],
> > ...)
> > > > would be pruned without even entering MEMO.
> > > > Aggregate[group=a1]
> > > >   Join[foo.a1=bar.b1 AND foo.a2=bar.b2]
> > > >     Input(foo, dist=a1)
> > > >     Input(bar, dist=b2)
> > > >
> > > > As far as Jinpeng's example with logical multi-phase aggregates - I
> > think
> > > > this is a great example of why logical split might be useful. Thank
> > you for
> > > > that. This reminded me about another concerning use case. Consider an
> > > > Aggregate on top of a UnionAll:
> > > > LogicalAggregate[group=a, COUNT(b)]
> > > >   UnionAll
> > > >     Input1
> > > >     Input2
> > > >
> > > > With Calcite rules, we may push the aggregate down:
> > > > LogicalAggregate[group=a, SUM(COUNT)]
> > > >   UnionAll
> > > >     LogicalAggregate[group=a, COUNT(b)] // <-- Possible exchange here
> > > >       Input1
> > > >     LogicalAggregate[group=a, COUNT(b)] // <-- Possible exchange here
> > > >       Input2
> > > >
> > > > In my optimizer, all logical aggregates are treated in the same way.
> > So if
> > > > the Input1 is not shared by [a], I will generate an exchange. However,
> > if
> > > > we apply your suggestion, we may first split the logical aggregate
> > into two
> > > > tagged logical aggregates:
> > > > LogicalAggregate[group=a, SUM(COUNT), type=global]
> > > >   LogicalAggregate[group=a, COUNT(b), type=local]
> > > >     UnionAll
> > > >       Input1
> > > >       Input2
> > > >
> > > > Then we may implement a transformation rule that pushes down only
> > > > pre-aggregates. As a result, bottom aggregates will be converted into
> > > > single-phase physical aggregate, leading to a much better plan.
> > > > LogicalAggregate[group=a, SUM(COUNT), type=global]
> > > >   UnionAll
> > > >     LogicalAggregate[group=a, COUNT(b), type=local] // <-- No exchange
> > > >       Input1
> > > >     LogicalAggregate[group=a, COUNT(b), type=local] // <-- No exchange
> > > >       Input2
> > > >
> > > > So I agree with you that logical optimization might be very useful. The
> > > > main practical concern is the complexity. We essentially introduce new
> > > > logical operators that cannot be used by the existing Apache Calcite
> > > > logical rule library in the general case.
> > > >
> > > > Regards,
> > > > Vladimir.
> > > >
> > > > [1]
> > > >
> > https://github.com/yongwen/columbia/blob/master/header/rules.h#L383-L397
> > > >
> > > > сб, 29 мая 2021 г. в 04:30, Jinpeng Wu <wjpabc...@gmail.com>:
> > > >
> > > >> Hi, Vladimir.
> > > >>
> > > >> As another topic, it is highly recommended that you split the
> > aggregation
> > > >> in logical stages, not only for traits related matters. It is true
> > that
> > > >> you
> > > >> need to annotate the node with different flags or subclasses and it's
> > a
> > > >> large refactor. But after that, you may find much much bigger
> > benefits.
> > > >>
> > > >> The most important benefit is aggregation pushing down. For example,
> > the
> > > >> query:
> > > >>
> > > >> select t1.value, agg(t2.value)  from t1 join t2 on t1.key = t2.key;
> > > >>
> > > >> You may be able to generate such plan:
> > > >>
> > > >> PhysicalAggregatePhase(group=t1.value, method=agg_final(t2.value))
> > > >>   Exchange(dist = t1.value)
> > > >>       Join (t1.key = t2.key)
> > > >>          Exchange(dist = t1.key)
> > > >>              scan(t1)
> > > >>          Exchange(dist = t2.key)
> > > >>              PhysicalAggregationPhase(group = t2.key, f_partial(a))
> > > >>                 scan(t2)
> > > >>
> > > >> The pushed "PhysicalAggregationPhase(group = t2.key, f_partial(a))"
> > may be
> > > >> able to reduce the input data size of the exchange operation
> > dramatically.
> > > >>
> > > >> There has been lots of research on aggregation push down. But partial
> > > >> aggregate pushing down could achieve much more benefits:
> > > >> 1. Unlike pushing down a full aggregation, the partial aggregate
> > requires
> > > >> no extra exchanges. So it could be a pure gain.
> > > >> 2. The pushing down can apply to any aggregation functions, including
> > > >> user-defined aggregation functions.
> > > >> 3. By introducing the middle phase (the 3-pass aggregation
> > > >> implementation).
> > > >> Aggregation can be splitted into any number of phases and partial
> > > >> aggregation can be pushed down through any number of joins, somewhat
> > like:
> > > >>
> > > >> AggregatePhase(final)
> > > >>    Exchange
> > > >>       AggregatePhase(middle)
> > > >>         JOIN
> > > >>            Exchange
> > > >>                AggregatePhase(middle)
> > > >>                  JOIN
> > > >>                      Exchange
> > > >>                          AggregatePhase(middle)
> > > >>                          ...
> > > >>                            JOIN
> > > >>                                Exchange
> > > >>                                    AggregatePhase(partial)
> > > >>                                        TableScan
> > > >>                    ...
> > > >> Note that AggregatePhase(middle) could work in an adaptive manner:
> > after
> > > >> processing some data, if it discovers no data reduction, it could
> > > >> just degenerate to a NOP operation and can be very light weight.
> > > >>
> > > >> Thanks,
> > > >> Jinpeng Wu
> > > >>
> > > >>
> > > >> On Sat, May 29, 2021 at 8:32 AM Haisheng Yuan <hy...@apache.org>
> > wrote:
> > > >>
> > > >> > > 2) Optimization requests are basically sent to RelSet-s, not
> > > >> RelSubset-s,
> > > >> > > as we make pairwise comparisons between the requested RelSubset
> > and
> > > >> other
> > > >> > > subsets in the set [5][6].
> > > >> >
> > > >> > I agree with you. There could be some waste when the new delivered /
> > > >> > required traitset is generated by "passThrough"/ "derive", in which
> > > >> case,
> > > >> > we only need enforcer between the pair of subsets, instead of
> > pairing
> > > >> with
> > > >> > all other required / delivered subsets in the RelSet. i.e.
> > > >> > In the MEMO group, we have 2 required traitsets:
> > > >> > 1) Hash[a] Sort[b]
> > > >> > 2) Hash[b] Sort[c]
> > > >> >
> > > >> > When we try to pass Hash[a] Sort[b] to one of physical operators say
> > > >> > Project, we found that we can pass down Hash[a] down to its child,
> > then
> > > >> we
> > > >> > get a new physical Project with traitset Hash[a], we only need
> > enforcer
> > > >> > between Hash[a] and Hash[a]Sort[b], but currently in method
> > > >> > "addConverters", we also generate enforcer between Hash[a] and
> > > >> > Hash[b]Sort[c], which is not actually what we want.
> > > >> >
> > > >> > I think it is definitely worth trying to optimize.
> > > >> >
> > > >> > Regards,
> > > >> > Haisheng Yuan
> > > >> > On 2021/05/28 19:15:03, Haisheng Yuan <hy...@apache.org> wrote:
> > > >> > > Hi Vladimir,
> > > >> > >
> > > >> > > The top-down optimizer does NOT require implementation rule to
> > > >> generate
> > > >> > 1 to 1 physical operator for a logical operator, as you can see, if
> > you
> > > >> > generate a 2 phase physical aggregates for the logical aggregate in
> > the
> > > >> > implementation rule, it still works. Window is special because we
> > can
> > > >> > reshuffle the execution order of window functions, and that order
> > makes
> > > >> a
> > > >> > difference according to different parent physical property request.
> > A
> > > >> > single converged physical Window operator catered for this
> > speciality.
> > > >> > However as I said I don't think it is a common scenario.
> > > >> > >
> > > >> > > > the whole decision of whether to go with 1-phase or 2-phase
> > > >> > > > aggregate is a physical decision that should be made based on
> > > >> > available (or
> > > >> > > > assumed) input traits.
> > > >> > > What is the problem of generating both 1-phase and 2-phase
> > aggregates
> > > >> > and choose the best one based on the cost?
> > > >> > >
> > > >> > > Let's see the following query:
> > > >> > > select a, min(b) from (select * from foo, bar where foo.a=bar.a) t
> > > >> group
> > > >> > by a;
> > > >> > > suppose foo is randomly distributed fact table, and bar is
> > randomly
> > > >> > distributed dimension table.
> > > >> > > Consider the 2 following plans:
> > > >> > > 1)
> > > >> > > PhysicalAggregate
> > > >> > >    +-- HashJoin
> > > >> > >               +--  HashDistribute by a
> > > >> > >                          +-- TableScan on foo
> > > >> > >               +--  HashDistribute by a
> > > >> > >                          +-- TableScan on bar
> > > >> > >
> > > >> > > 2)
> > > >> > > PhysicalAggregate(global)
> > > >> > >    +--  HashDistribute by a
> > > >> > >             +---- PhysicalAggregate(local)
> > > >> > >                         +---- HashJoin
> > > >> > >                                      +-- TableScan on foo
> > > >> > >                                      +--  Broadcast
> > > >> > >                                                +-- TableScan on
> > bar
> > > >> > >
> > > >> > > Can you tell that the single phase aggregate plan is always better
> > > >> than
> > > >> > the 2 phase aggregate plan?
> > > >> > >
> > > >> > > > Therefore, the typical way to optimize
> > > >> > > > LogicalAggregate is to split in the physical phase
> > (implementation
> > > >> > rule,
> > > >> > > > pass-through, derive). Practical systems like Dremio [1] and
> > Flink
> > > >> [2]
> > > >> > > > work this way.
> > > >> > > Dremio and Flink work this way doesn't mean it is a good way.
> > > >> Greenplum
> > > >> > Orca and Alibaba MaxCompute optimizer work in another way. In Flink
> > and
> > > >> > Dremio, they have HashAggPRule to generate 1 phase HashAgg and 2
> > phase
> > > >> > HashAgg, SortAggPRule to generate 1 phase SortAgg and 2 phase
> > SortAgg.
> > > >> > However do you think there is possibility that the global SortAgg
> > > >> combined
> > > >> > with local HashAgg, or the global HashAgg combined with local
> > SortAgg
> > > >> may
> > > >> > perform better in difference cases? Are you going to generate all
> > the 4
> > > >> > combinations in the implementation rule? There are some cases we
> > found
> > > >> we'd
> > > >> > better to split the aggregate into 3 phase aggregate [1], in which
> > case,
> > > >> > will the implementation rule generate 3 HashAggs or 3 SortAggs, or
> > all
> > > >> the
> > > >> > 6 combinations?
> > > >> > >
> > > >> > > In our system, we have 1 phase, 2 phase, 3 phase logical aggregate
> > > >> rules
> > > >> > to transform the LogicalAggregate to another kind of logical
> > > >> aggregate(s)
> > > >> > with phase info, say LogicalXXXAggregate, then our physical
> > aggregate
> > > >> rules
> > > >> > match this kind of node to generate HashAgg or StreamAgg. Of
> > course, in
> > > >> the
> > > >> > logical rules, we can add business logic to guess the possible
> > traits
> > > >> > delivered by child nodes to determine whether the rule definitely
> > won't
> > > >> > generate a better alternative and may decide to abort this
> > > >> transformation
> > > >> > early. But I would rather let the cost model decide.
> > > >> > >
> > > >> > > Admittedly, the current top-down optimization is not pure
> > on-demand
> > > >> > request oriented, because it will always generate a physical request
> > > >> > regardless the parent nodes' trait request. For example the
> > following
> > > >> query
> > > >> > in a non-distributed environment:
> > > >> > > select a, b, c, max(d) from foo group by a, b, c order by a desc;
> > > >> > >
> > > >> > > It will first generate a StreamAgg[a ASC, b ASC, c ASC] no matter
> > what
> > > >> > the parent node requires, then the "passThrough" tells StreamAgg
> > that
> > > >> > parent requires [a DESC], we get a StreamAgg[a DESC, b ASC, c ASC].
> > It
> > > >> > would be ideal if we only generate StreamAgg[a DESC, b ASC, c ASC]
> > by
> > > >> > request, but I don't think that will make much difference, the
> > > >> bottleneck
> > > >> > relies on the join order enumeration and the Project related
> > operation.
> > > >> > >
> > > >> > > Regards,
> > > >> > > Haisheng Yuan
> > > >> > >
> > > >> > > [1]
> > > >> >
> > > >>
> > https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitDQA.cpp
> > > >> > >
> > > >> > > On 2021/05/28 09:17:45, Vladimir Ozerov <ppoze...@gmail.com>
> > wrote:
> > > >> > > > Hi Jinpeng, Haisheng,
> > > >> > > >
> > > >> > > > Thank you for your inputs. I really appreciate that. Let me try
> > to
> > > >> > address
> > > >> > > > some of your comments and share some experience with the
> > > >> > implementation of
> > > >> > > > optimizers for a distributed engine I am currently working with.
> > > >> > > >
> > > >> > > > First of all, I would argue that multiple logical operators do
> > not
> > > >> > have a
> > > >> > > > 1-1 mapping to physical operators, and Window is not special
> > here.
> > > >> For
> > > >> > > > instance, LogicalAggregate doesn't have 1-1 mapping to physical
> > > >> > aggregates
> > > >> > > > because the physical implementation can be either 1-phase or
> > > >> 2-phase.
> > > >> > It
> > > >> > > > doesn't matter that the 2-phase aggregate is a composition of
> > two
> > > >> > 1-phase
> > > >> > > > aggregates: the whole decision of whether to go with 1-phase or
> > > >> 2-phase
> > > >> > > > aggregate is a physical decision that should be made based on
> > > >> > available (or
> > > >> > > > assumed) input traits.
> > > >> > > >
> > > >> > > > Consider the following logical tree:
> > > >> > > > LogicalAggregate[group=$0, agg=SUM($1)]
> > > >> > > >   Input
> > > >> > > >
> > > >> > > > If I do the split on the logical phase with a separate
> > > >> transformation
> > > >> > rule,
> > > >> > > > I will get the following tree:
> > > >> > > > LogicalAggregate[group=$0, agg=SUM($1)]
> > > >> > > >   LogicalAggregate[group=$0, agg=SUM($1)]
> > > >> > > >     Input
> > > >> > > >
> > > >> > > > Now we have an infinite loop because the rule takes one
> > aggregate
> > > >> and
> > > >> > > > produces two aggregates. To fix that, we may extend the
> > > >> > LogicalAggregate
> > > >> > > > with some flag or so. But this (1) potentially breaks other
> > > >> > LogicalAggregate
> > > >> > > > optimizations (e.g., transpose with other operators), and (2)
> > breaks
> > > >> > the
> > > >> > > > whole idea of the logical operators because the execution phase
> > > >> > > > (pre-aggregate of final aggregate) is a property of concrete
> > > >> backend,
> > > >> > not a
> > > >> > > > property of relational algebra. Therefore, the typical way to
> > > >> optimize
> > > >> > > > LogicalAggregate is to split in the physical phase
> > (implementation
> > > >> > rule,
> > > >> > > > pass-through, derive). Practical systems like Dremio [1] and
> > Flink
> > > >> [2]
> > > >> > > > work this way.
> > > >> > > >
> > > >> > > > That said, as an optimizer developer, I need the flexibility to
> > emit
> > > >> > any
> > > >> > > > physical trees for the given logical operator, and 1-1 mapping
> > > >> cannot
> > > >> > be
> > > >> > > > assumed. Calcite's API allows for that, and I am not aware of
> > formal
> > > >> > > > documentation or guidelines that discourage that.
> > > >> > > >
> > > >> > > > Now the question when exactly to emit the operators. Normally,
> > we
> > > >> > produce
> > > >> > > > operators from rules. As discussed above, if the logical
> > operator
> > > >> may
> > > >> > > > produce different physical trees depending on input traits, the
> > > >> > > > recommendation is to emit all combinations, even though we do
> > not
> > > >> know
> > > >> > > > whether there would be good inputs for that alternatives. This
> > > >> > contradicts
> > > >> > > > the idea of the guided top-down search, where we explore the
> > search
> > > >> > space
> > > >> > > > in response to a concrete optimization request, rather than
> > with a
> > > >> > > > pessimistic assumption that a certain plan might be required in
> > the
> > > >> > future.
> > > >> > > >
> > > >> > > > I found a way to mitigate this problem partially. Funny, my
> > > >> solution is
> > > >> > > > almost similar to what Haisheng proposed for the Window
> > operator.
> > > >> > > > 1. For every logical operator, I emit a single physical operator
> > > >> from
> > > >> > the
> > > >> > > > implementation rule, maintaining the exact 1-1 mapping. The
> > emitted
> > > >> > > > operators (1) have a special flag "template" which makes their
> > const
> > > >> > > > infinite, (2) never exposes or demands non-default traits
> > except for
> > > >> > > > convention, (3) have OMAKASE derivation mode.
> > > >> > > > 2. When the input is optimized, the "derive" is called on the
> > > >> template,
> > > >> > > > which produces the concrete physical tree, that is not
> > necessarily
> > > >> 1-1
> > > >> > to
> > > >> > > > the original logical node.
> > > >> > > >
> > > >> > > > Before rule:
> > > >> > > > LogicalAggregate[group=$0, agg=SUM($1)]
> > > >> > > >   LogicalInput
> > > >> > > >
> > > >> > > > After rule:
> > > >> > > > PhysicalAggregate[group=$0, agg=SUM($1), template=true,
> > > >> cost=infinite]
> > > >> > > >   LogicalInput
> > > >> > > >
> > > >> > > > After "derive" if the input is not shared on $0:
> > > >> > > > PhysicalAggregate[group=$0, agg=SUM($1)]
> > > >> > > >   Exchange
> > > >> > > >     PhysicalAggregate[group=$0, agg=SUM($1)]
> > > >> > > >       PhysicalInputNotSharded
> > > >> > > >
> > > >> > > > After "derive" if the input is shared on $0:
> > > >> > > > PhysicalAggregate[group=$0, agg=SUM($1)]
> > > >> > > >   PhysicalInputNotSharded
> > > >> > > >
> > > >> > > > This approach allows me to avoid the generation of unnecessary
> > > >> > alternatives
> > > >> > > > by delaying the optimization to derive phase. The aggregate
> > split is
> > > >> > > > implemented in rules in Dremio/Flink, but in my case, this logic
> > > >> > migrates
> > > >> > > > to "derive".
> > > >> > > >
> > > >> > > > This solution worked well for the whole TPC-DS suite until we
> > > >> wanted to
> > > >> > > > optimize combinations of operators rather than individual
> > > >> operators. A
> > > >> > good
> > > >> > > > example is TPC-DS query 1 [3]. During the logical optimization,
> > we
> > > >> get
> > > >> > the
> > > >> > > > following logical tree, which is exactly the case that I
> > > >> demonstrated
> > > >> > at
> > > >> > > > the beginning of this mail thread:
> > > >> > > > G1: Aggregate(groupBy=[ctr_store_sk])
> > > >> > > > G2:  Aggregate(groupBy=[ctr_customer_sk, ctr_store_sk]
> > > >> > > >
> > > >> > > > And this is where I got stuck. I need to do a simple thing -
> > > >> propagate
> > > >> > an
> > > >> > > > optimization request from G1 to G2, informing G2 that it should
> > > >> > consider
> > > >> > > > the distribution [ctr_store_sk]. I can deliver that request to
> > my
> > > >> > physical
> > > >> > > > template in G2 through "convert". But the problem is that the
> > > >> current
> > > >> > > > Calcite implementation doesn't allow me to satisfy this request
> > > >> later
> > > >> > on in
> > > >> > > > the derivation stage. Instead, I am forced to emit the final
> > > >> execution
> > > >> > tree
> > > >> > > > from the "passThrough" method, which will not be notified at the
> > > >> > derivation
> > > >> > > > stage. I prepared a scheme [4] that demonstrates the problem.
> > > >> > > >
> > > >> > > > It feels that I almost achieved what I need. The last step is to
> > > >> ensure
> > > >> > > > that "derive" is called on the newly created template. And this
> > is
> > > >> > where I
> > > >> > > > think I reach the inflexibility of the current top-down
> > optimizer
> > > >> > > > implementation. The current design forces us to define all
> > possible
> > > >> > > > structures of physical operators in advance, but I want to
> > delay the
> > > >> > > > decision to the derive stage when input traits are known because
> > > >> these
> > > >> > > > traits are essential to make the proper physical decisions.
> > > >> > > >
> > > >> > > > There are some similarities with Haisheng's proposal about the
> > > >> Window
> > > >> > > > operator. We also maintain the 1-1 correspondence between the
> > > >> logical
> > > >> > > > operator and a physical template. However, Haisheng's proposal
> > is
> > > >> > basically
> > > >> > > > heuristic, as we split optimization into two phases
> > (implementation,
> > > >> > > > post-processing). It is impossible to properly calculate the
> > cost of
> > > >> > the
> > > >> > > > Window operator because we do not know which exchanges would be
> > > >> needed
> > > >> > > > before the post-processing. In my case, we do the proper cost
> > > >> > estimation
> > > >> > > > within a single expanded MEMO.
> > > >> > > >
> > > >> > > > Now switching to theoretical considerations. We may make several
> > > >> > > > observations from the previous discussion:
> > > >> > > > 1) Our ideas converge to the solution where every logical
> > operator
> > > >> has
> > > >> > a
> > > >> > > > single corresponding physical operator, which is later expanded
> > into
> > > >> > more
> > > >> > > > alternatives.
> > > >> > > > 2) Optimization requests are basically sent to RelSet-s, not
> > > >> > RelSubset-s,
> > > >> > > > as we make pairwise comparisons between the requested RelSubset
> > and
> > > >> > other
> > > >> > > > subsets in the set [5][6].
> > > >> > > > 3) Irrespective of the design, the complete exploration requires
> > > >> > multiple
> > > >> > > > invocations of some implementation logic for different
> > combinations
> > > >> of
> > > >> > > > required traits and available input traits.
> > > >> > > >
> > > >> > > > These observations led me to think that maybe trait propagation
> > > >> through
> > > >> > > > some dedicated nodes (templates in my case and Haisheng's Window
> > > >> > proposal,
> > > >> > > > or pessimistically emitted physical nodes in the previous
> > > >> > Jinpeng/Haisheng
> > > >> > > > proposal) is not the ideal design, at least for some cases.
> > > >> > > >
> > > >> > > > From the design standpoint, we propagate traits top-down and
> > > >> bottom-up
> > > >> > > > across equivalence groups, not individual RelSubset-s or
> > RelNode-s.
> > > >> > > > Currently, we ignore the optimization context when optimizing
> > the
> > > >> group
> > > >> > > > (except for the cost pruning). Rules emit partially constructed
> > > >> nodes
> > > >> > since
> > > >> > > > neither parent requirements nor child traits are available to
> > the
> > > >> rule.
> > > >> > > >
> > > >> > > > Instead, there could exist a true guided top-down optimization
> > flow
> > > >> > when
> > > >> > > > the "guided" term applies to rules as well:
> > > >> > > > 1. Pass-through: RelSet receives an optimization request and
> > chooses
> > > >> > > > appropriate implementation rules to fire. A rule receives
> > > >> optimization
> > > >> > > > requests, constructs optimization requests for children
> > (adjusting
> > > >> > traits,
> > > >> > > > optimization budget, etc.), then sends these requests down. The
> > > >> process
> > > >> > > > repeated recursively until we either reach the bottom node or
> > some
> > > >> set
> > > >> > that
> > > >> > > > is already optimized for this request.
> > > >> > > > 3. Derive: given the now known input traits, emit appropriate
> > > >> physical
> > > >> > > > nodes from the rule. Then notify the parent. Repeat the process
> > > >> > recursively.
> > > >> > > >
> > > >> > > > For common use cases, this design would require the same logic,
> > > >> which
> > > >> > is
> > > >> > > > currently split between rules, "derive" and "passThrough", just
> > the
> > > >> > code
> > > >> > > > location will be different, as everything now converges to the
> > rule.
> > > >> > But
> > > >> > > > for the advanced use cases, that approach may allow for more
> > > >> flexible
> > > >> > > > optimization patterns, like for these two chained aggregates.
> > > >> > > >
> > > >> > > > I'll try to implement both solutions - (1) emit multiple nodes
> > from
> > > >> a
> > > >> > > > physical rule, and (2) enable derivation for some nodes emitted
> > from
> > > >> > > > "passThrough", and share the results here.
> > > >> > > >
> > > >> > > > Regards,
> > > >> > > > Vladimir.
> > > >> > > >
> > > >> > > > [1]
> > > >> > > >
> > > >> >
> > > >>
> > https://github.com/dremio/dremio-oss/blob/8e85901e7222c81ccad3436ba9b63485503472ac/sabot/kernel/src/main/java/com/dremio/exec/planner/physical/HashAggPrule.java#L123-L146
> > > >> > > > [2]
> > > >> > > >
> > > >> >
> > > >>
> > https://github.com/apache/flink/blob/release-1.13.0/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala#L101-L147
> > > >> > > > [3] https://github.com/Agirish/tpcds/blob/master/query1.sql
> > > >> > > > [4]
> > > >> > > >
> > > >> >
> > > >>
> > https://docs.google.com/document/d/1u7V4bNp51OjytKnS2kzD_ikHLiNUPbhjZfRjkTfdsDA/edit
> > > >> > > > [5]
> > > >> > > >
> > > >> >
> > > >>
> > https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java#L196
> > > >> > > > [6]
> > > >> > > >
> > > >> >
> > > >>
> > https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L203
> > > >> > > >
> > > >> > > > пт, 28 мая 2021 г. в 00:10, Haisheng Yuan <hy...@apache.org>:
> > > >> > > >
> > > >> > > > > Getting back to your window query example:
> > > >> > > > >
> > > >> > > > > > Consider the Window function:
> > > >> > > > > > SELECT
> > > >> > > > > >   AGG1 over (partition by a),
> > > >> > > > > >   AGG2 over (partition by b),
> > > >> > > > > >   AGG3 over (partition by c),
> > > >> > > > > >   ...
> > > >> > > > > > FROM input
> > > >> > > > >
> > > >> > > > > Window is quite special because the logical vs physical
> > operator
> > > >> > count is
> > > >> > > > > not 1 to 1, generally we generate a physical window operator
> > for
> > > >> each
> > > >> > > > > window function with different partition column. That
> > determines
> > > >> > that once
> > > >> > > > > the physical operators are created, their order can't be
> > changed.
> > > >> > Hence
> > > >> > > > > your proposal of passing required traits to physical rule can
> > > >> > mitigate the
> > > >> > > > > problem.
> > > >> > > > >
> > > >> > > > > But things would be much easier if we define a different
> > physical
> > > >> > window
> > > >> > > > > operator.
> > > >> > > > > For the above query, we can generate the *Single* physical
> > window
> > > >> > operator
> > > >> > > > > like this:
> > > >> > > > > PhysicalWindow[AGG1 over (partition by a), AGG2 over
> > (partition by
> > > >> > b),
> > > >> > > > > AGG3 over (partition by c)]
> > > >> > > > > or PhysicalWindow(a, b, c) for brevity.
> > > >> > > > > How do we define the physical properties for it?
> > > >> > > > > The operator delivers hash distribution on first window
> > partition
> > > >> > column
> > > >> > > > > a, but requires its child input to be hash distributed by its
> > last
> > > >> > window
> > > >> > > > > partition column c.
> > > >> > > > >
> > > >> > > > > If the parent operator request hash distribution on b, or c,
> > the
> > > >> > window
> > > >> > > > > operator will be called on "passthrough" method and generate
> > > >> > > > > PhysicalWindow(b, a, c), or PhysicalWindow(c, a, b). After
> > final
> > > >> > plan is
> > > >> > > > > generated, during post processing, we can replace the window
> > > >> > operator with
> > > >> > > > > multiple layer nested window operators, and insert Exchange
> > > >> > operators if
> > > >> > > > > necessary. But frankly speaking, I haven't seen any use cases
> > of
> > > >> > this kind
> > > >> > > > > in production.
> > > >> > > > >
> > > >> > > > > Regarding the rule alternative you proposed;
> > > >> > > > > > class PhysicalAggregateRule extends PhysicalRule {
> > > >> > > > > >  void onMatch(RelOptRuleCall call, *RelTraitSet
> > requiredTraits*)
> > > >> > {...
> > > >> > > > >
> > > >> > > > > Consider the following plan:
> > > >> > > > > InnerJoin (on a)
> > > >> > > > >   +-- Agg (on b)
> > > >> > > > >   +-- Scan
> > > >> > > > >
> > > >> > > > > For the inner join, we can generate sort merge join and hash
> > join.
> > > >> > > > > The sort merge join can request the following traits to Agg:
> > > >> > > > > 1) Singleton
> > > >> > > > > 2) hash distribution on a, sorted by a
> > > >> > > > > The hash join can request the following traits to Agg:
> > > >> > > > > 1) Singleton
> > > >> > > > > 2) hash distribution on a
> > > >> > > > > 3) any distribution
> > > >> > > > > 4) broadcast distribution
> > > >> > > > >
> > > >> > > > > The PhysicalAggregateRule will be called and executed 5 times,
> > > >> while
> > > >> > > > > generating the same physical aggregate candidates, unless we
> > pass
> > > >> a
> > > >> > whole
> > > >> > > > > list of required traits to the physical rule, which I have
> > > >> > prototyped some
> > > >> > > > > time ago with the exact idea.
> > > >> > > > >
> > > >> > > > > Regards,
> > > >> > > > > Haisheng Yuan
> > > >> > > > >
> > > >> > > > > On 2021/05/27 17:44:23, Haisheng Yuan <hy...@apache.org>
> > wrote:
> > > >> > > > > > >    In distributed systems, an implementation rule may
> > produce
> > > >> > different
> > > >> > > > > > >    physical operators depending on the input traits.
> > Examples
> > > >> are
> > > >> > > > > Aggregate,
> > > >> > > > > > >    Sort, Window.
> > > >> > > > > >
> > > >> > > > > > No, in most cases, physical operators are generated
> > regardless
> > > >> the
> > > >> > > > > input, because the input traits are not know yet. Window
> > might be
> > > >> an
> > > >> > > > > exception.
> > > >> > > > > >
> > > >> > > > > > >    Since input traits are not known when the rule is
> > fired, we
> > > >> > must
> > > >> > > > > > >    generate *all possible combinations* of physical
> > operators
> > > >> > that we
> > > >> > > > > may
> > > >> > > > > > >    need. For LogicalAggregate, we must generate 1-phase
> > and
> > > >> > 2-phase
> > > >> > > > > > >    alternatives. For LogicalSort, we also have 1-phase and
> > > >> > 2-phase
> > > >> > > > > > >    alternatives. Etc.
> > > >> > > > > >
> > > >> > > > > > IMHO, 1 phase and 2 phase are just different logical
> > > >> alternatives,
> > > >> > that
> > > >> > > > > is also why I call it a logical rule to split the aggregate
> > into
> > > >> a 2
> > > >> > phase
> > > >> > > > > aggregate. But HashAggregate and StreamAggregate are indeed
> > the
> > > >> > different
> > > >> > > > > physical alternatives for a LogicalAggregate.
> > > >> > > > > >
> > > >> > > > > >
> > > >> > > > > > >   Unlike Aggregate or Sort, which may have only 1 or 2
> > phases,
> > > >> > certain
> > > >> > > > > > >   logical operators may have many physical alternatives.
> > > >> > Consider the
> > > >> > > > > Window
> > > >> > > > > > >   function:......
> > > >> > > > > >
> > > >> > > > > > In window implementation rule, when building physical
> > operator
> > > >> for
> > > >> > > > > Window that has multiple window functions but with different
> > > >> > partition
> > > >> > > > > columns, we can infer the possible traits that can be
> > delivered by
> > > >> > input
> > > >> > > > > operators by creating your own RelMetaData, hence multiple
> > window
> > > >> > > > > combination with certain order, but not exhausted
> > enumeration. In
> > > >> > fact, the
> > > >> > > > > window ordering problem exists in every different kind of
> > > >> optimizer.
> > > >> > > > > >
> > > >> > > > > > > As input traits are not known when the rule is fired, the
> > > >> nodes
> > > >> > emitted
> > > >> > > > > > > from the implementation rules most likely would not be
> > used in
> > > >> > the
> > > >> > > > > final
> > > >> > > > > > > plan.
> > > >> > > > > >
> > > >> > > > > > That is quite normal, any operator generated by
> > implementation
> > > >> rule
> > > >> > > > > might not be used in the final plan, because there may be
> > tens of
> > > >> > thousands
> > > >> > > > > of alternatives, we only choose the one with lowest cost.
> > > >> > > > > >
> > > >> > > > > > > For example, I can create a physical aggregate that
> > demands
> > > >> > > > > > > non-strict distribution {a,b} from its input, meaning that
> > > >> both
> > > >> > [a,b]
> > > >> > > > > and
> > > >> > > > > > > [b,a] is ok. However, in the final plan, we are obligated
> > to
> > > >> > have a
> > > >> > > > > strict
> > > >> > > > > > > distribution - either [a, b] in that order, or [b, a] in
> > that
> > > >> > order -
> > > >> > > > > > > otherwise, physical operators like Join and Union will not
> > > >> work.
> > > >> > > > > >
> > > >> > > > > > It depends on your own satisfaction model and how do you
> > > >> coordinate
> > > >> > > > > property requirement among child operators. Unlike Orca
> > optimizer,
> > > >> > where
> > > >> > > > > there is exact match, partial satisfying, orderless match etc,
> > > >> > Calcite's
> > > >> > > > > default implementation always require exact satisfying. But
> > we can
> > > >> > still
> > > >> > > > > make use of "passThrough" and "derive" to achieve our goal.
> > i.e.
> > > >> the
> > > >> > > > > aggregate generated by implementation rule requires itself
> > and its
> > > >> > child to
> > > >> > > > > delivered distribution on [a,b], but the "derive" method tells
> > > >> > Aggregate
> > > >> > > > > that [b,a] is available, it can generate another option to
> > require
> > > >> > [b,a]
> > > >> > > > > instead.
> > > >> > > > > >
> > > >> > > > > > > In distributed engines, the nodes emitted from rules are
> > > >> > basically
> > > >> > > > > "templates"
> > > >> > > > > > > that must be replaced with normal nodes.
> > > >> > > > > >
> > > >> > > > > > There is no difference between distributed and
> > non-distributed
> > > >> > engines
> > > >> > > > > when dealing with this. In Orca and CockroachDB optimizer, the
> > > >> nodes
> > > >> > > > > emitted from rules are operators without physical properties,
> > the
> > > >> > optimizer
> > > >> > > > > then request physical properties in top-down manner, either
> > > >> > recursively or
> > > >> > > > > stack, or state machine. Calcite is quite different. when the
> > > >> > physical
> > > >> > > > > operator is generated by implementation rule, the physical
> > > >> operator
> > > >> > must
> > > >> > > > > has its own traits, at the same time, the traits that it
> > expects
> > > >> its
> > > >> > child
> > > >> > > > > operators to deliver. So in Calcite, they are not
> > "templates". The
> > > >> > > > > difference is there since Calcite's inception.
> > > >> > > > > >
> > > >> > > > > > Regards,
> > > >> > > > > > Haisheng Yuan
> > > >> > > > > >
> > > >> > > > > > On 2021/05/27 08:59:33, Vladimir Ozerov <ppoze...@gmail.com
> > >
> > > >> > wrote:
> > > >> > > > > > > Hi Haisheng,
> > > >> > > > > > >
> > > >> > > > > > > Thank you for your inputs. They are really helpful. Let me
> > > >> > summarize
> > > >> > > > > your
> > > >> > > > > > > feedback in my own words to verify that I understand it
> > > >> > correctly.
> > > >> > > > > > >
> > > >> > > > > > >    1. In distributed systems, an implementation rule may
> > > >> produce
> > > >> > > > > different
> > > >> > > > > > >    physical operators depending on the input traits.
> > Examples
> > > >> are
> > > >> > > > > Aggregate,
> > > >> > > > > > >    Sort, Window.
> > > >> > > > > > >    2. Since input traits are not known when the rule is
> > fired,
> > > >> > we must
> > > >> > > > > > >    generate *all possible combinations* of physical
> > operators
> > > >> > that we
> > > >> > > > > may
> > > >> > > > > > >    need. For LogicalAggregate, we must generate 1-phase
> > and
> > > >> > 2-phase
> > > >> > > > > > >    alternatives. For LogicalSort, we also have 1-phase and
> > > >> > 2-phase
> > > >> > > > > > >    alternatives. Etc.
> > > >> > > > > > >    3. If all combinations are generated, it is expected
> > that
> > > >> > > > > "passThrough"
> > > >> > > > > > >    and "derive" would be just trivial replacements of
> > traits
> > > >> for
> > > >> > most
> > > >> > > > > cases.
> > > >> > > > > > >    This is why "passThroughTraits" and "deriveTraits" are
> > > >> > recommended.
> > > >> > > > > A
> > > >> > > > > > >    notable exception is TableScan that may emit
> > alternative
> > > >> > indexes in
> > > >> > > > > > >    response to the pass-through requests.
> > > >> > > > > > >
> > > >> > > > > > > If my understanding is correct, then there are several
> > issues
> > > >> > with this
> > > >> > > > > > > approach still.
> > > >> > > > > > >
> > > >> > > > > > > 1. Unlike Aggregate or Sort, which may have only 1 or 2
> > > >> phases,
> > > >> > certain
> > > >> > > > > > > logical operators may have many physical alternatives.
> > > >> Consider
> > > >> > the
> > > >> > > > > Window
> > > >> > > > > > > function:
> > > >> > > > > > > SELECT
> > > >> > > > > > >   AGG1 over (partition by a),
> > > >> > > > > > >   AGG2 over (partition by b),
> > > >> > > > > > >   AGG3 over (partition by c),
> > > >> > > > > > >   ...
> > > >> > > > > > > FROM input
> > > >> > > > > > >
> > > >> > > > > > > To calculate each aggregate, we need to re-shuffle the
> > input
> > > >> > based on
> > > >> > > > > the
> > > >> > > > > > > partition key. The key question is the order of
> > reshuffling.
> > > >> If
> > > >> > the
> > > >> > > > > input
> > > >> > > > > > > is shared by [a], I want to calculate AGG1 locally and
> > then
> > > >> > re-shuffle
> > > >> > > > > the
> > > >> > > > > > > input to calculate other aggregates. For the remaining
> > AGG2
> > > >> and
> > > >> > AGG3,
> > > >> > > > > the
> > > >> > > > > > > order is also important. If the parent demands sharding by
> > > >> [b],
> > > >> > then
> > > >> > > > > the
> > > >> > > > > > > proper sequence is b-c-a:
> > > >> > > > > > > 1: Window[AGG2 over (partition by b)]     // SHARDED[b]
> > > >> > > > > > > 2:   Window[AGG3 over (partition by c)]   // SHARDED[c]
> > > >> > > > > > > 3:     Window[AGG1 over (partition by a)] // SHARDED[a]
> > > >> > > > > > > 4:       Input                            // SHARDED[a]
> > > >> > > > > > >
> > > >> > > > > > > But if the parent demands [c], the proper sequence is
> > c-b-a.
> > > >> > Since we
> > > >> > > > > do
> > > >> > > > > > > not know real distributions when the rule is fired, we
> > must
> > > >> emit
> > > >> > all
> > > >> > > > > the
> > > >> > > > > > > permutations to ensure that no optimization opportunity is
> > > >> > missed. But
> > > >> > > > > with
> > > >> > > > > > > complex window aggregate, this might be impractical
> > because we
> > > >> > will
> > > >> > > > > emit
> > > >> > > > > > > lots of unnecessary nodes.
> > > >> > > > > > >
> > > >> > > > > > > 2. As input traits are not known when the rule is fired,
> > the
> > > >> > nodes
> > > >> > > > > emitted
> > > >> > > > > > > from the implementation rules most likely would not be
> > used in
> > > >> > the
> > > >> > > > > final
> > > >> > > > > > > plan. For example, I can create a physical aggregate that
> > > >> demands
> > > >> > > > > > > non-strict distribution {a,b} from its input, meaning that
> > > >> both
> > > >> > [a,b]
> > > >> > > > > and
> > > >> > > > > > > [b,a] is ok. However, in the final plan, we are obligated
> > to
> > > >> > have a
> > > >> > > > > strict
> > > >> > > > > > > distribution - either [a, b] in that order, or [b, a] in
> > that
> > > >> > order -
> > > >> > > > > > > otherwise, physical operators like Join and Union will not
> > > >> work.
> > > >> > In
> > > >> > > > > > > distributed engines, the nodes emitted from rules are
> > > >> basically
> > > >> > > > > "templates"
> > > >> > > > > > > that must be replaced with normal nodes.
> > > >> > > > > > >
> > > >> > > > > > > Does this reasoning make any sense? If yes, it means that
> > the
> > > >> > current
> > > >> > > > > > > approach forces us to produce many unnecessary nodes to
> > > >> explore
> > > >> > the
> > > >> > > > > full
> > > >> > > > > > > search space. The question is whether alternative
> > approaches
> > > >> > could
> > > >> > > > > better
> > > >> > > > > > > fit the requirements of the distributed engine? This is a
> > > >> purely
> > > >> > > > > > > theoretical question. I am currently looking deeper at
> > > >> > CockroachDB.
> > > >> > > > > They
> > > >> > > > > > > have very different architecture: no separation between
> > > >> logical
> > > >> > and
> > > >> > > > > > > physical nodes, physical properties are completely
> > decoupled
> > > >> from
> > > >> > > > > nodes,
> > > >> > > > > > > usage of recursion instead of the stack, etc.
> > > >> > > > > > >
> > > >> > > > > > > Regards,
> > > >> > > > > > > Vladimir.
> > > >> > > > > > >
> > > >> > > > > > > чт, 27 мая 2021 г. в 03:19, Haisheng Yuan <
> > hy...@apache.org>:
> > > >> > > > > > >
> > > >> > > > > > > > Another point I would like to mention is that it is not
> > > >> > recommended
> > > >> > > > > to
> > > >> > > > > > > > override method "passThrough" and "derive" directly,
> > > >> override
> > > >> > > > > > > > "passThroughTraits" and "deriveTraits" instead, so that
> > we
> > > >> can
> > > >> > make
> > > >> > > > > sure
> > > >> > > > > > > > only the same type of physical node is created and no
> > nested
> > > >> > > > > relnodes or
> > > >> > > > > > > > additional RelSets are created, unless you know you
> > have to
> > > >> > create
> > > >> > > > > > > > different type of nodes. For example, if the table foo
> > has
> > > >> an
> > > >> > btree
> > > >> > > > > index
> > > >> > > > > > > > on column a, and the parent relnode is requesting
> > ordering
> > > >> on
> > > >> > column
> > > >> > > > > a,
> > > >> > > > > > > > then we may consider to override "passThrough" of
> > TableScan
> > > >> to
> > > >> > > > > return an
> > > >> > > > > > > > IndexScan instead of a TableScan.
> > > >> > > > > > > >
> > > >> > > > > > > > Regards,
> > > >> > > > > > > > Haisheng Yuan
> > > >> > > > > > > > On 2021/05/26 22:45:20, Haisheng Yuan <hy...@apache.org
> > >
> > > >> > wrote:
> > > >> > > > > > > > > Hi Vladimir,
> > > >> > > > > > > > >
> > > >> > > > > > > > > 1. You need a logical rule to split the aggregate
> > into a
> > > >> > local
> > > >> > > > > aggregate
> > > >> > > > > > > > and global aggregate, for example:
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > >
> > > >> >
> > > >>
> > https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitGbAgg.cpp
> > > >> > > > > > > > > Only implementation rules can convert a logical node
> > to a
> > > >> > physical
> > > >> > > > > node
> > > >> > > > > > > > or multiple physical nodes.
> > > >> > > > > > > > > After physical implementation, you have 2 physical
> > > >> > alternatives:
> > > >> > > > > > > > > 1) single phase global physical aggregate,
> > > >> > > > > > > > > 2) 2 phase physical aggregate with local and global
> > > >> > aggregate.
> > > >> > > > > > > > > It should be up to the cost to decide which one to
> > choose.
> > > >> > > > > > > > >
> > > >> > > > > > > > > 2. Given a desired traitset from parent node, the
> > current
> > > >> > relnode
> > > >> > > > > only
> > > >> > > > > > > > needs to generate a single relnode after passing down
> > the
> > > >> > traitset.
> > > >> > > > > Given a
> > > >> > > > > > > > traitset delivered by child node, the current relnode
> > only
> > > >> > derive a
> > > >> > > > > single
> > > >> > > > > > > > relnode. Quite unlike other optimizer, in Calcite's
> > top-down
> > > >> > > > > optimizer, you
> > > >> > > > > > > > don't need to worry about issuing multiple optimization
> > > >> > requests to
> > > >> > > > > inputs,
> > > >> > > > > > > > which is handled by Calcite framework secretly. i.e.
> > > >> > > > > > > > > SELECT a, b, min(c) from foo group by a, b;
> > > >> > > > > > > > > In many other optimizer, we probably need ask the
> > > >> aggregate
> > > >> > to
> > > >> > > > > issue 3
> > > >> > > > > > > > distribution requests for tablescan on foo, which are
> > > >> > > > > > > > > 1) hash distributed by a,
> > > >> > > > > > > > > 2) hash distributed by b,
> > > >> > > > > > > > > 3) hash distributed by a, b
> > > >> > > > > > > > > However in Calcite top-down optimizer, your physical
> > > >> > > > > implementation rule
> > > >> > > > > > > > for global aggregate only need generate a single
> > physical
> > > >> node
> > > >> > with
> > > >> > > > > hash
> > > >> > > > > > > > distribution by a, b. In case the table foo happens to
> > be
> > > >> > > > > distributed by a,
> > > >> > > > > > > > or b, the derive() method will tell you there is an
> > > >> > opportunity.
> > > >> > > > > This is
> > > >> > > > > > > > the feature that Calcite's top-down optimizer excels
> > over
> > > >> other
> > > >> > > > > optimizers,
> > > >> > > > > > > > because this can dramatically reduce the search space
> > while
> > > >> > keeping
> > > >> > > > > the
> > > >> > > > > > > > optimal optimization opportunity.
> > > >> > > > > > > > >
> > > >> > > > > > > > > 3. This is by design. Nodes produced from
> > "passThrough"
> > > >> and
> > > >> > > > > "derive" and
> > > >> > > > > > > > just sibling physical node with different traitset, we
> > only
> > > >> > need the
> > > >> > > > > > > > initial physical nodes after implementation to avoid
> > > >> > unnecessary
> > > >> > > > > > > > operations. The fundamental reason is, unlike Orca
> > optimizer
> > > >> > where
> > > >> > > > > physical
> > > >> > > > > > > > node and physical property are separate things,
> > Calcite's
> > > >> > > > > logical/physical
> > > >> > > > > > > > nodes contains traitset. With regard to the latter
> > question,
> > > >> > can you
> > > >> > > > > give
> > > >> > > > > > > > an example?
> > > >> > > > > > > > >
> > > >> > > > > > > > > Regards,
> > > >> > > > > > > > > Haisheng Yuan
> > > >> > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > > > On 2021/05/26 20:11:57, Vladimir Ozerov <
> > > >> ppoze...@gmail.com>
> > > >> > > > > wrote:
> > > >> > > > > > > > > > Hi,
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > I tried to optimize a certain combination of
> > operators
> > > >> for
> > > >> > the
> > > >> > > > > > > > distributed
> > > >> > > > > > > > > > engine and got stuck with the trait propagation in
> > the
> > > >> > top-down
> > > >> > > > > > > > engine. I
> > > >> > > > > > > > > > want to ask the community for advice on whether the
> > > >> > problem is
> > > >> > > > > solvable
> > > >> > > > > > > > > > with the current Apache Calcite implementation or
> > not.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Consider the following logical tree:
> > > >> > > > > > > > > > 3: LogicalAggregate[group=[a], F2(c)]
> > > >> > > > > > > > > > 2:  LogicalAggregate[group=[a,b], F1(c)]
> > > >> > > > > > > > > > 1:    LogicalScan[t]
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Consider that these two aggregates cannot be merged
> > or
> > > >> > > > > simplified for
> > > >> > > > > > > > > > whatever reason. We have only a set of physical
> > rules to
> > > >> > > > > translate this
> > > >> > > > > > > > > > logical tree to a physical tree. Also, there could
> > be
> > > >> any
> > > >> > number
> > > >> > > > > of
> > > >> > > > > > > > > > other operators between these two aggregates. We
> > omit
> > > >> them
> > > >> > for
> > > >> > > > > clarity,
> > > >> > > > > > > > > > assuming that the distribution is not destroyed.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > In the distributed environment, non-collocated
> > > >> aggregates
> > > >> > are
> > > >> > > > > often
> > > >> > > > > > > > > > implemented in two phases: local pre-aggregation and
> > > >> final
> > > >> > > > > aggregation,
> > > >> > > > > > > > > > with an exchange in between. Consider that the Scan
> > > >> > operator is
> > > >> > > > > hash
> > > >> > > > > > > > > > distributed by some key other than [a] or [b]. If we
> > > >> > optimize
> > > >> > > > > operators
> > > >> > > > > > > > > > without considering the whole plan, we may optimize
> > each
> > > >> > operator
> > > >> > > > > > > > > > independently, which would give us the following
> > plan:
> > > >> > > > > > > > > > 3: PhysicalAggregate[group=[a], F2_phase2(c)]
> > > >> >  //
> > > >> > > > > > > > > > HASH_DISTRIBUTED [a]
> > > >> > > > > > > > > > 3:   Exchange[a]
> > > >> > //
> > > >> > > > > > > > > > HASH_DISTRIBUTED [a]
> > > >> > > > > > > > > > 3:     PhysicalAggregate[group=[a], F2_phase1(c)]
> > > >> >  //
> > > >> > > > > > > > > > HASH_DISTRIBUTED [a,b]
> > > >> > > > > > > > > > 2:       PhysicalAggregate[group=[a,b],
> > F1_phase2(c)]
> > > >> >  //
> > > >> > > > > > > > > > HASH_DISTRIBUTED [a,b]
> > > >> > > > > > > > > > 2:         Exchange[a, b]
> > > >> >  //
> > > >> > > > > > > > > > HASH_DISTRIBUTED [a,b]
> > > >> > > > > > > > > > 2:           PhysicalAggregate[group=[a,b],
> > > >> F1_phase1(c)]
> > > >> > //
> > > >> > > > > > > > > > HASH_DISTRIBUTED [d]
> > > >> > > > > > > > > > 1:             PhysicalScan[t]
> > > >> > //
> > > >> > > > > > > > > > HASH_DISTRIBUTED [d]
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > This plan is not optimal, because we re-hash inputs
> > > >> twice.
> > > >> > A
> > > >> > > > > better
> > > >> > > > > > > > plan
> > > >> > > > > > > > > > that we want to get:
> > > >> > > > > > > > > > 3: PhysicalAggregate[group=[a], F2(c)]
> >   //
> > > >> > > > > > > > HASH_DISTRIBUTED
> > > >> > > > > > > > > > [a]
> > > >> > > > > > > > > > 2:   PhysicalAggregate[group=[a,b], F1_phase2(c)]
> >    //
> > > >> > > > > > > > HASH_DISTRIBUTED
> > > >> > > > > > > > > > [a]
> > > >> > > > > > > > > > 2:     Exchange[a]
> >   //
> > > >> > > > > > > > HASH_DISTRIBUTED
> > > >> > > > > > > > > > [a]
> > > >> > > > > > > > > > 2:       PhysicalAggregate[group=[a,b],
> > F1_phase1(c)] //
> > > >> > > > > > > > HASH_DISTRIBUTED
> > > >> > > > > > > > > > [d]
> > > >> > > > > > > > > > 1:         PhysicalScan[t]
> >   //
> > > >> > > > > > > > HASH_DISTRIBUTED
> > > >> > > > > > > > > > [d]
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > In this case, we take advantage of the fact that the
> > > >> > > > > distribution [a]
> > > >> > > > > > > > is
> > > >> > > > > > > > > > compatible with [a,b]. Therefore we may enforce only
> > > >> [a],
> > > >> > > > > instead of
> > > >> > > > > > > > doing
> > > >> > > > > > > > > > [a,b] and then [a]. Since exchange operators are
> > very
> > > >> > expensive,
> > > >> > > > > this
> > > >> > > > > > > > > > optimization may bring a significant boost to the
> > query
> > > >> > engine.
> > > >> > > > > Now the
> > > >> > > > > > > > > > question - how do we reach that state? Intuitively,
> > a
> > > >> > > > > pass-through is
> > > >> > > > > > > > > > exactly what we need. We may pass the optimization
> > > >> request
> > > >> > from
> > > >> > > > > top
> > > >> > > > > > > > > > aggregate to bottom aggregate to find physical
> > > >> > implementations
> > > >> > > > > shared
> > > >> > > > > > > > by
> > > >> > > > > > > > > > [a]. But the devil is in the details - when and how
> > > >> > exactly to
> > > >> > > > > pass
> > > >> > > > > > > > this
> > > >> > > > > > > > > > request?
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Typically, we have a conversion rule that converts a
> > > >> > logical
> > > >> > > > > aggregate
> > > >> > > > > > > > to a
> > > >> > > > > > > > > > physical aggregate. We may invoke "convert" on the
> > > >> input to
> > > >> > > > > initiate
> > > >> > > > > > > > the
> > > >> > > > > > > > > > pass-through:
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > RelNode convert(...) {
> > > >> > > > > > > > > >     return new PhysicalAggregate(
> > > >> > > > > > > > > >         convert(input, HASH_DISTRIBUTED[a])
> > > >> > > > > > > > > >     )
> > > >> > > > > > > > > > }
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > The first problem - we cannot create the normal
> > physical
> > > >> > > > > aggregate here
> > > >> > > > > > > > > > because we do not know input traits yet. The final
> > > >> decision
> > > >> > > > > whether to
> > > >> > > > > > > > do a
> > > >> > > > > > > > > > one-phase or two-phase aggregate can be made only
> > in the
> > > >> > > > > > > > > > "PhysicalNode.derive" method when concrete input
> > traits
> > > >> are
> > > >> > > > > resolved.
> > > >> > > > > > > > > > Therefore the converter rule should create a kind of
> > > >> > "template"
> > > >> > > > > > > > physical
> > > >> > > > > > > > > > operator, which would be used to construct the final
> > > >> > operator(s)
> > > >> > > > > when
> > > >> > > > > > > > input
> > > >> > > > > > > > > > traits are resolved. AFAIU Enumerable works
> > similarly:
> > > >> we
> > > >> > create
> > > >> > > > > > > > operators
> > > >> > > > > > > > > > with virtually arbitrary traits taken from logical
> > nodes
> > > >> > in the
> > > >> > > > > > > > conversion
> > > >> > > > > > > > > > rules. We only later do create normal nodes in the
> > > >> derive()
> > > >> > > > > methods.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > The second problem - our top aggregate doesn't
> > actually
> > > >> > need the
> > > >> > > > > > > > > > HASH_DISTRIBUTED[a] input. Instead, it may accept
> > inputs
> > > >> > with any
> > > >> > > > > > > > > > distribution. What we really need is to inform the
> > input
> > > >> > (bottom
> > > >> > > > > > > > aggregate)
> > > >> > > > > > > > > > that it should look for additional implementations
> > that
> > > >> > satisfy
> > > >> > > > > > > > > > HASH_DISTRIBUTED[a]. Therefore, enforcing a specific
> > > >> > > > > distribution on
> > > >> > > > > > > > the
> > > >> > > > > > > > > > input using the "convert" method is not what we need
> > > >> > because this
> > > >> > > > > > > > > > conversion might enforce unnecessary exchanges.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > The third problem - derivation. Consider that we
> > > >> delivered
> > > >> > the
> > > >> > > > > > > > optimization
> > > >> > > > > > > > > > request to the bottom aggregate. As an implementor,
> > what
> > > >> > am I
> > > >> > > > > supposed
> > > >> > > > > > > > to
> > > >> > > > > > > > > > do in this method? I cannot return the final
> > aggregate
> > > >> > from here
> > > >> > > > > > > > because
> > > >> > > > > > > > > > the real input traits are not derived yet.
> > Therefore, I
> > > >> > can only
> > > >> > > > > return
> > > >> > > > > > > > > > another template, hoping that the "derive" method
> > will
> > > >> be
> > > >> > called
> > > >> > > > > on it.
> > > >> > > > > > > > > > However, this will not happen because trait
> > derivation
> > > >> is
> > > >> > > > > skipped on
> > > >> > > > > > > > the
> > > >> > > > > > > > > > nodes emitted from pass-through. See
> > > >> "DeriveTrait.perform"
> > > >> > [1].
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > BottomAggregate {
> > > >> > > > > > > > > >     RelNode
> > > >> passThrough(distribution=HASH_DISTRIBUTED[a]) {
> > > >> > > > > > > > > >         // ???
> > > >> > > > > > > > > >     }
> > > >> > > > > > > > > > }
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > I feel that I am either going in the wrong
> > direction, or
> > > >> > some
> > > >> > > > > gaps in
> > > >> > > > > > > > the
> > > >> > > > > > > > > > product disallow such optimization. So I would like
> > to
> > > >> ask
> > > >> > the
> > > >> > > > > > > > community to
> > > >> > > > > > > > > > assist with the following questions:
> > > >> > > > > > > > > > 1. In the top-down optimizer, how should we convert
> > a
> > > >> > logical
> > > >> > > > > node to a
> > > >> > > > > > > > > > physical node, provided that "derive" is not called
> > > >> yet? I
> > > >> > have
> > > >> > > > > a gut
> > > >> > > > > > > > > > feeling that the trait propagation is currently not
> > > >> > implemented
> > > >> > > > > to the
> > > >> > > > > > > > full
> > > >> > > > > > > > > > extent because based on Cascades paper I would
> > expect
> > > >> that
> > > >> > parent
> > > >> > > > > > > > physical
> > > >> > > > > > > > > > nodes are produced after the child physical nodes.
> > But
> > > >> in
> > > >> > our
> > > >> > > > > rules,
> > > >> > > > > > > > this
> > > >> > > > > > > > > > is not the case - some physical nodes are produced
> > > >> before
> > > >> > the
> > > >> > > > > trait
> > > >> > > > > > > > > > derivation.
> > > >> > > > > > > > > > 2. How to propagate several optimization requests to
> > > >> > inputs? We
> > > >> > > > > need
> > > >> > > > > > > > either
> > > >> > > > > > > > > > inputs with a specific distribution or inputs with
> > an
> > > >> > arbitrary
> > > >> > > > > > > > > > distribution in the example above. It seems that to
> > > >> achieve
> > > >> > > > > that, I
> > > >> > > > > > > > need to
> > > >> > > > > > > > > > emit several alternative nodes with different
> > > >> requirements
> > > >> > to
> > > >> > > > > inputs.
> > > >> > > > > > > > Does
> > > >> > > > > > > > > > it make sense?
> > > >> > > > > > > > > > 3. Why are nodes produced from the "passThrough"
> > method
> > > >> > excluded
> > > >> > > > > from
> > > >> > > > > > > > trait
> > > >> > > > > > > > > > derivation? If this is by design, how can I
> > preserve the
> > > >> > > > > optimization
> > > >> > > > > > > > > > request to satisfy it on the derivation stage when
> > input
> > > >> > traits
> > > >> > > > > are
> > > >> > > > > > > > > > available?
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > Regards,
> > > >> > > > > > > > > > Vladimir.
> > > >> > > > > > > > > >
> > > >> > > > > > > > > > [1]
> > > >> > > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > >
> > > >> >
> > > >>
> > https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L828
> > > >> > > > > > > > > >
> > > >> > > > > > > > >
> > > >> > > > > > > >
> > > >> > > > > > >
> > > >> > > > > >
> > > >> > > > >
> > > >> > > >
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
> 

Reply via email to