Hi Haisheng,

Thank you for your inputs. They are really helpful. Let me summarize your
feedback in my own words to verify that I understand it correctly.

   1. In distributed systems, an implementation rule may produce different
   physical operators depending on the input traits. Examples are Aggregate,
   Sort, Window.
   2. Since input traits are not known when the rule is fired, we must
   generate *all possible combinations* of physical operators that we may
   need. For LogicalAggregate, we must generate 1-phase and 2-phase
   alternatives. For LogicalSort, we also have 1-phase and 2-phase
   alternatives. Etc.
   3. If all combinations are generated, it is expected that "passThrough"
   and "derive" would be just trivial replacements of traits for most cases.
   This is why "passThroughTraits" and "deriveTraits" are recommended. A
   notable exception is TableScan that may emit alternative indexes in
   response to the pass-through requests.

If my understanding is correct, then this approach still has several
issues.

1. Unlike Aggregate or Sort, which may have only 1 or 2 phases, certain
logical operators may have many physical alternatives. Consider the Window
function:
SELECT
  AGG1 over (partition by a),
  AGG2 over (partition by b),
  AGG3 over (partition by c),
  ...
FROM input

To calculate each aggregate, we need to re-shuffle the input based on the
partition key. The key question is the order of reshuffling. If the input
is sharded by [a], I want to calculate AGG1 locally and then re-shuffle the
input to calculate other aggregates. For the remaining AGG2 and AGG3, the
order is also important. If the parent demands sharding by [b], then the
proper sequence is b-c-a:
1: Window[AGG2 over (partition by b)]     // SHARDED[b]
2:   Window[AGG3 over (partition by c)]   // SHARDED[c]
3:     Window[AGG1 over (partition by a)] // SHARDED[a]
4:       Input                            // SHARDED[a]

But if the parent demands [c], the proper sequence is c-b-a. Since we do
not know the real distributions when the rule is fired, we must emit all the
permutations to ensure that no optimization opportunity is missed. But with
complex window aggregates, this might be impractical because we will emit
lots of unnecessary nodes.
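
To make the growth concrete, here is a minimal, self-contained Java sketch.
It is not Calcite code: the class WindowOrderings and its methods are
hypothetical names, and the cost model (one exchange whenever two adjacent
sharding keys differ) is a simplifying assumption. It enumerates every
ordering of the window partition keys and counts the exchanges each ordering
would need for a given input and parent distribution.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Calcite.
final class WindowOrderings {
    /** Returns every ordering of the partition keys (top operator first). */
    static List<List<String>> permutations(List<String> keys) {
        List<List<String>> result = new ArrayList<>();
        permute(new ArrayList<>(keys), 0, result);
        return result;
    }

    private static void permute(List<String> keys, int start, List<List<String>> out) {
        if (start == keys.size()) {
            out.add(new ArrayList<>(keys));
            return;
        }
        for (int i = start; i < keys.size(); i++) {
            Collections.swap(keys, start, i);
            permute(keys, start + 1, out);
            Collections.swap(keys, start, i); // restore before the next choice
        }
    }

    /**
     * Counts exchanges for an ordering listed top-first. Assumption: an
     * exchange is needed whenever the sharding key changes between the input,
     * two adjacent windows, or the top window and the parent's demand.
     */
    static int exchanges(List<String> topFirst, String inputKey, String parentKey) {
        int count = 0;
        String current = inputKey;
        for (int i = topFirst.size() - 1; i >= 0; i--) { // walk bottom-up
            if (!topFirst.get(i).equals(current)) {
                count++;
            }
            current = topFirst.get(i);
        }
        if (parentKey != null && !parentKey.equals(current)) {
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        // Input sharded by [a], parent demands [b], as in the example above.
        for (List<String> order : permutations(List.of("a", "b", "c"))) {
            System.out.println(order + " -> " + exchanges(order, "a", "b") + " exchange(s)");
        }
    }
}
```

With three partition keys there are already 3! = 6 orderings, and under the
assumed cost model only b-c-a reaches the minimum for this input and parent.
The rule cannot know which one that is when it fires, so it would have to
emit all of them.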

2. As input traits are not known when the rule is fired, the nodes emitted
from the implementation rules most likely would not be used in the final
plan. For example, I can create a physical aggregate that demands
non-strict distribution {a,b} from its input, meaning that both [a,b] and
[b,a] are OK. However, in the final plan, we are obligated to have a strict
distribution - either [a, b] in that order, or [b, a] in that order -
otherwise, physical operators like Join and Union will not work. In
distributed engines, the nodes emitted from rules are basically "templates"
that must be replaced with normal nodes.
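
A small Java sketch of what I mean by strict versus non-strict distribution.
Again, this is not Calcite API: DistributionTemplates, satisfies and
colocated are hypothetical names, and modelling a distribution as a plain
list or set of column names is an assumption made for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical illustration only; none of these names exist in Calcite.
final class DistributionTemplates {
    /**
     * True when a strict (ordered) key list uses exactly the keys of the
     * non-strict (unordered) requirement, in any order.
     */
    static boolean satisfies(List<String> strictKeys, Set<String> nonStrictKeys) {
        return strictKeys.size() == nonStrictKeys.size()
            && new HashSet<>(strictKeys).equals(nonStrictKeys);
    }

    /**
     * Assumption: two inputs are co-located (as Join or Union would need)
     * only when their strict key lists match position by position.
     */
    static boolean colocated(List<String> left, List<String> right) {
        return left.equals(right);
    }

    public static void main(String[] args) {
        Set<String> template = Set.of("a", "b"); // non-strict {a,b}
        System.out.println(satisfies(List.of("a", "b"), template));
        System.out.println(satisfies(List.of("b", "a"), template));
        System.out.println(colocated(List.of("a", "b"), List.of("b", "a")));
    }
}
```

Both [a,b] and [b,a] satisfy the non-strict template, but they are not
interchangeable once a parent needs two inputs to agree on the key order,
which is why the template node cannot survive into the final plan.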

Does this reasoning make any sense? If yes, it means that the current
approach forces us to produce many unnecessary nodes to explore the full
search space. The question is whether alternative approaches could better
fit the requirements of the distributed engine. This is a purely
theoretical question. I am currently looking deeper at CockroachDB. They
have a very different architecture: no separation between logical and
physical nodes, physical properties are completely decoupled from nodes,
usage of recursion instead of the stack, etc.

Regards,
Vladimir.

On Thu, May 27, 2021 at 03:19, Haisheng Yuan <[email protected]> wrote:

> Another point I would like to mention is that it is not recommended to
> override the methods "passThrough" and "derive" directly. Override
> "passThroughTraits" and "deriveTraits" instead, so that we can make sure
> only the same type of physical node is created and no nested relnodes or
> additional RelSets are created, unless you know you have to create a
> different type of node. For example, if the table foo has a btree index
> on column a, and the parent relnode is requesting ordering on column a,
> then we may consider overriding "passThrough" of TableScan to return an
> IndexScan instead of a TableScan.
>
> Regards,
> Haisheng Yuan
> On 2021/05/26 22:45:20, Haisheng Yuan <[email protected]> wrote:
> > Hi Vladimir,
> >
> > 1. You need a logical rule to split the aggregate into a local aggregate
> > and global aggregate, for example:
> > https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitGbAgg.cpp
> > Only implementation rules can convert a logical node to a physical node
> > or multiple physical nodes.
> > After physical implementation, you have 2 physical alternatives:
> > 1) single phase global physical aggregate,
> > 2) 2 phase physical aggregate with local and global aggregate.
> > It should be up to the cost to decide which one to choose.
> >
> > 2. Given a desired traitset from the parent node, the current relnode only
> > needs to generate a single relnode after passing down the traitset. Given a
> > traitset delivered by a child node, the current relnode only derives a
> > single relnode. Quite unlike other optimizers, in Calcite's top-down
> > optimizer, you don't need to worry about issuing multiple optimization
> > requests to inputs, which is handled by the Calcite framework behind the
> > scenes. For example:
> > SELECT a, b, min(c) from foo group by a, b;
> > In many other optimizers, we probably need to ask the aggregate to issue 3
> > distribution requests for the tablescan on foo, which are
> > 1) hash distributed by a,
> > 2) hash distributed by b,
> > 3) hash distributed by a, b
> > However, in Calcite's top-down optimizer, your physical implementation rule
> > for the global aggregate only needs to generate a single physical node with
> > hash distribution by a, b. In case the table foo happens to be distributed
> > by a, or b, the derive() method will tell you there is an opportunity. This
> > is where Calcite's top-down optimizer excels over other optimizers, because
> > this can dramatically reduce the search space while keeping the optimal
> > optimization opportunity.
> >
> > 3. This is by design. Nodes produced from "passThrough" and "derive" are
> > just sibling physical nodes with different traitsets, so we only need the
> > initial physical nodes after implementation to avoid unnecessary
> > operations. The fundamental reason is that, unlike the Orca optimizer,
> > where physical node and physical property are separate things, Calcite's
> > logical/physical nodes contain the traitset. With regard to the latter
> > question, can you give an example?
> >
> > Regards,
> > Haisheng Yuan
> >
> >
> > On 2021/05/26 20:11:57, Vladimir Ozerov <[email protected]> wrote:
> > > Hi,
> > >
> > > I tried to optimize a certain combination of operators for the distributed
> > > engine and got stuck with the trait propagation in the top-down engine. I
> > > want to ask the community for advice on whether the problem is solvable
> > > with the current Apache Calcite implementation or not.
> > >
> > > Consider the following logical tree:
> > > 3: LogicalAggregate[group=[a], F2(c)]
> > > 2:  LogicalAggregate[group=[a,b], F1(c)]
> > > 1:    LogicalScan[t]
> > >
> > > Consider that these two aggregates cannot be merged or simplified for
> > > whatever reason. We have only a set of physical rules to translate this
> > > logical tree to a physical tree. Also, there could be any number of
> > > other operators between these two aggregates. We omit them for clarity,
> > > assuming that the distribution is not destroyed.
> > >
> > > In the distributed environment, non-collocated aggregates are often
> > > implemented in two phases: local pre-aggregation and final aggregation,
> > > with an exchange in between. Consider that the Scan operator is hash
> > > distributed by some key other than [a] or [b]. If we optimize operators
> > > without considering the whole plan, we may optimize each operator
> > > independently, which would give us the following plan:
> > > 3: PhysicalAggregate[group=[a], F2_phase2(c)]             // HASH_DISTRIBUTED [a]
> > > 3:   Exchange[a]                                          // HASH_DISTRIBUTED [a]
> > > 3:     PhysicalAggregate[group=[a], F2_phase1(c)]         // HASH_DISTRIBUTED [a,b]
> > > 2:       PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a,b]
> > > 2:         Exchange[a, b]                                 // HASH_DISTRIBUTED [a,b]
> > > 2:           PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
> > > 1:             PhysicalScan[t]                            // HASH_DISTRIBUTED [d]
> > >
> > > This plan is not optimal, because we re-hash inputs twice. A better plan
> > > that we want to get:
> > > 3: PhysicalAggregate[group=[a], F2(c)]                // HASH_DISTRIBUTED [a]
> > > 2:   PhysicalAggregate[group=[a,b], F1_phase2(c)]     // HASH_DISTRIBUTED [a]
> > > 2:     Exchange[a]                                    // HASH_DISTRIBUTED [a]
> > > 2:       PhysicalAggregate[group=[a,b], F1_phase1(c)] // HASH_DISTRIBUTED [d]
> > > 1:         PhysicalScan[t]                            // HASH_DISTRIBUTED [d]
> > >
> > > In this case, we take advantage of the fact that the distribution [a] is
> > > compatible with [a,b]. Therefore we may enforce only [a], instead of doing
> > > [a,b] and then [a]. Since exchange operators are very expensive, this
> > > optimization may bring a significant boost to the query engine. Now the
> > > question - how do we reach that state? Intuitively, a pass-through is
> > > exactly what we need. We may pass the optimization request from the top
> > > aggregate to the bottom aggregate to find physical implementations sharded
> > > by [a]. But the devil is in the details - when and how exactly to pass this
> > > request?
> > >
> > > Typically, we have a conversion rule that converts a logical aggregate to a
> > > physical aggregate. We may invoke "convert" on the input to initiate the
> > > pass-through:
> > >
> > > RelNode convert(...) {
> > >     return new PhysicalAggregate(
> > >         convert(input, HASH_DISTRIBUTED[a])
> > >     )
> > > }
> > >
> > > The first problem - we cannot create the normal physical aggregate here
> > > because we do not know input traits yet. The final decision whether to do a
> > > one-phase or two-phase aggregate can be made only in the
> > > "PhysicalNode.derive" method when concrete input traits are resolved.
> > > Therefore the converter rule should create a kind of "template" physical
> > > operator, which would be used to construct the final operator(s) when input
> > > traits are resolved. AFAIU Enumerable works similarly: we create operators
> > > with virtually arbitrary traits taken from logical nodes in the conversion
> > > rules. Only later do we create normal nodes in the derive() methods.
> > >
> > > The second problem - our top aggregate doesn't actually need the
> > > HASH_DISTRIBUTED[a] input. Instead, it may accept inputs with any
> > > distribution. What we really need is to inform the input (bottom aggregate)
> > > that it should look for additional implementations that satisfy
> > > HASH_DISTRIBUTED[a]. Therefore, enforcing a specific distribution on the
> > > input using the "convert" method is not what we need because this
> > > conversion might enforce unnecessary exchanges.
> > >
> > > The third problem - derivation. Consider that we delivered the optimization
> > > request to the bottom aggregate. As an implementor, what am I supposed to
> > > do in this method? I cannot return the final aggregate from here because
> > > the real input traits are not derived yet. Therefore, I can only return
> > > another template, hoping that the "derive" method will be called on it.
> > > However, this will not happen because trait derivation is skipped on the
> > > nodes emitted from pass-through. See "DeriveTrait.perform" [1].
> > >
> > > BottomAggregate {
> > >     RelNode passThrough(distribution=HASH_DISTRIBUTED[a]) {
> > >         // ???
> > >     }
> > > }
> > >
> > > I feel that I am either going in the wrong direction, or some gaps in the
> > > product disallow such optimization. So I would like to ask the community to
> > > assist with the following questions:
> > > 1. In the top-down optimizer, how should we convert a logical node to a
> > > physical node, provided that "derive" is not called yet? I have a gut
> > > feeling that the trait propagation is currently not implemented to the full
> > > extent because, based on the Cascades paper, I would expect that parent
> > > physical nodes are produced after the child physical nodes. But in our
> > > rules, this is not the case - some physical nodes are produced before the
> > > trait derivation.
> > > 2. How to propagate several optimization requests to inputs? We need either
> > > inputs with a specific distribution or inputs with an arbitrary
> > > distribution in the example above. It seems that to achieve that, I need to
> > > emit several alternative nodes with different requirements to inputs. Does
> > > it make sense?
> > > 3. Why are nodes produced from the "passThrough" method excluded from trait
> > > derivation? If this is by design, how can I preserve the optimization
> > > request to satisfy it on the derivation stage when input traits are
> > > available?
> > >
> > > Regards,
> > > Vladimir.
> > >
> > > [1]
> > > https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L828
> > >
> >
>
