> 2) Optimization requests are basically sent to RelSet-s, not RelSubset-s,
> as we make pairwise comparisons between the requested RelSubset and other
> subsets in the set [5][6].

I agree with you. There could be some waste when a new delivered / required 
traitset is generated by "passThrough" / "derive"; in that case, we only need 
an enforcer between that pair of subsets, instead of pairing it with all the 
other required / delivered subsets in the RelSet. For example:
In the MEMO group, we have 2 required traitsets:
1) Hash[a] Sort[b]
2) Hash[b] Sort[c]

When we try to pass Hash[a] Sort[b] through one of the physical operators, 
say Project, we find that we can pass Hash[a] down to its child, and we get a 
new physical Project with traitset Hash[a]. We only need an enforcer between 
Hash[a] and Hash[a] Sort[b], but currently the method "addConverters" also 
generates an enforcer between Hash[a] and Hash[b] Sort[c], which is not what 
we want.

I think it is definitely worth trying to optimize.
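
To make the idea concrete, here is a minimal sketch of the narrower behavior 
(the helper below is hypothetical, not the actual RelSet.addConverters 
signature; Convention.enforce and planner.register are the real hooks):

// Hypothetical: enforce only between the subset produced by
// "passThrough" / "derive" and the subset that triggered it, instead of
// pairing the new subset with every other subset in the RelSet.
void addConverterForPair(RelSubset delivered, RelSubset required) {
  RelTraitSet from = delivered.getTraitSet();
  RelTraitSet to = required.getTraitSet();
  if (!from.satisfies(to)) {
    // Convention.enforce may build e.g. an Exchange and/or a Sort.
    RelNode enforcer = required.getConvention().enforce(delivered, to);
    if (enforcer != null) {
      planner.register(enforcer, required);
    }
  }
}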

Regards,
Haisheng Yuan
On 2021/05/28 19:15:03, Haisheng Yuan <hy...@apache.org> wrote: 
> Hi Vladimir,
> 
> The top-down optimizer does NOT require an implementation rule to generate a 
> 1-to-1 physical operator for a logical operator. As you can see, if you 
> generate a 2-phase physical aggregate for the logical aggregate in the 
> implementation rule, it still works. Window is special because we can 
> reshuffle the execution order of window functions, and that order makes a 
> difference depending on the parent's physical property request. A single 
> converged physical Window operator caters for this speciality. However, as I 
> said, I don't think it is a common scenario.
> 
> > the whole decision of whether to go with 1-phase or 2-phase
> > aggregate is a physical decision that should be made based on available (or
> > assumed) input traits.
> What is the problem with generating both the 1-phase and 2-phase aggregates 
> and choosing the best one based on cost?
> 
> Let's see the following query:
> select a, min(b) from (select * from foo, bar where foo.a=bar.a) t group by a;
> suppose foo is a randomly distributed fact table, and bar is a randomly 
> distributed dimension table.
> Consider the 2 following plans:
> 1) 
> PhysicalAggregate
>    +-- HashJoin
>               +--  HashDistribute by a
>                          +-- TableScan on foo
>               +--  HashDistribute by a
>                          +-- TableScan on bar
> 
> 2) 
> PhysicalAggregate(global)
>    +--  HashDistribute by a
>             +---- PhysicalAggregate(local)
>                         +---- HashJoin
>                                      +-- TableScan on foo
>                                      +--  Broadcast
>                                                +-- TableScan on bar
> 
> Can you tell that the single-phase aggregate plan is always better than the 
> 2-phase aggregate plan?
> 
> > Therefore, the typical way to optimize
> > LogicalAggregate is to split in the physical phase (implementation rule,
> > pass-through, derive). Practical systems like Dremio [1] and Flink [2]
> > work this way.
> That Dremio and Flink work this way doesn't mean it is a good way. Greenplum 
> Orca and the Alibaba MaxCompute optimizer work in another way. In Flink and 
> Dremio, they have HashAggPRule to generate 1-phase and 2-phase HashAggs, and 
> SortAggPRule to generate 1-phase and 2-phase SortAggs. However, do you think 
> there is a possibility that a global SortAgg combined with a local HashAgg, 
> or a global HashAgg combined with a local SortAgg, may perform better in 
> different cases? Are you going to generate all 4 combinations in the 
> implementation rule? There are some cases where we found it is better to 
> split the aggregate into a 3-phase aggregate [1]; in that case, will the 
> implementation rule generate 3 HashAggs or 3 SortAggs, or all 6 combinations?
> 
> In our system, we have 1-phase, 2-phase, and 3-phase logical aggregate rules 
> that transform the LogicalAggregate into another kind of logical 
> aggregate(s) with phase info, say LogicalXXXAggregate; our physical 
> aggregate rules then match this kind of node to generate a HashAgg or 
> StreamAgg. Of course, in the logical rules we can add business logic that 
> guesses the possible traits delivered by the child nodes, to determine 
> whether the rule definitely won't generate a better alternative and perhaps 
> abort the transformation early. But I would rather let the cost model decide.
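> 
> For illustration, the shape of such a logical rule could be roughly the 
> following (LogicalPhasedAggregate, AggPhase, and their create calls are 
> hypothetical, not Calcite API; the rule scaffolding is real):
> 
> // Hypothetical transformation rule: rewrite a plain LogicalAggregate into
> // a global phase-tagged aggregate over a local one. The rule matches only
> // the plain LogicalAggregate, so it cannot re-fire on its own output.
> public class TwoPhaseAggRule extends RelOptRule {
>   public TwoPhaseAggRule() {
>     super(operand(LogicalAggregate.class, any()), "TwoPhaseAggRule");
>   }
> 
>   @Override public void onMatch(RelOptRuleCall call) {
>     LogicalAggregate agg = call.rel(0);
>     RelNode local =
>         LogicalPhasedAggregate.create(agg, AggPhase.LOCAL, agg.getInput());
>     RelNode global =
>         LogicalPhasedAggregate.create(agg, AggPhase.GLOBAL, local);
>     call.transformTo(global);
>   }
> }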
> 
> Admittedly, the current top-down optimization is not purely on-demand 
> request oriented, because it will always generate a physical request 
> regardless of the parent nodes' trait requests. For example, take the 
> following query in a non-distributed environment:
> select a, b, c, max(d) from foo group by a, b, c order by a desc;
> 
> It will first generate a StreamAgg[a ASC, b ASC, c ASC] no matter what the 
> parent node requires; then "passThrough" tells StreamAgg that the parent 
> requires [a DESC], and we get a StreamAgg[a DESC, b ASC, c ASC]. It would be 
> ideal if we only generated StreamAgg[a DESC, b ASC, c ASC] on request, but I 
> don't think that will make much difference; the bottleneck lies in the join 
> order enumeration and the Project-related operations.
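> 
> For reference, the "passThrough" side of that example can be small (a 
> sketch; StreamAgg and the two collation helpers are hypothetical, while 
> passThroughTraits is the real Calcite hook):
> 
> // Accept a parent collation request that merely flips the direction of a
> // prefix of the group keys, e.g. [a DESC] on a StreamAgg over (a, b, c).
> @Override public Pair<RelTraitSet, List<RelTraitSet>> passThroughTraits(
>     RelTraitSet required) {
>   RelCollation requested = required.getTrait(RelCollationTraitDef.INSTANCE);
>   if (requested == null || !coversGroupKeyPrefix(requested)) { // hypothetical
>     return null; // cannot satisfy the request by reordering
>   }
>   // Hypothetical merge: [a DESC] + (b, c) -> [a DESC, b ASC, c ASC].
>   RelCollation merged = mergeWithGroupKeys(requested);
>   RelTraitSet traits = getTraitSet().replace(merged);
>   // The child must be sorted the same way, so pass the same traits down.
>   return Pair.of(traits, ImmutableList.of(traits));
> }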
> 
> Regards,
> Haisheng Yuan
> 
> [1] 
> https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitDQA.cpp
> 
> On 2021/05/28 09:17:45, Vladimir Ozerov <ppoze...@gmail.com> wrote: 
> > Hi Jinpeng, Haisheng,
> > 
> > Thank you for your inputs. I really appreciate that. Let me try to address
> > some of your comments and share some experience with the implementation of
> > optimizers for a distributed engine I am currently working with.
> > 
> > First of all, I would argue that multiple logical operators do not have a
> > 1-1 mapping to physical operators, and Window is not special here. For
> > instance, LogicalAggregate doesn't have 1-1 mapping to physical aggregates
> > because the physical implementation can be either 1-phase or 2-phase. It
> > doesn't matter that the 2-phase aggregate is a composition of two 1-phase
> > aggregates: the whole decision of whether to go with 1-phase or 2-phase
> > aggregate is a physical decision that should be made based on available (or
> > assumed) input traits.
> > 
> > Consider the following logical tree:
> > LogicalAggregate[group=$0, agg=SUM($1)]
> >   Input
> > 
> > If I do the split on the logical phase with a separate transformation rule,
> > I will get the following tree:
> > LogicalAggregate[group=$0, agg=SUM($1)]
> >   LogicalAggregate[group=$0, agg=SUM($1)]
> >     Input
> > 
> > Now we have an infinite loop because the rule takes one aggregate and
> > produces two aggregates. To fix that, we may extend the LogicalAggregate
> > with some flag or so. But this (1) potentially breaks other LogicalAggregate
> > optimizations (e.g., transpose with other operators), and (2) breaks the
> > whole idea of the logical operators, because the execution phase
> > (pre-aggregate or final aggregate) is a property of the concrete backend,
> > not a property of relational algebra. Therefore, the typical way to optimize
> > LogicalAggregate is to split in the physical phase (implementation rule,
> > pass-through, derive). Practical systems like Dremio [1] and Flink [2]
> > work this way.
> > 
> > That said, as an optimizer developer, I need the flexibility to emit any
> > physical trees for the given logical operator, and 1-1 mapping cannot be
> > assumed. Calcite's API allows for that, and I am not aware of formal
> > documentation or guidelines that discourage that.
> > 
> > Now the question is when exactly to emit the operators. Normally, we
> > produce operators from rules. As discussed above, if the logical operator
> > may produce different physical trees depending on input traits, the
> > recommendation is to emit all combinations, even though we do not know
> > whether there would be good inputs for those alternatives. This contradicts
> > the idea of the guided top-down search, where we explore the search space
> > in response to a concrete optimization request, rather than with a
> > pessimistic assumption that a certain plan might be required in the future.
> > 
> > I found a way to mitigate this problem partially. Funny, my solution is
> > almost similar to what Haisheng proposed for the Window operator.
> > 1. For every logical operator, I emit a single physical operator from the
> > implementation rule, maintaining the exact 1-1 mapping. The emitted
> > operators (1) have a special flag "template" which makes their cost
> > infinite, (2) never expose or demand non-default traits except for the
> > convention, (3) have the OMAKASE derivation mode.
> > 2. When the input is optimized, "derive" is called on the template,
> > which produces the concrete physical tree, which is not necessarily 1-1
> > with the original logical node.
> > 
> > Before rule:
> > LogicalAggregate[group=$0, agg=SUM($1)]
> >   LogicalInput
> > 
> > After rule:
> > PhysicalAggregate[group=$0, agg=SUM($1), template=true, cost=infinite]
> >   LogicalInput
> > 
> > After "derive" if the input is not shared on $0:
> > PhysicalAggregate[group=$0, agg=SUM($1)]
> >   Exchange
> >     PhysicalAggregate[group=$0, agg=SUM($1)]
> >       PhysicalInputNotSharded
> > 
> > After "derive" if the input is shared on $0:
> > PhysicalAggregate[group=$0, agg=SUM($1)]
> >   PhysicalInputNotSharded
> > 
> > This approach allows me to avoid the generation of unnecessary
> > alternatives by delaying the optimization to the derive phase. The
> > aggregate split is implemented in rules in Dremio/Flink, but in my case,
> > this logic migrates to "derive".
> > 
> > This solution worked well for the whole TPC-DS suite until we wanted to
> > optimize combinations of operators rather than individual operators. A good
> > example is TPC-DS query 1 [3]. During the logical optimization, we get the
> > following logical tree, which is exactly the case that I demonstrated at
> > the beginning of this mail thread:
> > G1: Aggregate(groupBy=[ctr_store_sk])
> > G2:  Aggregate(groupBy=[ctr_customer_sk, ctr_store_sk]
> > 
> > And this is where I got stuck. I need to do a simple thing - propagate an
> > optimization request from G1 to G2, informing G2 that it should consider
> > the distribution [ctr_store_sk]. I can deliver that request to my physical
> > template in G2 through "convert". But the problem is that the current
> > Calcite implementation doesn't allow me to satisfy this request later on in
> > the derivation stage. Instead, I am forced to emit the final execution tree
> > from the "passThrough" method, which will not be notified at the derivation
> > stage. I prepared a scheme [4] that demonstrates the problem.
> > 
> > It feels that I almost achieved what I need. The last step is to ensure
> > that "derive" is called on the newly created template. And this is where I
> > think I reach the inflexibility of the current top-down optimizer
> > implementation. The current design forces us to define all possible
> > structures of physical operators in advance, but I want to delay the
> > decision to the derive stage when input traits are known because these
> > traits are essential to make the proper physical decisions.
> > 
> > There are some similarities with Haisheng's proposal about the Window
> > operator. We also maintain the 1-1 correspondence between the logical
> > operator and a physical template. However, Haisheng's proposal is basically
> > heuristic, as we split optimization into two phases (implementation,
> > post-processing). It is impossible to properly calculate the cost of the
> > Window operator because we do not know which exchanges would be needed
> > before the post-processing. In my case, we do the proper cost estimation
> > within a single expanded MEMO.
> > 
> > Now switching to theoretical considerations. We may make several
> > observations from the previous discussion:
> > 1) Our ideas converge to the solution where every logical operator has a
> > single corresponding physical operator, which is later expanded into more
> > alternatives.
> > 2) Optimization requests are basically sent to RelSet-s, not RelSubset-s,
> > as we make pairwise comparisons between the requested RelSubset and other
> > subsets in the set [5][6].
> > 3) Irrespective of the design, the complete exploration requires multiple
> > invocations of some implementation logic for different combinations of
> > required traits and available input traits.
> > 
> > These observations led me to think that maybe trait propagation through
> > some dedicated nodes (templates in my case and Haisheng's Window proposal,
> > or pessimistically emitted physical nodes in the previous Jinpeng/Haisheng
> > proposal) is not the ideal design, at least for some cases.
> > 
> > From the design standpoint, we propagate traits top-down and bottom-up
> > across equivalence groups, not individual RelSubset-s or RelNode-s.
> > Currently, we ignore the optimization context when optimizing the group
> > (except for the cost pruning). Rules emit partially constructed nodes since
> > neither parent requirements nor child traits are available to the rule.
> > 
> > Instead, there could exist a true guided top-down optimization flow when
> > the "guided" term applies to rules as well:
> > 1. Pass-through: RelSet receives an optimization request and chooses
> > appropriate implementation rules to fire. A rule receives optimization
> > requests, constructs optimization requests for children (adjusting traits,
> > optimization budget, etc.), then sends these requests down. The process
> > repeated recursively until we either reach the bottom node or some set that
> > is already optimized for this request.
> > 2. Derive: given the now-known input traits, emit the appropriate physical
> > nodes from the rule (see the sketch below). Then notify the parent. Repeat
> > the process recursively.
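> > 
> > As a strawman, the contract of such a guided rule could look roughly like
> > this (a purely hypothetical interface, not an existing Calcite API):
> > 
> > // Hypothetical contract for a guided implementation rule.
> > interface GuidedPhysicalRule {
> >   // Pass-through phase: given the parent's request, decide whether the
> >   // rule can help, and produce the optimization requests for the children.
> >   List<RelTraitSet> passRequests(RelNode logical, RelTraitSet required);
> > 
> >   // Derive phase: the child traits are now known; emit physical nodes.
> >   List<RelNode> implement(RelNode logical, List<RelTraitSet> childTraits);
> > }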
> > 
> > For common use cases, this design would require the same logic that is
> > currently split between rules, "derive", and "passThrough"; just the code
> > location would be different, as everything now converges to the rule. But
> > for the advanced use cases, that approach may allow for more flexible
> > optimization patterns, like the one for these two chained aggregates.
> > 
> > I'll try to implement both solutions - (1) emit multiple nodes from a
> > physical rule, and (2) enable derivation for some nodes emitted from
> > "passThrough", and share the results here.
> > 
> > Regards,
> > Vladimir.
> > 
> > [1]
> > https://github.com/dremio/dremio-oss/blob/8e85901e7222c81ccad3436ba9b63485503472ac/sabot/kernel/src/main/java/com/dremio/exec/planner/physical/HashAggPrule.java#L123-L146
> > [2]
> > https://github.com/apache/flink/blob/release-1.13.0/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala#L101-L147
> > [3] https://github.com/Agirish/tpcds/blob/master/query1.sql
> > [4]
> > https://docs.google.com/document/d/1u7V4bNp51OjytKnS2kzD_ikHLiNUPbhjZfRjkTfdsDA/edit
> > [5]
> > https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java#L196
> > [6]
> > https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L203
> > 
> > пт, 28 мая 2021 г. в 00:10, Haisheng Yuan <hy...@apache.org>:
> > 
> > > Getting back to your window query example:
> > >
> > > > Consider the Window function:
> > > > SELECT
> > > >   AGG1 over (partition by a),
> > > >   AGG2 over (partition by b),
> > > >   AGG3 over (partition by c),
> > > >   ...
> > > > FROM input
> > >
> > > Window is quite special because the logical vs physical operator count is
> > > not 1 to 1; generally we generate a physical window operator for each
> > > window function with a different partition column. That means that once
> > > the physical operators are created, their order can't be changed. Hence
> > > your proposal of passing required traits to the physical rule can mitigate
> > > the problem.
> > >
> > > But things would be much easier if we define a different physical window
> > > operator.
> > > For the above query, we can generate the *Single* physical window operator
> > > like this:
> > > PhysicalWindow[AGG1 over (partition by a), AGG2 over (partition by b),
> > > AGG3 over (partition by c)]
> > > or PhysicalWindow(a, b, c) for brevity.
> > > How do we define the physical properties for it?
> > > The operator delivers hash distribution on the first window partition
> > > column a, but requires its child input to be hash distributed by its last
> > > window partition column c.
> > >
> > > If the parent operator requests hash distribution on b, or c, the window
> > > operator will have its "passThrough" method called and generate
> > > PhysicalWindow(b, a, c), or PhysicalWindow(c, a, b). After the final plan
> > > is generated, during post-processing, we can replace the window operator
> > > with multiple nested window operators, and insert Exchange operators if
> > > necessary. But frankly speaking, I haven't seen any use cases of this kind
> > > in production.
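> > > 
> > > A sketch of that "passThrough" reordering (PhysicalWindow and its
> > > helpers are hypothetical; the passThrough hook is real):
> > > 
> > > // Honor a parent request for hash distribution on one of the window
> > > // partition columns by moving that window group to the front; the
> > > // child requirement stays on the (new) last partition column.
> > > @Override public RelNode passThrough(RelTraitSet required) {
> > >   List<Integer> keys = hashKeysOf(required);         // hypothetical
> > >   if (!isPartitionColumn(keys)) {
> > >     return null; // cannot satisfy this request
> > >   }
> > >   return copyWithPartitionOrder(moveToFront(keys));  // hypothetical
> > > }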
> > >
> > > Regarding the rule alternative you proposed:
> > > > class PhysicalAggregateRule extends PhysicalRule {
> > > >  void onMatch(RelOptRuleCall call, *RelTraitSet requiredTraits*) {...
> > >
> > > Consider the following plan:
> > > InnerJoin (on a)
> > >   +-- Agg (on b)
> > >   +-- Scan
> > >
> > > For the inner join, we can generate sort merge join and hash join.
> > > The sort merge join can request the following traits from Agg:
> > > 1) Singleton
> > > 2) hash distribution on a, sorted by a
> > > The hash join can request the following traits from Agg:
> > > 1) Singleton
> > > 2) hash distribution on a
> > > 3) any distribution
> > > 4) broadcast distribution
> > >
> > > The PhysicalAggregateRule will be called and executed 5 times, while
> > > generating the same physical aggregate candidates, unless we pass the
> > > whole list of required traits to the physical rule, which I prototyped
> > > some time ago with exactly that idea.
> > >
> > > Regards,
> > > Haisheng Yuan
> > >
> > > On 2021/05/27 17:44:23, Haisheng Yuan <hy...@apache.org> wrote:
> > > > >    In distributed systems, an implementation rule may produce 
> > > > > different
> > > > >    physical operators depending on the input traits. Examples are
> > > Aggregate,
> > > > >    Sort, Window.
> > > >
> > > > No, in most cases, physical operators are generated regardless of the
> > > > input, because the input traits are not known yet. Window might be an
> > > > exception.
> > > >
> > > > >    Since input traits are not known when the rule is fired, we must
> > > > >    generate *all possible combinations* of physical operators that we may
> > > > >    need. For LogicalAggregate, we must generate 1-phase and 2-phase
> > > > >    alternatives. For LogicalSort, we also have 1-phase and 2-phase
> > > > >    alternatives. Etc.
> > > >
> > > > IMHO, 1-phase and 2-phase are just different logical alternatives; that
> > > > is also why I call it a logical rule to split the aggregate into a
> > > > 2-phase aggregate. But HashAggregate and StreamAggregate are indeed the
> > > > different physical alternatives for a LogicalAggregate.
> > > >
> > > >
> > > > >   Unlike Aggregate or Sort, which may have only 1 or 2 phases, certain
> > > > >   logical operators may have many physical alternatives. Consider
> > > > >   the Window function:......
> > > >
> > > > In the window implementation rule, when building the physical operator
> > > > for a Window that has multiple window functions with different partition
> > > > columns, we can infer the possible traits that can be delivered by the
> > > > input operators by creating your own RelMetadata, hence multiple window
> > > > combinations with a certain order, but not exhaustive enumeration. In
> > > > fact, the window ordering problem exists in every kind of optimizer.
> > > >
> > > > > As input traits are not known when the rule is fired, the nodes 
> > > > > emitted
> > > > > from the implementation rules most likely would not be used in the
> > > > > final plan.
> > > >
> > > > That is quite normal: any operator generated by an implementation rule
> > > > might not be used in the final plan, because there may be tens of
> > > > thousands of alternatives, and we only choose the one with the lowest
> > > > cost.
> > > >
> > > > > For example, I can create a physical aggregate that demands
> > > > > non-strict distribution {a,b} from its input, meaning that both [a,b]
> > > > > and [b,a] are ok. However, in the final plan, we are obligated to have
> > > > > a strict
> > > > > distribution - either [a, b] in that order, or [b, a] in that order -
> > > > > otherwise, physical operators like Join and Union will not work.
> > > >
> > > > It depends on your own satisfaction model and how you coordinate
> > > > property requirements among child operators. Unlike the Orca optimizer,
> > > > where there is exact match, partial satisfying, orderless match, etc.,
> > > > Calcite's default implementation always requires exact satisfaction. But
> > > > we can still make use of "passThrough" and "derive" to achieve our goal.
> > > > For example, the aggregate generated by the implementation rule requires
> > > > itself and its child to deliver distribution on [a,b], but the "derive"
> > > > method tells the Aggregate that [b,a] is available, so it can generate
> > > > another option that requires [b,a] instead.
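> > > > 
> > > > For illustration, the "derive" side of that idea (a sketch; the
> > > > permutation check is hypothetical, deriveTraits is the real hook):
> > > > 
> > > > @Override public Pair<RelTraitSet, List<RelTraitSet>> deriveTraits(
> > > >     RelTraitSet childTraits, int childId) {
> > > >   RelDistribution delivered =
> > > >       childTraits.getTrait(RelDistributionTraitDef.INSTANCE);
> > > >   // Hypothetical check: the child delivers hash[b,a] while our group
> > > >   // key is {a,b}, so the delivered distribution is acceptable too.
> > > >   if (delivered != null && isPermutationOfGroupKeys(delivered)) {
> > > >     RelTraitSet traits = getTraitSet().replace(delivered);
> > > >     return Pair.of(traits, ImmutableList.of(traits));
> > > >   }
> > > >   return null;
> > > > }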
> > > >
> > > > > In distributed engines, the nodes emitted from rules are basically
> > > > > "templates" that must be replaced with normal nodes.
> > > >
> > > > There is no difference between distributed and non-distributed engines
> > > > when dealing with this. In the Orca and CockroachDB optimizers, the
> > > > nodes emitted from rules are operators without physical properties; the
> > > > optimizer then requests physical properties in a top-down manner, either
> > > > recursively, via a stack, or via a state machine. Calcite is quite
> > > > different: when a physical operator is generated by an implementation
> > > > rule, the physical operator must have its own traits and, at the same
> > > > time, the traits that it expects its child operators to deliver. So in
> > > > Calcite, they are not "templates". The difference has been there since
> > > > Calcite's inception.
> > > >
> > > > Regards,
> > > > Haisheng Yuan
> > > >
> > > > On 2021/05/27 08:59:33, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> > > > > Hi Haisheng,
> > > > >
> > > > > Thank you for your inputs. They are really helpful. Let me summarize
> > > > > your feedback in my own words to verify that I understand it
> > > > > correctly.
> > > > >
> > > > >    1. In distributed systems, an implementation rule may produce
> > > > >    different physical operators depending on the input traits.
> > > > >    Examples are Aggregate, Sort, Window.
> > > > >    2. Since input traits are not known when the rule is fired, we must
> > > > >    generate *all possible combinations* of physical operators that we
> > > > >    may need. For LogicalAggregate, we must generate 1-phase and
> > > > >    2-phase alternatives. For LogicalSort, we also have 1-phase and
> > > > >    2-phase alternatives. Etc.
> > > > >    3. If all combinations are generated, it is expected that
> > > > >    "passThrough" and "derive" would be just trivial replacements of
> > > > >    traits for most cases. This is why "passThroughTraits" and
> > > > >    "deriveTraits" are recommended. A notable exception is TableScan,
> > > > >    which may emit alternative indexes in response to the pass-through
> > > > >    requests.
> > > > >
> > > > > If my understanding is correct, then there are several issues with
> > > > > this approach still.
> > > > >
> > > > > 1. Unlike Aggregate or Sort, which may have only 1 or 2 phases,
> > > > > certain logical operators may have many physical alternatives.
> > > > > Consider the Window function:
> > > > > SELECT
> > > > >   AGG1 over (partition by a),
> > > > >   AGG2 over (partition by b),
> > > > >   AGG3 over (partition by c),
> > > > >   ...
> > > > > FROM input
> > > > >
> > > > > To calculate each aggregate, we need to re-shuffle the input based
> > > > > on the partition key. The key question is the order of reshuffling.
> > > > > If the input is sharded by [a], I want to calculate AGG1 locally and
> > > > > then re-shuffle the input to calculate the other aggregates. For the
> > > > > remaining AGG2 and AGG3, the order is also important. If the parent
> > > > > demands sharding by [b], then the proper sequence is b-c-a:
> > > > > 1: Window[AGG2 over (partition by b)]     // SHARDED[b]
> > > > > 2:   Window[AGG3 over (partition by c)]   // SHARDED[c]
> > > > > 3:     Window[AGG1 over (partition by a)] // SHARDED[a]
> > > > > 4:       Input                            // SHARDED[a]
> > > > >
> > > > > But if the parent demands [c], the proper sequence is c-b-a. Since
> > > > > we do not know the real distributions when the rule is fired, we must
> > > > > emit all the permutations to ensure that no optimization opportunity
> > > > > is missed. But with a complex window aggregate, this might be
> > > > > impractical because we will emit lots of unnecessary nodes.
> > > > >
> > > > > 2. As input traits are not known when the rule is fired, the nodes
> > > > > emitted from the implementation rules most likely would not be used
> > > > > in the final plan. For example, I can create a physical aggregate
> > > > > that demands a non-strict distribution {a,b} from its input, meaning
> > > > > that both [a,b] and [b,a] are ok. However, in the final plan, we are
> > > > > obligated to have a strict distribution - either [a, b] in that
> > > > > order, or [b, a] in that order - otherwise, physical operators like
> > > > > Join and Union will not work. In distributed engines, the nodes
> > > > > emitted from rules are basically "templates" that must be replaced
> > > > > with normal nodes.
> > > > >
> > > > > Does this reasoning make any sense? If yes, it means that the
> > > > > current approach forces us to produce many unnecessary nodes to
> > > > > explore the full search space. The question is whether alternative
> > > > > approaches could better fit the requirements of a distributed engine.
> > > > > This is a purely theoretical question. I am currently looking deeper
> > > > > at CockroachDB. They have a very different architecture: no
> > > > > separation between logical and physical nodes, physical properties
> > > > > completely decoupled from nodes, usage of recursion instead of the
> > > > > stack, etc.
> > > > >
> > > > > Regards,
> > > > > Vladimir.
> > > > >
> > > > > чт, 27 мая 2021 г. в 03:19, Haisheng Yuan <hy...@apache.org>:
> > > > >
> > > > > > Another point I would like to mention is that it is not
> > > > > > recommended to override the methods "passThrough" and "derive"
> > > > > > directly; override "passThroughTraits" and "deriveTraits" instead,
> > > > > > so that we can make sure only the same type of physical node is
> > > > > > created and no nested relnodes or additional RelSets are created,
> > > > > > unless you know you have to create a different type of node. For
> > > > > > example, if the table foo has a btree index on column a, and the
> > > > > > parent relnode is requesting ordering on column a, then we may
> > > > > > consider overriding "passThrough" of TableScan to return an
> > > > > > IndexScan instead of a TableScan.
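> > > > > > 
> > > > > > A minimal sketch of that special case (IndexScan and the index
> > > > > > lookup are hypothetical; the "passThrough" hook is real):
> > > > > > 
> > > > > > @Override public RelNode passThrough(RelTraitSet required) {
> > > > > >   RelCollation collation =
> > > > > >       required.getTrait(RelCollationTraitDef.INSTANCE);
> > > > > >   // Hypothetical metadata check: is there a btree index that
> > > > > >   // already delivers the requested ordering?
> > > > > >   if (collation != null && hasBtreeIndex(getTable(), collation)) {
> > > > > >     return new IndexScan(getCluster(), required, getTable(), collation);
> > > > > >   }
> > > > > >   return null; // otherwise, keep the plain TableScan
> > > > > > }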
> > > > > >
> > > > > > Regards,
> > > > > > Haisheng Yuan
> > > > > > On 2021/05/26 22:45:20, Haisheng Yuan <hy...@apache.org> wrote:
> > > > > > > Hi Vladimir,
> > > > > > >
> > > > > > > 1. You need a logical rule to split the aggregate into a local
> > > > > > > aggregate and a global aggregate, for example:
> > > > > > >
> > > > > > > https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitGbAgg.cpp
> > > > > > > Only implementation rules can convert a logical node to a physical
> > > > > > > node or multiple physical nodes.
> > > > > > > After physical implementation, you have 2 physical alternatives:
> > > > > > > 1) single phase global physical aggregate,
> > > > > > > 2) 2 phase physical aggregate with local and global aggregate.
> > > > > > > It should be up to the cost to decide which one to choose.
> > > > > > >
> > > > > > > 2. Given a desired traitset from the parent node, the current
> > > > > > > relnode only needs to generate a single relnode after passing
> > > > > > > down the traitset. Given a traitset delivered by a child node,
> > > > > > > the current relnode only derives a single relnode. Quite unlike
> > > > > > > other optimizers, in Calcite's top-down optimizer you don't need
> > > > > > > to worry about issuing multiple optimization requests to inputs;
> > > > > > > that is handled by the Calcite framework secretly. For example:
> > > > > > > SELECT a, b, min(c) from foo group by a, b;
> > > > > > > In many other optimizers, we would probably need to ask the
> > > > > > > aggregate to issue 3 distribution requests for the tablescan on
> > > > > > > foo, which are
> > > > > > > 1) hash distributed by a,
> > > > > > > 2) hash distributed by b,
> > > > > > > 3) hash distributed by a, b
> > > > > > > However, in the Calcite top-down optimizer, your physical
> > > > > > > implementation rule for the global aggregate only needs to
> > > > > > > generate a single physical node with hash distribution by a, b.
> > > > > > > In case the table foo happens to be distributed by a, or b, the
> > > > > > > derive() method will tell you there is an opportunity. This is
> > > > > > > the feature where Calcite's top-down optimizer excels over other
> > > > > > > optimizers, because it can dramatically reduce the search space
> > > > > > > while keeping the optimal optimization opportunity.
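> > > > > > > 
> > > > > > > In code, the "derive" side of that can be as small as (a sketch;
> > > > > > > the key-subsumption check is hypothetical, derive is real):
> > > > > > > 
> > > > > > > // The implementation rule emits one node requiring hash[a, b];
> > > > > > > // derive() later reacts to what the child actually delivers.
> > > > > > > @Override public RelNode derive(RelTraitSet childTraits, int childId) {
> > > > > > >   RelDistribution delivered =
> > > > > > >       childTraits.getTrait(RelDistributionTraitDef.INSTANCE);
> > > > > > >   // Hypothetical check: hash[a] or hash[b] also collocates the
> > > > > > >   // groups of "group by a, b", so no Exchange is needed.
> > > > > > >   if (delivered != null && groupKeySubsumes(delivered)) {
> > > > > > >     return copy(getTraitSet().replace(delivered),
> > > > > > >         ImmutableList.of(getInput()));
> > > > > > >   }
> > > > > > >   return null;
> > > > > > > }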
> > > > > > >
> > > > > > > 3. This is by design. Nodes produced from "passThrough" and
> > > > > > > "derive" are just sibling physical nodes with different
> > > > > > > traitsets; we only need the initial physical nodes after
> > > > > > > implementation to avoid unnecessary operations. The fundamental
> > > > > > > reason is that, unlike the Orca optimizer, where the physical
> > > > > > > node and the physical property are separate things, Calcite's
> > > > > > > logical/physical nodes contain traitsets. With regard to the
> > > > > > > latter question, can you give an example?
> > > > > > >
> > > > > > > Regards,
> > > > > > > Haisheng Yuan
> > > > > > >
> > > > > > >
> > > > > > > On 2021/05/26 20:11:57, Vladimir Ozerov <ppoze...@gmail.com>
> > > wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > I tried to optimize a certain combination of operators for the
> > > > > > > > distributed engine and got stuck with the trait propagation in
> > > > > > > > the top-down engine. I want to ask the community for advice on
> > > > > > > > whether the problem is solvable with the current Apache Calcite
> > > > > > > > implementation or not.
> > > > > > > >
> > > > > > > > Consider the following logical tree:
> > > > > > > > 3: LogicalAggregate[group=[a], F2(c)]
> > > > > > > > 2:  LogicalAggregate[group=[a,b], F1(c)]
> > > > > > > > 1:    LogicalScan[t]
> > > > > > > >
> > > > > > > > Consider that these two aggregates cannot be merged or
> > > > > > > > simplified for whatever reason. We have only a set of physical
> > > > > > > > rules to translate this logical tree to a physical tree. Also,
> > > > > > > > there could be any number of other operators between these two
> > > > > > > > aggregates. We omit them for clarity, assuming that the
> > > > > > > > distribution is not destroyed.
> > > > > > > >
> > > > > > > > In the distributed environment, non-collocated aggregates are
> > > > > > > > often implemented in two phases: local pre-aggregation and
> > > > > > > > final aggregation, with an exchange in between. Consider that
> > > > > > > > the Scan operator is hash distributed by some key other than
> > > > > > > > [a] or [b]. If we optimize operators without considering the
> > > > > > > > whole plan, we may optimize each operator independently, which
> > > > > > > > would give us the following plan:
> > > > > > > > 3: PhysicalAggregate[group=[a], F2_phase2(c)]             // HASH_DISTRIBUTED [a]
> > > > > > > > 3:   Exchange[a]                                          // HASH_DISTRIBUTED [a]
> > > > > > > > 3:     PhysicalAggregate[group=[a], F2_phase1(c)]         // HASH_DISTRIBUTED [a,b]
> > > > > > > > 2:       PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a,b]
> > > > > > > > 2:         Exchange[a, b]                                 // HASH_DISTRIBUTED [a,b]
> > > > > > > > 2:           PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
> > > > > > > > 1:             PhysicalScan[t]                            // HASH_DISTRIBUTED [d]
> > > > > > > >
> > > > > > > > This plan is not optimal, because we re-hash inputs twice. A
> > > > > > > > better plan that we want to get:
> > > > > > > > 3: PhysicalAggregate[group=[a], F2(c)]                // HASH_DISTRIBUTED [a]
> > > > > > > > 2:   PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a]
> > > > > > > > 2:     Exchange[a]                                    // HASH_DISTRIBUTED [a]
> > > > > > > > 2:       PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
> > > > > > > > 1:         PhysicalScan[t]                            // HASH_DISTRIBUTED [d]
> > > > > > > >
> > > > > > > > In this case, we take advantage of the fact that the
> > > > > > > > distribution [a] is compatible with [a,b]. Therefore we may
> > > > > > > > enforce only [a], instead of doing [a,b] and then [a]. Since
> > > > > > > > exchange operators are very expensive, this optimization may
> > > > > > > > bring a significant boost to the query engine. Now the question
> > > > > > > > - how do we reach that state? Intuitively, a pass-through is
> > > > > > > > exactly what we need. We may pass the optimization request from
> > > > > > > > the top aggregate to the bottom aggregate to find physical
> > > > > > > > implementations sharded by [a]. But the devil is in the details
> > > > > > > > - when and how exactly to pass this request?
> > > > > > > >
> > > > > > > > Typically, we have a conversion rule that converts a logical
> > > > > > > > aggregate to a physical aggregate. We may invoke "convert" on
> > > > > > > > the input to initiate the pass-through:
> > > > > > > >
> > > > > > > > RelNode convert(...) {
> > > > > > > >     return new PhysicalAggregate(
> > > > > > > >         convert(input, HASH_DISTRIBUTED[a])
> > > > > > > >     );
> > > > > > > > }
> > > > > > > >
> > > > > > > > The first problem - we cannot create the normal physical
> > > > > > > > aggregate here because we do not know the input traits yet. The
> > > > > > > > final decision whether to do a one-phase or two-phase aggregate
> > > > > > > > can be made only in the "PhysicalNode.derive" method, when the
> > > > > > > > concrete input traits are resolved. Therefore the converter
> > > > > > > > rule should create a kind of "template" physical operator,
> > > > > > > > which would be used to construct the final operator(s) when the
> > > > > > > > input traits are resolved. AFAIU, Enumerable works similarly:
> > > > > > > > we create operators with virtually arbitrary traits taken from
> > > > > > > > logical nodes in the conversion rules. We only later create the
> > > > > > > > normal nodes in the derive() methods.
> > > > > > > >
> > > > > > > > The second problem - our top aggregate doesn't actually need
> > > > > > > > the HASH_DISTRIBUTED[a] input. Instead, it may accept inputs
> > > > > > > > with any distribution. What we really need is to inform the
> > > > > > > > input (bottom aggregate) that it should look for additional
> > > > > > > > implementations that satisfy HASH_DISTRIBUTED[a]. Therefore,
> > > > > > > > enforcing a specific distribution on the input using the
> > > > > > > > "convert" method is not what we need because this conversion
> > > > > > > > might enforce unnecessary exchanges.
> > > > > > > >
> > > > > > > > The third problem - derivation. Consider that we delivered
> > > > > > > > the optimization request to the bottom aggregate. As an
> > > > > > > > implementor, what am I supposed to do in this method? I cannot
> > > > > > > > return the final aggregate from here because the real input
> > > > > > > > traits are not derived yet. Therefore, I can only return
> > > > > > > > another template, hoping that the "derive" method will be
> > > > > > > > called on it. However, this will not happen because trait
> > > > > > > > derivation is skipped on the nodes emitted from pass-through.
> > > > > > > > See "DeriveTrait.perform" [1].
> > > > > > > >
> > > > > > > > BottomAggregate {
> > > > > > > >     RelNode passThrough(distribution=HASH_DISTRIBUTED[a]) {
> > > > > > > >         // ???
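> > > > > > > >         // Returning a concrete aggregate here is impossible:
> > > > > > > >         // the input traits are not known yet. Returning another
> > > > > > > >         // template does not help either: "derive" is never
> > > > > > > >         // called on nodes emitted from "passThrough".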
> > > > > > > >     }
> > > > > > > > }
> > > > > > > >
> > > > > > > > I feel that I am either going in the wrong direction, or some
> > > > > > > > gaps in the product disallow such optimization. So I would like
> > > > > > > > to ask the community to assist with the following questions:
> > > > > > > > 1. In the top-down optimizer, how should we convert a logical
> > > > > > > > node to a physical node, provided that "derive" is not called
> > > > > > > > yet? I have a gut feeling that the trait propagation is
> > > > > > > > currently not implemented to the full extent, because based on
> > > > > > > > the Cascades paper I would expect that parent physical nodes
> > > > > > > > are produced after the child physical nodes. But in our rules,
> > > > > > > > this is not the case - some physical nodes are produced before
> > > > > > > > the trait derivation.
> > > > > > > > 2. How to propagate several optimization requests to inputs?
> > > > > > > > In the example above, we need either inputs with a specific
> > > > > > > > distribution or inputs with an arbitrary distribution. It seems
> > > > > > > > that to achieve that, I need to emit several alternative nodes
> > > > > > > > with different requirements to inputs. Does it make sense?
> > > > > > > > 3. Why are nodes produced from the "passThrough" method
> > > > > > > > excluded from trait derivation? If this is by design, how can I
> > > > > > > > preserve the optimization request so as to satisfy it at the
> > > > > > > > derivation stage, when the input traits are available?
> > > > > > > >
> > > > > > > > Regards,
> > > > > > > > Vladimir.
> > > > > > > >
> > > > > > > > [1]
> > > > > > > > https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L828
> > > > > > > > 
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> > 
> 
