How does it relate to "derive" to get the desired plan?

Initially, PhysicalAggregate1 requests HASH[b,c] and PhysicalAggregate2 
requests HASH[a,b,c]. "passThrough" is then called on PhysicalAggregate2 with 
HASH[b,c], which generates another PhysicalAggregate2 with trait HASH[b,c]. 
You don't need the involvement of "derive".
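
For illustration, a minimal sketch of what such a "passThroughTraits" could
look like on the bottom physical aggregate, assuming it implements Calcite's
PhysicalNode (the surrounding class is hypothetical and the trait checks are
simplified):

// Accept a requested hash distribution whose keys form a subset of the
// GROUP BY columns (e.g. HASH[b,c] for group=[a,b,c]) and forward the same
// request to the child.
@Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(
        RelTraitSet required) {
    RelDistribution distribution =
        required.getTrait(RelDistributionTraitDef.INSTANCE);
    if (distribution != null
        && distribution.getType() == RelDistribution.Type.HASH_DISTRIBUTED
        && getGroupSet().contains(ImmutableBitSet.of(distribution.getKeys()))) {
        return Pair.of(
            getTraitSet().replace(distribution),
            ImmutableList.of(getInput().getTraitSet().replace(distribution)));
    }
    return null; // cannot satisfy; the planner will add an enforcer
}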

Haisheng Yuan

On 2021/06/13 16:58:53, Vladimir Ozerov <ppoze...@gmail.com> wrote: 
> Hi,
> 
> I tried to apply different approaches, but eventually, I failed to achieve
> my goals. It seems that the current implementation cannot handle the
> required scenario, as explained below.
> 
> Consider the following tree:
> LogicalAggregate1[group=[b,c]]
>   LogicalAggregate2[group=[a,b,c]]
>     LogicalInput
> 
> I want to find the plan to do these two aggregations without an exchange in
> between because they may have compatible distributions. Example:
> PhysicalAggregate1[group=[b,c]]     // SHARDED[b,c]
>   PhysicalAggregate2[group=[a,b,c]] // SHARDED[b,c]
>     Exchange                        // SHARDED[b,c]
>       PhysicalInput                 // SHARDED[?]
> 
> The fundamental problem is that it is impossible to save the optimization
> request and resolve traits in the "derive" phase afterward. What we need is
> to send the optimization request "SHARDED by [b,c] in any order" to
> PhysicalAggregate2, and use it in the derive phase so that the new
> PhysicalAggregate2 is created with [b,c] or [c,b], but strictly without
> [a]. Unfortunately, this doesn't work because the nodes emitted from the
> pass-through do not participate in the "derive" phase.
> 
> This could be fixed with a trivial change - to allow certain nodes emitted
> from the "passThrough" to participate in "derive". We can do that using a
> marker interface or an extension to a PhysicalRel interface. For example:
> interface PhysicalRel {
>     boolean enforceDerive();
> }
> 
> When set to "true", the node would not be added to the pass-through cache.
> This way, we may use this node as *storage* for the optimization request.
> When the "derive" is called later, we know both the parent requirements and
> the child traits. This would be sufficient to solve my problem. I already
> tried to do this by disabling the pass-through cache completely and
> confirmed that the required plan is found.
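> 
> A rough sketch of how such a template node could look (the class and the
> "createFinalAggregate" helper are hypothetical; only "enforceDerive" comes
> from the proposal above):
> 
> class PhysicalAggregateTemplate extends PhysicalAggregate {
>     // The optimization request received in "passThrough", kept as *storage*.
>     private final RelTraitSet requestedTraits;
> 
>     PhysicalAggregateTemplate(RelTraitSet requestedTraits) {
>         this.requestedTraits = requestedTraits;
>     }
> 
>     @Override public boolean enforceDerive() {
>         return true; // stay out of the pass-through cache so "derive" fires
>     }
> 
>     @Override public RelNode derive(RelTraitSet childTraits, int childId) {
>         // Both the parent requirement (requestedTraits) and the child traits
>         // are known here, so the final aggregate(s) can be emitted.
>         return createFinalAggregate(requestedTraits, childTraits);
>     }
> }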
> 
> Do you have any objections to such a change?
> 
> Regards,
> Vladimir.
> 
> On Sat, May 29, 2021 at 11:59, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> 
> > Hi Haisheng, Jinpeng
> >
> > I think we are more or less on the same page:
> >
> >    1. The current implementation of Apache Calcite may generate wasteful
> >    alternatives because rules lack the optimization context.
> >    2. But the actual impact on efficiency is not clear.
> >
> > Point (2) is essential for understanding whether my efforts make any
> > practical sense. And so far, I have only vague common sense and some simple
> > examples in mind, which is not sufficient to make any claims.
> >
> > Nevertheless, I've checked the source code of the original Columbia
> > optimizer. I was wrong in my original claim that Columbia doesn't pass
> > optimization context to rules. It does [1]. The context consists of
> > required traits and cost budget. In Apache Calcite terms, the context is
> > passed to both "RelRule.matches" and "RelRule.onMatch", so that the rule
> > may decide on the optimization strategy based on parent request. This is
> > exactly what I was trying to achieve in my system with some hacks around
> > derive/passThrough.
> >
> > Regarding the example with join, my proposal is not likely to make any
> > difference because the tables are not co-located on the join key, and hence
> > join may emit several distributions. Consider the different situation -
> > data is already collocated. Without the context, I will emit both 1-phase
> > and 2-phase aggregates because I do not know which distributions are
> > available below. With the context available, I can collect propagate
> > promising optimization requests from Aggregate rules (1-phase, 2-phase).
> > Then wait for input optimization and check what is returned. If only
> > [dist=a] is returned, I can skip the 2-phase aggregate completely.
> > Aggregate[group=a]
> >   Join[foo.a=bar.b]
> >     Input(foo, dist=a)
> >     Input(bar, dist=b)
> >
> > Another possible use case is join on several keys. By issuing a
> > context-aware optimization request [dist a1] from Aggregate to Join, we
> > can establish tight cost bounds on Aggregate and Join equivalence groups
> > very early so that all other options (broadcasts, sharding in [a1,a2], ...)
> > would be pruned without even entering MEMO.
> > Aggregate[group=a1]
> >   Join[foo.a1=bar.b1 AND foo.a2=bar.b2]
> >     Input(foo, dist=a1)
> >     Input(bar, dist=b2)
> >
> > As for Jinpeng's example with logical multi-phase aggregates - I think
> > this is a great example of why logical split might be useful. Thank you for
> > that. This reminded me about another concerning use case. Consider an
> > Aggregate on top of a UnionAll:
> > LogicalAggregate[group=a, COUNT(b)]
> >   UnionAll
> >     Input1
> >     Input2
> >
> > With Calcite rules, we may push the aggregate down:
> > LogicalAggregate[group=a, SUM(COUNT)]
> >   UnionAll
> >     LogicalAggregate[group=a, COUNT(b)] // <-- Possible exchange here
> >       Input1
> >     LogicalAggregate[group=a, COUNT(b)] // <-- Possible exchange here
> >       Input2
> >
> > In my optimizer, all logical aggregates are treated in the same way. So if
> > the Input1 is not sharded by [a], I will generate an exchange. However, if
> > we apply your suggestion, we may first split the logical aggregate into two
> > tagged logical aggregates:
> > LogicalAggregate[group=a, SUM(COUNT), type=global]
> >   LogicalAggregate[group=a, COUNT(b), type=local]
> >     UnionAll
> >       Input1
> >       Input2
> >
> > Then we may implement a transformation rule that pushes down only the
> > pre-aggregates (a sketch follows the plan below). As a result, the bottom
> > aggregates will be converted into single-phase physical aggregates,
> > leading to a much better plan.
> > LogicalAggregate[group=a, SUM(COUNT), type=global]
> >   UnionAll
> >     LogicalAggregate[group=a, COUNT(b), type=local] // <-- No exchange
> >       Input1
> >     LogicalAggregate[group=a, COUNT(b), type=local] // <-- No exchange
> >       Input2
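> >
> > A rough sketch of such a rule, in the classic Calcite rule style (the
> > "isLocalPhase" check on the tagged aggregate is hypothetical):
> >
> > class LocalAggUnionTransposeRule extends RelOptRule {
> >     LocalAggUnionTransposeRule() {
> >         super(operand(LogicalAggregate.class, operand(Union.class, any())));
> >     }
> >
> >     @Override public void onMatch(RelOptRuleCall call) {
> >         LogicalAggregate agg = call.rel(0);
> >         Union union = call.rel(1);
> >         if (!union.all || !isLocalPhase(agg)) {
> >             return; // push only pre-aggregates, and only through UNION ALL
> >         }
> >         // Copy the local aggregate below each input of the UnionAll.
> >         List<RelNode> newInputs = new ArrayList<>();
> >         for (RelNode input : union.getInputs()) {
> >             newInputs.add(agg.copy(agg.getTraitSet(), ImmutableList.of(input)));
> >         }
> >         call.transformTo(union.copy(union.getTraitSet(), newInputs));
> >     }
> > }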
> >
> > So I agree with you that logical optimization might be very useful. The
> > main practical concern is the complexity. We essentially introduce new
> > logical operators that cannot be used by the existing Apache Calcite
> > logical rule library in the general case.
> >
> > Regards,
> > Vladimir.
> >
> > [1]
> > https://github.com/yongwen/columbia/blob/master/header/rules.h#L383-L397
> >
> >> On Sat, May 29, 2021 at 04:30, Jinpeng Wu <wjpabc...@gmail.com> wrote:
> >
> >> Hi, Vladimir.
> >>
> >> As another topic, it is highly recommended that you split the aggregation
> >> into logical stages, and not only for trait-related matters. It is true
> >> that you need to annotate the nodes with different flags or subclasses,
> >> and it is a large refactor. But after that, you may find much bigger
> >> benefits.
> >>
> >> The most important benefit is aggregation push-down. For example, consider
> >> the query:
> >>
> >> select t1.value, agg(t2.value)  from t1 join t2 on t1.key = t2.key;
> >>
> >> You may be able to generate a plan like this:
> >>
> >> PhysicalAggregatePhase(group=t1.value, method=agg_final(t2.value))
> >>   Exchange(dist = t1.value)
> >>       Join (t1.key = t2.key)
> >>          Exchange(dist = t1.key)
> >>              scan(t1)
> >>          Exchange(dist = t2.key)
> >>              PhysicalAggregatePhase(group = t2.key, f_partial(a))
> >>                 scan(t2)
> >>
> >> The pushed "PhysicalAggregationPhase(group = t2.key, f_partial(a))" may be
> >> able to reduce the input data size of the exchange operation dramatically.
> >>
> >> There has been lots of research on aggregation push-down, but pushing down
> >> a partial aggregate can achieve much greater benefits:
> >> 1. Unlike pushing down a full aggregation, the partial aggregate requires
> >> no extra exchanges, so it can be a pure gain.
> >> 2. The push-down can apply to any aggregation function, including
> >> user-defined aggregation functions.
> >> 3. By introducing a middle phase (the 3-pass aggregation implementation),
> >> the aggregation can be split into any number of phases, and partial
> >> aggregation can be pushed down through any number of joins, somewhat like:
> >>
> >> AggregatePhase(final)
> >>    Exchange
> >>       AggregatePhase(middle)
> >>         JOIN
> >>            Exchange
> >>                AggregatePhase(middle)
> >>                  JOIN
> >>                      Exchange
> >>                          AggregatePhase(middle)
> >>                          ...
> >>                            JOIN
> >>                                Exchange
> >>                                    AggregatePhase(partial)
> >>                                        TableScan
> >>                    ...
> >> Note that AggregatePhase(middle) can work in an adaptive manner: after
> >> processing some data, if it discovers no data reduction, it can simply
> >> degenerate into a NOP operation, making it very lightweight.
> >>
> >> Thanks,
> >> Jinpeng Wu
> >>
> >>
> >> On Sat, May 29, 2021 at 8:32 AM Haisheng Yuan <hy...@apache.org> wrote:
> >>
> >> > > 2) Optimization requests are basically sent to RelSet-s, not
> >> RelSubset-s,
> >> > > as we make pairwise comparisons between the requested RelSubset and
> >> other
> >> > > subsets in the set [5][6].
> >> >
> >> > I agree with you. There could be some waste when the new delivered /
> >> > required traitset is generated by "passThrough" / "derive", in which
> >> > case we only need an enforcer between the pair of subsets, instead of
> >> > pairing with all other required / delivered subsets in the RelSet. i.e.
> >> > In the MEMO group, we have 2 required traitsets:
> >> > 1) Hash[a] Sort[b]
> >> > 2) Hash[b] Sort[c]
> >> >
> >> > When we try to pass Hash[a] Sort[b] to one of the physical operators,
> >> > say Project, we find that we can pass Hash[a] down to its child. Then we
> >> > get a new physical Project with traitset Hash[a], and we only need an
> >> > enforcer between Hash[a] and Hash[a]Sort[b]. But currently, in the method
> >> > "addConverters", we also generate an enforcer between Hash[a] and
> >> > Hash[b]Sort[c], which is not actually what we want.
> >> >
> >> > I think it is definitely worth trying to optimize.
> >> >
> >> > Regards,
> >> > Haisheng Yuan
> >> > On 2021/05/28 19:15:03, Haisheng Yuan <hy...@apache.org> wrote:
> >> > > Hi Vladimir,
> >> > >
> >> > > The top-down optimizer does NOT require an implementation rule to
> >> > > generate a 1-to-1 physical operator for a logical operator; as you can
> >> > > see, if you generate a 2-phase physical aggregate for the logical
> >> > > aggregate in the implementation rule, it still works. Window is special
> >> > > because we can reshuffle the execution order of window functions, and
> >> > > that order makes a difference according to the parent's physical
> >> > > property request. A single converged physical Window operator caters
> >> > > for this speciality. However, as I said, I don't think it is a common
> >> > > scenario.
> >> > >
> >> > > > the whole decision of whether to go with 1-phase or 2-phase
> >> > > > aggregate is a physical decision that should be made based on
> >> > available (or
> >> > > > assumed) input traits.
> >> > > What is the problem with generating both 1-phase and 2-phase
> >> > > aggregates and choosing the best one based on cost?
> >> > >
> >> > > Let's see the following query:
> >> > > select a, min(b) from (select * from foo, bar where foo.a=bar.a) t
> >> > > group by a;
> >> > > suppose foo is a randomly distributed fact table, and bar is a randomly
> >> > > distributed dimension table.
> >> > > Consider the 2 following plans:
> >> > > 1)
> >> > > PhysicalAggregate
> >> > >    +-- HashJoin
> >> > >               +--  HashDistribute by a
> >> > >                          +-- TableScan on foo
> >> > >               +--  HashDistribute by a
> >> > >                          +-- TableScan on bar
> >> > >
> >> > > 2)
> >> > > PhysicalAggregate(global)
> >> > >    +--  HashDistribute by a
> >> > >             +---- PhysicalAggregate(local)
> >> > >                         +---- HashJoin
> >> > >                                      +-- TableScan on foo
> >> > >                                      +--  Broadcast
> >> > >                                                +-- TableScan on bar
> >> > >
> >> > > Can you tell that the single-phase aggregate plan is always better
> >> > > than the 2-phase aggregate plan?
> >> > >
> >> > > > Therefore, the typical way to optimize
> >> > > > LogicalAggregate is to split in the physical phase (implementation
> >> > rule,
> >> > > > pass-through, derive). Practical systems like Dremio [1] and Flink
> >> [2]
> >> > > > work this way.
> >> > > That Dremio and Flink work this way doesn't mean it is a good way.
> >> > > Greenplum Orca and the Alibaba MaxCompute optimizer work in another
> >> > > way. In Flink and Dremio, they have HashAggPRule to generate 1-phase
> >> > > and 2-phase HashAggs, and SortAggPRule to generate 1-phase and 2-phase
> >> > > SortAggs. However, do you think there is a possibility that a global
> >> > > SortAgg combined with a local HashAgg, or a global HashAgg combined
> >> > > with a local SortAgg, may perform better in different cases? Are you
> >> > > going to generate all 4 combinations in the implementation rule? There
> >> > > are some cases where we found it better to split the aggregate into a
> >> > > 3-phase aggregate [1], in which case, will the implementation rule
> >> > > generate 3 HashAggs or 3 SortAggs, or all 6 combinations?
> >> > >
> >> > > In our system, we have 1-phase, 2-phase, and 3-phase logical aggregate
> >> > > rules that transform the LogicalAggregate into another kind of logical
> >> > > aggregate(s) with phase info, say LogicalXXXAggregate; our physical
> >> > > aggregate rules then match this kind of node to generate a HashAgg or
> >> > > StreamAgg. Of course, in the logical rules, we can add business logic
> >> > > that guesses the possible traits delivered by child nodes to determine
> >> > > whether the rule definitely won't generate a better alternative, and
> >> > > may decide to abort the transformation early. But I would rather let
> >> > > the cost model decide.
> >> > >
> >> > > Admittedly, the current top-down optimization is not purely on-demand
> >> > > request oriented, because it will always generate a physical node
> >> > > regardless of the parent nodes' trait requests. For example, take the
> >> > > following query in a non-distributed environment:
> >> > > select a, b, c, max(d) from foo group by a, b, c order by a desc;
> >> > >
> >> > > It will first generate a StreamAgg[a ASC, b ASC, c ASC] no matter what
> >> > > the parent node requires; then "passThrough" tells the StreamAgg that
> >> > > the parent requires [a DESC], and we get a StreamAgg[a DESC, b ASC,
> >> > > c ASC]. It would be ideal to generate only StreamAgg[a DESC, b ASC,
> >> > > c ASC] by request, but I don't think that would make much difference;
> >> > > the bottleneck lies in the join order enumeration and the
> >> > > Project-related operations.
> >> > >
> >> > > Regards,
> >> > > Haisheng Yuan
> >> > >
> >> > > [1]
> >> >
> >> https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitDQA.cpp
> >> > >
> >> > > On 2021/05/28 09:17:45, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> >> > > > Hi Jinpeng, Haisheng,
> >> > > >
> >> > > > Thank you for your inputs. I really appreciate that. Let me try to
> >> > address
> >> > > > some of your comments and share some experience with the
> >> > implementation of
> >> > > > optimizers for a distributed engine I am currently working with.
> >> > > >
> >> > > > First of all, I would argue that multiple logical operators do not
> >> > have a
> >> > > > 1-1 mapping to physical operators, and Window is not special here.
> >> For
> >> > > > instance, LogicalAggregate doesn't have 1-1 mapping to physical
> >> > aggregates
> >> > > > because the physical implementation can be either 1-phase or
> >> 2-phase.
> >> > It
> >> > > > doesn't matter that the 2-phase aggregate is a composition of two
> >> > 1-phase
> >> > > > aggregates: the whole decision of whether to go with 1-phase or
> >> 2-phase
> >> > > > aggregate is a physical decision that should be made based on
> >> > available (or
> >> > > > assumed) input traits.
> >> > > >
> >> > > > Consider the following logical tree:
> >> > > > LogicalAggregate[group=$0, agg=SUM($1)]
> >> > > >   Input
> >> > > >
> >> > > > If I do the split on the logical phase with a separate
> >> transformation
> >> > rule,
> >> > > > I will get the following tree:
> >> > > > LogicalAggregate[group=$0, agg=SUM($1)]
> >> > > >   LogicalAggregate[group=$0, agg=SUM($1)]
> >> > > >     Input
> >> > > >
> >> > > > Now we have an infinite loop because the rule takes one aggregate
> >> and
> >> > > > produces two aggregates. To fix that, we may extend the
> >> > LogicalAggregate
> >> > > > with some flag or so. But this (1) potentially breaks other
> >> > LogicalAggregate
> >> > > > optimizations (e.g., transpose with other operators), and (2) breaks
> >> > the
> >> > > > whole idea of the logical operators because the execution phase
> >> > > > (pre-aggregate or final aggregate) is a property of a concrete
> >> > > > backend, not a property of relational algebra. Therefore, the typical
> >> > > > way to optimize LogicalAggregate is to split it in the physical phase
> >> > > > (implementation rule, pass-through, derive). Practical systems like
> >> > > > Dremio [1] and Flink [2] work this way.
> >> > > >
> >> > > > That said, as an optimizer developer, I need the flexibility to emit
> >> > any
> >> > > > physical trees for the given logical operator, and 1-1 mapping
> >> cannot
> >> > be
> >> > > > assumed. Calcite's API allows for that, and I am not aware of formal
> >> > > > documentation or guidelines that discourage that.
> >> > > >
> >> > > > Now the question is when exactly to emit the operators. Normally, we
> >> > > > produce
> >> > > > operators from rules. As discussed above, if the logical operator
> >> may
> >> > > > produce different physical trees depending on input traits, the
> >> > > > recommendation is to emit all combinations, even though we do not
> >> know
> >> > > > whether there would be good inputs for that alternatives. This
> >> > contradicts
> >> > > > the idea of the guided top-down search, where we explore the search
> >> > space
> >> > > > in response to a concrete optimization request, rather than with a
> >> > > > pessimistic assumption that a certain plan might be required in the
> >> > future.
> >> > > >
> >> > > > I found a way to mitigate this problem partially. Funnily enough, my
> >> > > > solution is almost identical to what Haisheng proposed for the Window
> >> > > > operator.
> >> > > > 1. For every logical operator, I emit a single physical operator
> >> from
> >> > the
> >> > > > implementation rule, maintaining the exact 1-1 mapping. The emitted
> >> > > > operators (1) have a special flag "template" which makes their cost
> >> > > > infinite, (2) never expose or demand non-default traits except for
> >> > > > convention, and (3) have the OMAKASE derivation mode.
> >> > > > 2. When the input is optimized, "derive" is called on the template,
> >> > > > which produces a concrete physical tree that is not necessarily 1-1
> >> > > > with the original logical node.
> >> > > >
> >> > > > Before rule:
> >> > > > LogicalAggregate[group=$0, agg=SUM($1)]
> >> > > >   LogicalInput
> >> > > >
> >> > > > After rule:
> >> > > > PhysicalAggregate[group=$0, agg=SUM($1), template=true,
> >> cost=infinite]
> >> > > >   LogicalInput
> >> > > >
> >> > > > After "derive" if the input is not shared on $0:
> >> > > > PhysicalAggregate[group=$0, agg=SUM($1)]
> >> > > >   Exchange
> >> > > >     PhysicalAggregate[group=$0, agg=SUM($1)]
> >> > > >       PhysicalInputNotSharded
> >> > > >
> >> > > > After "derive" if the input is shared on $0:
> >> > > > PhysicalAggregate[group=$0, agg=SUM($1)]
> >> > > >   PhysicalInputNotSharded
> >> > > >
> >> > > > This approach allows me to avoid generating unnecessary alternatives
> >> > > > by delaying the optimization to the derive phase. The aggregate split
> >> > > > is implemented in rules in Dremio/Flink, but in my case, this logic
> >> > > > migrates to "derive".
> >> > > >
> >> > > > This solution worked well for the whole TPC-DS suite until we
> >> wanted to
> >> > > > optimize combinations of operators rather than individual
> >> operators. A
> >> > good
> >> > > > example is TPC-DS query 1 [3]. During the logical optimization, we
> >> get
> >> > the
> >> > > > following logical tree, which is exactly the case that I
> >> demonstrated
> >> > at
> >> > > > the beginning of this mail thread:
> >> > > > G1: Aggregate(groupBy=[ctr_store_sk])
> >> > > > G2:  Aggregate(groupBy=[ctr_customer_sk, ctr_store_sk]
> >> > > >
> >> > > > And this is where I got stuck. I need to do a simple thing -
> >> propagate
> >> > an
> >> > > > optimization request from G1 to G2, informing G2 that it should
> >> > consider
> >> > > > the distribution [ctr_store_sk]. I can deliver that request to my
> >> > physical
> >> > > > template in G2 through "convert". But the problem is that the
> >> current
> >> > > > Calcite implementation doesn't allow me to satisfy this request
> >> later
> >> > on in
> >> > > > the derivation stage. Instead, I am forced to emit the final
> >> execution
> >> > tree
> >> > > > from the "passThrough" method, which will not be notified at the
> >> > derivation
> >> > > > stage. I prepared a scheme [4] that demonstrates the problem.
> >> > > >
> >> > > > It feels that I almost achieved what I need. The last step is to
> >> ensure
> >> > > > that "derive" is called on the newly created template. And this is
> >> > where I
> >> > > > think I reach the inflexibility of the current top-down optimizer
> >> > > > implementation. The current design forces us to define all possible
> >> > > > structures of physical operators in advance, but I want to delay the
> >> > > > decision to the derive stage when input traits are known because
> >> these
> >> > > > traits are essential to make the proper physical decisions.
> >> > > >
> >> > > > There are some similarities with Haisheng's proposal about the
> >> Window
> >> > > > operator. We also maintain the 1-1 correspondence between the
> >> logical
> >> > > > operator and a physical template. However, Haisheng's proposal is
> >> > basically
> >> > > > heuristic, as we split optimization into two phases (implementation,
> >> > > > post-processing). It is impossible to properly calculate the cost of
> >> > the
> >> > > > Window operator because we do not know which exchanges would be
> >> needed
> >> > > > before the post-processing. In my case, we do the proper cost
> >> > estimation
> >> > > > within a single expanded MEMO.
> >> > > >
> >> > > > Now switching to theoretical considerations. We may make several
> >> > > > observations from the previous discussion:
> >> > > > 1) Our ideas converge to the solution where every logical operator
> >> has
> >> > a
> >> > > > single corresponding physical operator, which is later expanded into
> >> > more
> >> > > > alternatives.
> >> > > > 2) Optimization requests are basically sent to RelSet-s, not
> >> > RelSubset-s,
> >> > > > as we make pairwise comparisons between the requested RelSubset and
> >> > other
> >> > > > subsets in the set [5][6].
> >> > > > 3) Irrespective of the design, the complete exploration requires
> >> > multiple
> >> > > > invocations of some implementation logic for different combinations
> >> of
> >> > > > required traits and available input traits.
> >> > > >
> >> > > > These observations led me to think that maybe trait propagation
> >> through
> >> > > > some dedicated nodes (templates in my case and Haisheng's Window
> >> > proposal,
> >> > > > or pessimistically emitted physical nodes in the previous
> >> > Jinpeng/Haisheng
> >> > > > proposal) is not the ideal design, at least for some cases.
> >> > > >
> >> > > > From the design standpoint, we propagate traits top-down and
> >> bottom-up
> >> > > > across equivalence groups, not individual RelSubset-s or RelNode-s.
> >> > > > Currently, we ignore the optimization context when optimizing the
> >> group
> >> > > > (except for the cost pruning). Rules emit partially constructed
> >> nodes
> >> > since
> >> > > > neither parent requirements nor child traits are available to the
> >> rule.
> >> > > >
> >> > > > Instead, there could exist a truly guided top-down optimization flow,
> >> > > > where the "guided" term applies to rules as well:
> >> > > > 1. Pass-through: RelSet receives an optimization request and chooses
> >> > > > appropriate implementation rules to fire. A rule receives
> >> optimization
> >> > > > requests, constructs optimization requests for children (adjusting
> >> > traits,
> >> > > > optimization budget, etc.), then sends these requests down. The
> >> process
> >> > > > repeated recursively until we either reach the bottom node or some
> >> set
> >> > that
> >> > > > is already optimized for this request.
> >> > > > 2. Derive: given the now-known input traits, emit appropriate
> >> > > > physical nodes from the rule. Then notify the parent. Repeat the
> >> > > > process recursively.
> >> > > >
> >> > > > For common use cases, this design would require the same logic,
> >> which
> >> > is
> >> > > > currently split between rules, "derive" and "passThrough", just the
> >> > code
> >> > > > location will be different, as everything now converges to the rule.
> >> > But
> >> > > > for the advanced use cases, that approach may allow for more
> >> flexible
> >> > > > optimization patterns, like for these two chained aggregates.
> >> > > >
> >> > > > I'll try to implement both solutions - (1) emit multiple nodes from
> >> a
> >> > > > physical rule, and (2) enable derivation for some nodes emitted from
> >> > > > "passThrough", and share the results here.
> >> > > >
> >> > > > Regards,
> >> > > > Vladimir.
> >> > > >
> >> > > > [1]
> >> > > >
> >> >
> >> https://github.com/dremio/dremio-oss/blob/8e85901e7222c81ccad3436ba9b63485503472ac/sabot/kernel/src/main/java/com/dremio/exec/planner/physical/HashAggPrule.java#L123-L146
> >> > > > [2]
> >> > > >
> >> >
> >> https://github.com/apache/flink/blob/release-1.13.0/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala#L101-L147
> >> > > > [3] https://github.com/Agirish/tpcds/blob/master/query1.sql
> >> > > > [4]
> >> > > >
> >> >
> >> https://docs.google.com/document/d/1u7V4bNp51OjytKnS2kzD_ikHLiNUPbhjZfRjkTfdsDA/edit
> >> > > > [5]
> >> > > >
> >> >
> >> https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java#L196
> >> > > > [6]
> >> > > >
> >> >
> >> https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L203
> >> > > >
> >> > > > On Fri, May 28, 2021 at 00:10, Haisheng Yuan <hy...@apache.org> wrote:
> >> > > >
> >> > > > > Getting back to your window query example:
> >> > > > >
> >> > > > > > Consider the Window function:
> >> > > > > > SELECT
> >> > > > > >   AGG1 over (partition by a),
> >> > > > > >   AGG2 over (partition by b),
> >> > > > > >   AGG3 over (partition by c),
> >> > > > > >   ...
> >> > > > > > FROM input
> >> > > > >
> >> > > > > Window is quite special because the logical-to-physical operator
> >> > > > > count is not 1 to 1; generally, we generate a physical window
> >> > > > > operator for each window function with a different partition
> >> > > > > column. That means that once the physical operators are created,
> >> > > > > their order can't be changed. Hence your proposal of passing the
> >> > > > > required traits to the physical rule can mitigate the problem.
> >> > > > >
> >> > > > > But things would be much easier if we define a different physical
> >> > window
> >> > > > > operator.
> >> > > > > For the above query, we can generate the *Single* physical window
> >> > operator
> >> > > > > like this:
> >> > > > > PhysicalWindow[AGG1 over (partition by a), AGG2 over (partition by
> >> > b),
> >> > > > > AGG3 over (partition by c)]
> >> > > > > or PhysicalWindow(a, b, c) for brevity.
> >> > > > > How do we define the physical properties for it?
> >> > > > > The operator delivers hash distribution on the first window
> >> > > > > partition column a, but requires its child input to be hash
> >> > > > > distributed by its last window partition column c.
> >> > > > >
> >> > > > > If the parent operator requests hash distribution on b or c, the
> >> > > > > window operator's "passThrough" method will be called and will
> >> > > > > generate PhysicalWindow(b, a, c) or PhysicalWindow(c, a, b). After
> >> > > > > the final plan is generated, during post-processing, we can replace
> >> > > > > the window operator with multiple nested window operators and
> >> > > > > insert Exchange operators if necessary. But frankly speaking, I
> >> > > > > haven't seen any use cases of this kind in production.
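> >> > > > >
> >> > > > > A rough sketch of that "passThrough" (the group-reordering helper
> >> > > > > and the copy overload are hypothetical):
> >> > > > >
> >> > > > > @Override public RelNode passThrough(RelTraitSet required) {
> >> > > > >     RelDistribution dist =
> >> > > > >         required.getTrait(RelDistributionTraitDef.INSTANCE);
> >> > > > >     if (dist == null
> >> > > > >         || dist.getType() != RelDistribution.Type.HASH_DISTRIBUTED) {
> >> > > > >         return null;
> >> > > > >     }
> >> > > > >     // Move the window group partitioned by the requested keys to the
> >> > > > >     // front, e.g. a request for HASH[b] turns (a, b, c) into (b, a, c).
> >> > > > >     List<Window.Group> reordered = moveMatchingGroupFirst(dist.getKeys());
> >> > > > >     if (reordered == null) {
> >> > > > >         return null; // no window function partitioned by those keys
> >> > > > >     }
> >> > > > >     return copy(getTraitSet().replace(dist), reordered);
> >> > > > > }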
> >> > > > >
> >> > > > > Regarding the rule alternative you proposed:
> >> > > > > > class PhysicalAggregateRule extends PhysicalRule {
> >> > > > > >  void onMatch(RelOptRuleCall call, *RelTraitSet requiredTraits*)
> >> > {...
> >> > > > >
> >> > > > > Consider the following plan:
> >> > > > > InnerJoin (on a)
> >> > > > >   +-- Agg (on b)
> >> > > > >   +-- Scan
> >> > > > >
> >> > > > > For the inner join, we can generate sort merge join and hash join.
> >> > > > > The sort merge join can request the following traits to Agg:
> >> > > > > 1) Singleton
> >> > > > > 2) hash distribution on a, sorted by a
> >> > > > > The hash join can request the following traits to Agg:
> >> > > > > 1) Singleton
> >> > > > > 2) hash distribution on a
> >> > > > > 3) any distribution
> >> > > > > 4) broadcast distribution
> >> > > > >
> >> > > > > The PhysicalAggregateRule will be called and executed 5 times
> >> > > > > while generating the same physical aggregate candidates, unless we
> >> > > > > pass the whole list of required traits to the physical rule, which
> >> > > > > I prototyped some time ago with exactly that idea.
> >> > > > >
> >> > > > > Regards,
> >> > > > > Haisheng Yuan
> >> > > > >
> >> > > > > On 2021/05/27 17:44:23, Haisheng Yuan <hy...@apache.org> wrote:
> >> > > > > > >    In distributed systems, an implementation rule may produce
> >> > different
> >> > > > > > >    physical operators depending on the input traits. Examples
> >> are
> >> > > > > Aggregate,
> >> > > > > > >    Sort, Window.
> >> > > > > >
> >> > > > > > No, in most cases, physical operators are generated regardless of
> >> > > > > > the input, because the input traits are not known yet. Window
> >> > > > > > might be an exception.
> >> > > > > >
> >> > > > > > >    Since input traits are not known when the rule is fired, we
> >> > must
> >> > > > > > >    generate *all possible combinations* of physical operators
> >> > that we
> >> > > > > may
> >> > > > > > >    need. For LogicalAggregate, we must generate 1-phase and
> >> > 2-phase
> >> > > > > > >    alternatives. For LogicalSort, we also have 1-phase and
> >> > 2-phase
> >> > > > > > >    alternatives. Etc.
> >> > > > > >
> >> > > > > > IMHO, 1-phase and 2-phase are just different logical
> >> > > > > > alternatives; that is also why I call it a logical rule to split
> >> > > > > > the aggregate into a 2-phase aggregate. But HashAggregate and
> >> > > > > > StreamAggregate are indeed different physical alternatives for a
> >> > > > > > LogicalAggregate.
> >> > > > > >
> >> > > > > >
> >> > > > > > >   Unlike Aggregate or Sort, which may have only 1 or 2 phases,
> >> > certain
> >> > > > > > >   logical operators may have many physical alternatives.
> >> > Consider the
> >> > > > > Window
> >> > > > > > >   function:......
> >> > > > > >
> >> > > > > > In the window implementation rule, when building a physical
> >> > > > > > operator for a Window that has multiple window functions with
> >> > > > > > different partition columns, we can infer the possible traits
> >> > > > > > that can be delivered by input operators by creating our own
> >> > > > > > RelMetadata, hence multiple window combinations in a certain
> >> > > > > > order, but not an exhaustive enumeration. In fact, the window
> >> > > > > > ordering problem exists in every kind of optimizer.
> >> > > > > >
> >> > > > > > > As input traits are not known when the rule is fired, the
> >> nodes
> >> > emitted
> >> > > > > > > from the implementation rules most likely would not be used in
> >> > the
> >> > > > > final
> >> > > > > > > plan.
> >> > > > > >
> >> > > > > > That is quite normal: any operator generated by an implementation
> >> > > > > > rule might not be used in the final plan, because there may be
> >> > > > > > tens of thousands of alternatives and we only choose the one with
> >> > > > > > the lowest cost.
> >> > > > > >
> >> > > > > > > For example, I can create a physical aggregate that demands
> >> > > > > > > non-strict distribution {a,b} from its input, meaning that
> >> both
> >> > [a,b]
> >> > > > > and
> >> > > > > > > [b,a] is ok. However, in the final plan, we are obligated to
> >> > have a
> >> > > > > strict
> >> > > > > > > distribution - either [a, b] in that order, or [b, a] in that
> >> > order -
> >> > > > > > > otherwise, physical operators like Join and Union will not
> >> work.
> >> > > > > >
> >> > > > > > It depends on your own satisfaction model and how you coordinate
> >> > > > > > property requirements among child operators. Unlike the Orca
> >> > > > > > optimizer, where there are exact match, partial satisfying,
> >> > > > > > orderless match, etc., Calcite's default implementation always
> >> > > > > > requires exact satisfaction. But we can still make use of
> >> > > > > > "passThrough" and "derive" to achieve our goal, i.e. the aggregate
> >> > > > > > generated by the implementation rule requires itself and its child
> >> > > > > > to deliver distribution on [a,b], but when the "derive" method
> >> > > > > > tells the Aggregate that [b,a] is available, it can generate
> >> > > > > > another option that requires [b,a] instead.
> >> > > > > >
> >> > > > > > > In distributed engines, the nodes emitted from rules are
> >> > basically
> >> > > > > "templates"
> >> > > > > > > that must be replaced with normal nodes.
> >> > > > > >
> >> > > > > > There is no difference between distributed and non-distributed
> >> > > > > > engines when dealing with this. In the Orca and CockroachDB
> >> > > > > > optimizers, the nodes emitted from rules are operators without
> >> > > > > > physical properties; the optimizer then requests physical
> >> > > > > > properties in a top-down manner, whether recursively, via a stack,
> >> > > > > > or via a state machine. Calcite is quite different: when a
> >> > > > > > physical operator is generated by an implementation rule, it must
> >> > > > > > have its own traits and, at the same time, the traits that it
> >> > > > > > expects its child operators to deliver. So in Calcite, they are
> >> > > > > > not "templates". That difference has been there since Calcite's
> >> > > > > > inception.
> >> > > > > >
> >> > > > > > Regards,
> >> > > > > > Haisheng Yuan
> >> > > > > >
> >> > > > > > On 2021/05/27 08:59:33, Vladimir Ozerov <ppoze...@gmail.com>
> >> > wrote:
> >> > > > > > > Hi Haisheng,
> >> > > > > > >
> >> > > > > > > Thank you for your inputs. They are really helpful. Let me
> >> > summarize
> >> > > > > your
> >> > > > > > > feedback in my own words to verify that I understand it
> >> > correctly.
> >> > > > > > >
> >> > > > > > >    1. In distributed systems, an implementation rule may
> >> produce
> >> > > > > different
> >> > > > > > >    physical operators depending on the input traits. Examples
> >> are
> >> > > > > Aggregate,
> >> > > > > > >    Sort, Window.
> >> > > > > > >    2. Since input traits are not known when the rule is fired,
> >> > we must
> >> > > > > > >    generate *all possible combinations* of physical operators
> >> > that we
> >> > > > > may
> >> > > > > > >    need. For LogicalAggregate, we must generate 1-phase and
> >> > 2-phase
> >> > > > > > >    alternatives. For LogicalSort, we also have 1-phase and
> >> > 2-phase
> >> > > > > > >    alternatives. Etc.
> >> > > > > > >    3. If all combinations are generated, it is expected that
> >> > > > > "passThrough"
> >> > > > > > >    and "derive" would be just trivial replacements of traits
> >> for
> >> > most
> >> > > > > cases.
> >> > > > > > >    This is why "passThroughTraits" and "deriveTraits" are
> >> > recommended.
> >> > > > > A
> >> > > > > > >    notable exception is TableScan that may emit alternative
> >> > indexes in
> >> > > > > > >    response to the pass-through requests.
> >> > > > > > >
> >> > > > > > > If my understanding is correct, then there are several issues
> >> > with this
> >> > > > > > > approach still.
> >> > > > > > >
> >> > > > > > > 1. Unlike Aggregate or Sort, which may have only 1 or 2
> >> phases,
> >> > certain
> >> > > > > > > logical operators may have many physical alternatives.
> >> Consider
> >> > the
> >> > > > > Window
> >> > > > > > > function:
> >> > > > > > > SELECT
> >> > > > > > >   AGG1 over (partition by a),
> >> > > > > > >   AGG2 over (partition by b),
> >> > > > > > >   AGG3 over (partition by c),
> >> > > > > > >   ...
> >> > > > > > > FROM input
> >> > > > > > >
> >> > > > > > > To calculate each aggregate, we need to re-shuffle the input
> >> > > > > > > based on the partition key. The key question is the order of
> >> > > > > > > reshuffling. If the input is sharded by [a], I want to calculate
> >> > > > > > > AGG1 locally and then re-shuffle the input to calculate the
> >> > > > > > > other aggregates. For the remaining AGG2 and AGG3, the order is
> >> > > > > > > also important. If the parent demands sharding by [b], then the
> >> > > > > > > proper sequence is b-c-a:
> >> > > > > > > 1: Window[AGG2 over (partition by b)]     // SHARDED[b]
> >> > > > > > > 2:   Window[AGG3 over (partition by c)]   // SHARDED[c]
> >> > > > > > > 3:     Window[AGG1 over (partition by a)] // SHARDED[a]
> >> > > > > > > 4:       Input                            // SHARDED[a]
> >> > > > > > >
> >> > > > > > > But if the parent demands [c], the proper sequence is c-b-a.
> >> > > > > > > Since we do not know the real distributions when the rule is
> >> > > > > > > fired, we must emit all the permutations to ensure that no
> >> > > > > > > optimization opportunity is missed. But with a complex window
> >> > > > > > > aggregate, this might be impractical because we will emit lots
> >> > > > > > > of unnecessary nodes.
> >> > > > > > >
> >> > > > > > > 2. As input traits are not known when the rule is fired, the
> >> > nodes
> >> > > > > emitted
> >> > > > > > > from the implementation rules most likely would not be used in
> >> > the
> >> > > > > final
> >> > > > > > > plan. For example, I can create a physical aggregate that
> >> demands
> >> > > > > > > non-strict distribution {a,b} from its input, meaning that
> >> both
> >> > [a,b]
> >> > > > > and
> >> > > > > > > [b,a] is ok. However, in the final plan, we are obligated to
> >> > have a
> >> > > > > strict
> >> > > > > > > distribution - either [a, b] in that order, or [b, a] in that
> >> > order -
> >> > > > > > > otherwise, physical operators like Join and Union will not
> >> work.
> >> > In
> >> > > > > > > distributed engines, the nodes emitted from rules are
> >> basically
> >> > > > > "templates"
> >> > > > > > > that must be replaced with normal nodes.
> >> > > > > > >
> >> > > > > > > Does this reasoning make any sense? If yes, it means that the
> >> > current
> >> > > > > > > approach forces us to produce many unnecessary nodes to
> >> explore
> >> > the
> >> > > > > full
> >> > > > > > > search space. The question is whether alternative approaches
> >> > could
> >> > > > > better
> >> > > > > > > fit the requirements of the distributed engine? This is a
> >> purely
> >> > > > > > > theoretical question. I am currently looking deeper at
> >> > CockroachDB.
> >> > > > > They
> >> > > > > > > have very different architecture: no separation between
> >> logical
> >> > and
> >> > > > > > > physical nodes, physical properties are completely decoupled
> >> from
> >> > > > > nodes,
> >> > > > > > > usage of recursion instead of the stack, etc.
> >> > > > > > >
> >> > > > > > > Regards,
> >> > > > > > > Vladimir.
> >> > > > > > >
> >> > > > > > > On Thu, May 27, 2021 at 03:19, Haisheng Yuan <hy...@apache.org> wrote:
> >> > > > > > >
> >> > > > > > > > Another point I would like to mention is that it is not
> >> > > > > > > > recommended to override the methods "passThrough" and "derive"
> >> > > > > > > > directly; override "passThroughTraits" and "deriveTraits"
> >> > > > > > > > instead, so that we can make sure only the same type of
> >> > > > > > > > physical node is created and no nested relnodes or additional
> >> > > > > > > > RelSets are created, unless you know you have to create a
> >> > > > > > > > different type of node. For example, if the table foo has a
> >> > > > > > > > btree index on column a, and the parent relnode is requesting
> >> > > > > > > > ordering on column a, then we may consider overriding
> >> > > > > > > > "passThrough" of TableScan to return an IndexScan instead of a
> >> > > > > > > > TableScan.
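> >> > > > > > > >
> >> > > > > > > > A rough sketch (the IndexScan class and the index lookup are
> >> > > > > > > > hypothetical):
> >> > > > > > > >
> >> > > > > > > > @Override public RelNode passThrough(RelTraitSet required) {
> >> > > > > > > >     RelCollation collation =
> >> > > > > > > >         required.getTrait(RelCollationTraitDef.INSTANCE);
> >> > > > > > > >     if (collation != null && hasMatchingBtreeIndex(collation)) {
> >> > > > > > > >         // Deliver the ordering directly from the index.
> >> > > > > > > >         return new IndexScan(getCluster(), required, getTable());
> >> > > > > > > >     }
> >> > > > > > > >     return null; // no useful index; a Sort will be enforced
> >> > > > > > > > }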
> >> > > > > > > >
> >> > > > > > > > Regards,
> >> > > > > > > > Haisheng Yuan
> >> > > > > > > > On 2021/05/26 22:45:20, Haisheng Yuan <hy...@apache.org>
> >> > wrote:
> >> > > > > > > > > Hi Vladimir,
> >> > > > > > > > >
> >> > > > > > > > > 1. You need a logical rule to split the aggregate into a
> >> > > > > > > > > local aggregate and a global aggregate, for example:
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > >
> >> >
> >> https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitGbAgg.cpp
> >> > > > > > > > > Only implementation rules can convert a logical node to a
> >> > physical
> >> > > > > node
> >> > > > > > > > or multiple physical nodes.
> >> > > > > > > > > After physical implementation, you have 2 physical
> >> > alternatives:
> >> > > > > > > > > 1) single phase global physical aggregate,
> >> > > > > > > > > 2) 2 phase physical aggregate with local and global
> >> > aggregate.
> >> > > > > > > > > It should be up to the cost to decide which one to choose.
> >> > > > > > > > >
> >> > > > > > > > > 2. Given a desired traitset from the parent node, the
> >> > > > > > > > > current relnode only needs to generate a single relnode
> >> > > > > > > > > after passing down the traitset. Given a traitset delivered
> >> > > > > > > > > by a child node, the current relnode only derives a single
> >> > > > > > > > > relnode. Quite unlike other optimizers, in Calcite's
> >> > > > > > > > > top-down optimizer you don't need to worry about issuing
> >> > > > > > > > > multiple optimization requests to inputs; that is handled by
> >> > > > > > > > > the Calcite framework behind the scenes. i.e.
> >> > > > > > > > > SELECT a, b, min(c) from foo group by a, b;
> >> > > > > > > > > In many other optimizers, we probably need to ask the
> >> > > > > > > > > aggregate to issue 3 distribution requests for the tablescan
> >> > > > > > > > > on foo, which are
> >> > > > > > > > > 1) hash distributed by a,
> >> > > > > > > > > 2) hash distributed by b,
> >> > > > > > > > > 3) hash distributed by a, b
> >> > > > > > > > > However, in Calcite's top-down optimizer, your physical
> >> > > > > > > > > implementation rule for the global aggregate only needs to
> >> > > > > > > > > generate a single physical node with hash distribution by
> >> > > > > > > > > a, b. In case the table foo happens to be distributed by a
> >> > > > > > > > > or b, the derive() method will tell you there is an
> >> > > > > > > > > opportunity. This is a feature where Calcite's top-down
> >> > > > > > > > > optimizer excels over other optimizers, because it can
> >> > > > > > > > > dramatically reduce the search space while keeping the
> >> > > > > > > > > optimal optimization opportunity.
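> >> > > > > > > > >
> >> > > > > > > > > Roughly, the aggregate's "deriveTraits" could look like this
> >> > > > > > > > > (sketch only, assuming the node implements PhysicalNode):
> >> > > > > > > > >
> >> > > > > > > > > @Override public Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(
> >> > > > > > > > >         RelTraitSet childTraits, int childId) {
> >> > > > > > > > >     RelDistribution childDist =
> >> > > > > > > > >         childTraits.getTrait(RelDistributionTraitDef.INSTANCE);
> >> > > > > > > > >     // If the child is hashed on a subset of the GROUP BY keys
> >> > > > > > > > >     // ({a} or {b}), adopt it instead of re-hashing by [a, b].
> >> > > > > > > > >     if (childDist != null
> >> > > > > > > > >         && childDist.getType() == RelDistribution.Type.HASH_DISTRIBUTED
> >> > > > > > > > >         && getGroupSet().contains(ImmutableBitSet.of(childDist.getKeys()))) {
> >> > > > > > > > >         return Pair.of(getTraitSet().replace(childDist),
> >> > > > > > > > >             ImmutableList.of(childTraits));
> >> > > > > > > > >     }
> >> > > > > > > > >     return null;
> >> > > > > > > > > }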
> >> > > > > > > > >
> >> > > > > > > > > 3. This is by design. Nodes produced from "passThrough" and
> >> > > > > > > > > "derive" are just sibling physical nodes with different
> >> > > > > > > > > traitsets; we only need the initial physical nodes after
> >> > > > > > > > > implementation to avoid unnecessary operations. The
> >> > > > > > > > > fundamental reason is that, unlike the Orca optimizer, where
> >> > > > > > > > > physical nodes and physical properties are separate things,
> >> > > > > > > > > Calcite's logical/physical nodes contain traitsets. With
> >> > > > > > > > > regard to the latter question, can you give an example?
> >> > > > > > > > >
> >> > > > > > > > > Regards,
> >> > > > > > > > > Haisheng Yuan
> >> > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > > > On 2021/05/26 20:11:57, Vladimir Ozerov <
> >> ppoze...@gmail.com>
> >> > > > > wrote:
> >> > > > > > > > > > Hi,
> >> > > > > > > > > >
> >> > > > > > > > > > I tried to optimize a certain combination of operators
> >> for
> >> > the
> >> > > > > > > > distributed
> >> > > > > > > > > > engine and got stuck with the trait propagation in the
> >> > top-down
> >> > > > > > > > engine. I
> >> > > > > > > > > > want to ask the community for advice on whether the
> >> > problem is
> >> > > > > solvable
> >> > > > > > > > > > with the current Apache Calcite implementation or not.
> >> > > > > > > > > >
> >> > > > > > > > > > Consider the following logical tree:
> >> > > > > > > > > > 3: LogicalAggregate[group=[a], F2(c)]
> >> > > > > > > > > > 2:  LogicalAggregate[group=[a,b], F1(c)]
> >> > > > > > > > > > 1:    LogicalScan[t]
> >> > > > > > > > > >
> >> > > > > > > > > > Consider that these two aggregates cannot be merged or
> >> > > > > simplified for
> >> > > > > > > > > > whatever reason. We have only a set of physical rules to
> >> > > > > translate this
> >> > > > > > > > > > logical tree to a physical tree. Also, there could be
> >> any
> >> > number
> >> > > > > of
> >> > > > > > > > > > other operators between these two aggregates. We omit
> >> them
> >> > for
> >> > > > > clarity,
> >> > > > > > > > > > assuming that the distribution is not destroyed.
> >> > > > > > > > > >
> >> > > > > > > > > > In the distributed environment, non-collocated
> >> aggregates
> >> > are
> >> > > > > often
> >> > > > > > > > > > implemented in two phases: local pre-aggregation and
> >> final
> >> > > > > aggregation,
> >> > > > > > > > > > with an exchange in between. Consider that the Scan
> >> > operator is
> >> > > > > hash
> >> > > > > > > > > > distributed by some key other than [a] or [b]. If we
> >> > optimize
> >> > > > > operators
> >> > > > > > > > > > without considering the whole plan, we may optimize each
> >> > operator
> >> > > > > > > > > > independently, which would give us the following plan:
> >> > > > > > > > > > 3: PhysicalAggregate[group=[a], F2_phase2(c)]           // HASH_DISTRIBUTED [a]
> >> > > > > > > > > > 3:   Exchange[a]                                        // HASH_DISTRIBUTED [a]
> >> > > > > > > > > > 3:     PhysicalAggregate[group=[a], F2_phase1(c)]       // HASH_DISTRIBUTED [a,b]
> >> > > > > > > > > > 2:       PhysicalAggregate[group=[a,b], F1_phase2(c)]   // HASH_DISTRIBUTED [a,b]
> >> > > > > > > > > > 2:         Exchange[a, b]                               // HASH_DISTRIBUTED [a,b]
> >> > > > > > > > > > 2:           PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
> >> > > > > > > > > > 1:             PhysicalScan[t]                          // HASH_DISTRIBUTED [d]
> >> > > > > > > > > >
> >> > > > > > > > > > This plan is not optimal, because we re-hash inputs twice.
> >> > > > > > > > > > A better plan that we want to get:
> >> > > > > > > > > > 3: PhysicalAggregate[group=[a], F2(c)]                // HASH_DISTRIBUTED [a]
> >> > > > > > > > > > 2:   PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a]
> >> > > > > > > > > > 2:     Exchange[a]                                    // HASH_DISTRIBUTED [a]
> >> > > > > > > > > > 2:       PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
> >> > > > > > > > > > 1:         PhysicalScan[t]                            // HASH_DISTRIBUTED [d]
> >> > > > > > > > > >
> >> > > > > > > > > > In this case, we take advantage of the fact that the
> >> > > > > distribution [a]
> >> > > > > > > > is
> >> > > > > > > > > > compatible with [a,b]. Therefore we may enforce only
> >> [a],
> >> > > > > instead of
> >> > > > > > > > doing
> >> > > > > > > > > > [a,b] and then [a]. Since exchange operators are very
> >> > expensive,
> >> > > > > this
> >> > > > > > > > > > optimization may bring a significant boost to the query
> >> > engine.
> >> > > > > Now the
> >> > > > > > > > > > question - how do we reach that state? Intuitively, a
> >> > > > > pass-through is
> >> > > > > > > > > > exactly what we need. We may pass the optimization
> >> request
> >> > from
> >> > > > > top
> >> > > > > > > > > > aggregate to bottom aggregate to find physical
> >> > implementations
> >> > > > > shared
> >> > > > > > > > by
> >> > > > > > > > > > [a]. But the devil is in the details - when and how
> >> > exactly to
> >> > > > > pass
> >> > > > > > > > this
> >> > > > > > > > > > request?
> >> > > > > > > > > >
> >> > > > > > > > > > Typically, we have a conversion rule that converts a
> >> > logical
> >> > > > > aggregate
> >> > > > > > > > to a
> >> > > > > > > > > > physical aggregate. We may invoke "convert" on the
> >> input to
> >> > > > > initiate
> >> > > > > > > > the
> >> > > > > > > > > > pass-through:
> >> > > > > > > > > >
> >> > > > > > > > > > RelNode convert(...) {
> >> > > > > > > > > >     return new PhysicalAggregate(
> >> > > > > > > > > >         convert(input, HASH_DISTRIBUTED[a])
> >> > > > > > > > > >     )
> >> > > > > > > > > > }
> >> > > > > > > > > >
> >> > > > > > > > > > The first problem - we cannot create the normal physical
> >> > > > > aggregate here
> >> > > > > > > > > > because we do not know input traits yet. The final
> >> decision
> >> > > > > whether to
> >> > > > > > > > do a
> >> > > > > > > > > > one-phase or two-phase aggregate can be made only in the
> >> > > > > > > > > > "PhysicalNode.derive" method when concrete input traits
> >> are
> >> > > > > resolved.
> >> > > > > > > > > > Therefore the converter rule should create a kind of
> >> > "template"
> >> > > > > > > > physical
> >> > > > > > > > > > operator, which would be used to construct the final
> >> > operator(s)
> >> > > > > when
> >> > > > > > > > input
> >> > > > > > > > > > traits are resolved. AFAIU Enumerable works similarly:
> >> we
> >> > create
> >> > > > > > > > operators
> >> > > > > > > > > > with virtually arbitrary traits taken from logical nodes
> >> > in the
> >> > > > > > > > conversion
> >> > > > > > > > > > rules. We only later do create normal nodes in the
> >> derive()
> >> > > > > methods.
> >> > > > > > > > > >
> >> > > > > > > > > > The second problem - our top aggregate doesn't actually
> >> > need the
> >> > > > > > > > > > HASH_DISTRIBUTED[a] input. Instead, it may accept inputs
> >> > with any
> >> > > > > > > > > > distribution. What we really need is to inform the input
> >> > (bottom
> >> > > > > > > > aggregate)
> >> > > > > > > > > > that it should look for additional implementations that
> >> > satisfy
> >> > > > > > > > > > HASH_DISTRIBUTED[a]. Therefore, enforcing a specific
> >> > > > > distribution on
> >> > > > > > > > the
> >> > > > > > > > > > input using the "convert" method is not what we need
> >> > because this
> >> > > > > > > > > > conversion might enforce unnecessary exchanges.
> >> > > > > > > > > >
> >> > > > > > > > > > The third problem - derivation. Consider that we
> >> delivered
> >> > the
> >> > > > > > > > optimization
> >> > > > > > > > > > request to the bottom aggregate. As an implementor, what
> >> > am I
> >> > > > > supposed
> >> > > > > > > > to
> >> > > > > > > > > > do in this method? I cannot return the final aggregate
> >> > from here
> >> > > > > > > > because
> >> > > > > > > > > > the real input traits are not derived yet. Therefore, I
> >> > can only
> >> > > > > return
> >> > > > > > > > > > another template, hoping that the "derive" method will
> >> be
> >> > called
> >> > > > > on it.
> >> > > > > > > > > > However, this will not happen because trait derivation
> >> is
> >> > > > > skipped on
> >> > > > > > > > the
> >> > > > > > > > > > nodes emitted from pass-through. See
> >> "DeriveTrait.perform"
> >> > [1].
> >> > > > > > > > > >
> >> > > > > > > > > > BottomAggregate {
> >> > > > > > > > > >     RelNode
> >> passThrough(distribution=HASH_DISTRIBUTED[a]) {
> >> > > > > > > > > >         // ???
> >> > > > > > > > > >     }
> >> > > > > > > > > > }
> >> > > > > > > > > >
> >> > > > > > > > > > I feel that I am either going in the wrong direction, or
> >> > some
> >> > > > > gaps in
> >> > > > > > > > the
> >> > > > > > > > > > product disallow such optimization. So I would like to
> >> ask
> >> > the
> >> > > > > > > > community to
> >> > > > > > > > > > assist with the following questions:
> >> > > > > > > > > > 1. In the top-down optimizer, how should we convert a
> >> > logical
> >> > > > > node to a
> >> > > > > > > > > > physical node, provided that "derive" is not called
> >> yet? I
> >> > have
> >> > > > > a gut
> >> > > > > > > > > > feeling that the trait propagation is currently not
> >> > implemented
> >> > > > > to the
> >> > > > > > > > full
> >> > > > > > > > > > extent because based on Cascades paper I would expect
> >> that
> >> > parent
> >> > > > > > > > physical
> >> > > > > > > > > > nodes are produced after the child physical nodes. But
> >> in
> >> > our
> >> > > > > rules,
> >> > > > > > > > this
> >> > > > > > > > > > is not the case - some physical nodes are produced
> >> before
> >> > the
> >> > > > > trait
> >> > > > > > > > > > derivation.
> >> > > > > > > > > > 2. How to propagate several optimization requests to
> >> > inputs? We
> >> > > > > need
> >> > > > > > > > either
> >> > > > > > > > > > inputs with a specific distribution or inputs with an
> >> > arbitrary
> >> > > > > > > > > > distribution in the example above. It seems that to
> >> achieve
> >> > > > > that, I
> >> > > > > > > > need to
> >> > > > > > > > > > emit several alternative nodes with different
> >> requirements
> >> > to
> >> > > > > inputs.
> >> > > > > > > > Does
> >> > > > > > > > > > it make sense?
> >> > > > > > > > > > 3. Why are nodes produced from the "passThrough" method
> >> > excluded
> >> > > > > from
> >> > > > > > > > trait
> >> > > > > > > > > > derivation? If this is by design, how can I preserve the
> >> > > > > optimization
> >> > > > > > > > > > request to satisfy it on the derivation stage when input
> >> > traits
> >> > > > > are
> >> > > > > > > > > > available?
> >> > > > > > > > > >
> >> > > > > > > > > > Regards,
> >> > > > > > > > > > Vladimir.
> >> > > > > > > > > >
> >> > > > > > > > > > [1]
> >> > > > > > > > > >
> >> > > > > > > >
> >> > > > >
> >> >
> >> https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L828
> >> > > > > > > > > >
> >> > > > > > > > >
> >> > > > > > > >
> >> > > > > > >
> >> > > > > >
> >> > > > >
> >> > > >
> >> > >
> >> >
> >>
> >
> 
