How does this relate to "derive" when it comes to getting the desired plan? Initially, PhysicalAggregate1 requests HASH[b,c] and PhysicalAggregate2 requests HASH[a,b,c]. "passThrough" is then called on PhysicalAggregate2 with HASH[b,c], which generates another PhysicalAggregate2 with trait HASH[b,c]. You don't need the involvement of "derive".
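The pass-through step described above can be sketched roughly as follows. This is a toy model under stated assumptions, not Calcite code: ToyAggregate, its fields, and PassThroughSketch are hypothetical names, and a distribution is reduced to a plain list of hash keys.

```java
import java.util.List;
import java.util.Optional;

// Toy model only: ToyAggregate is a hypothetical class, not part of Calcite.
final class ToyAggregate {
    final List<String> groupKeys; // grouping columns, e.g. [a, b, c]
    final List<String> hashKeys;  // hash distribution the node currently uses

    ToyAggregate(List<String> groupKeys, List<String> hashKeys) {
        this.groupKeys = List.copyOf(groupKeys);
        this.hashKeys = List.copyOf(hashKeys);
    }

    // Mimics the idea of "passThrough": accept the parent's required hash keys
    // when they are a non-empty subset of the group keys, and return a new
    // aggregate carrying that distribution. Otherwise decline the request.
    Optional<ToyAggregate> passThrough(List<String> requiredHashKeys) {
        if (requiredHashKeys.isEmpty() || !groupKeys.containsAll(requiredHashKeys)) {
            return Optional.empty();
        }
        return Optional.of(new ToyAggregate(groupKeys, requiredHashKeys));
    }
}

final class PassThroughSketch {
    public static void main(String[] args) {
        // PhysicalAggregate2 as first emitted: grouped and hashed by [a, b, c].
        ToyAggregate agg2 = new ToyAggregate(List.of("a", "b", "c"), List.of("a", "b", "c"));
        // PhysicalAggregate1 requires HASH[b, c] from its input.
        Optional<ToyAggregate> alternative = agg2.passThrough(List.of("b", "c"));
        alternative.ifPresent(a -> System.out.println("new aggregate hashed by " + a.hashKeys));
    }
}
```

The subset check reflects why HASH[b,c] is acceptable for a group-by on [a,b,c]: rows that agree on (a,b,c) also agree on (b,c), so they land on the same shard.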
Haisheng Yuan

On 2021/06/13 16:58:53, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> Hi,
>
> I tried to apply different approaches, but eventually, I failed to achieve my goals. It seems that the current implementation cannot handle the required scenario, as explained below.
>
> Consider the following tree:
> LogicalAggregate1[group=[b,c]]
>   LogicalAggregate2[group=[a,b,c]]
>     LogicalInput
>
> I want to find the plan to do these two aggregations without an exchange in between because they may have compatible distributions. Example:
> PhysicalAggregate1[group=[b,c]]     // SHARDED[b,c]
>   PhysicalAggregate2[group=[a,b,c]] // SHARDED[b,c]
>     Exchange                        // SHARDED[b,c]
>       PhysicalInput                 // SHARDED[?]
>
> The fundamental problem is that it is impossible to save the optimization request and resolve traits in the "derive" phase afterward. What we need is to send the optimization request "SHARDED by [b,c] in any order" to PhysicalAggregate2, and use it in the derive phase so that the new PhysicalAggregate2 is created with [b,c] or [c,b], but strictly without [a]. Unfortunately, this doesn't work because the nodes emitted from the pass-through do not participate in the "derive" phase.
>
> This could be fixed with a trivial change - to allow certain nodes emitted from the "passThrough" to participate in "derive". We can do that using a marker interface or an extension to a PhysicalRel interface. For example:
> interface PhysicalRel {
>   boolean enforceDerive();
> }
>
> When set to "true", the node would not be added to the pass-through cache. This way, we may use this node as *storage* for the optimization request. When the "derive" is called later, we know both the parent requirements and the child traits. This would be sufficient to solve my problem. I already tried to do this by disabling the pass-through cache completely and confirmed that the required plan is found.
>
> Do you have any objections to such a change?
>
> Regards,
> Vladimir.
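To make the proposed flag concrete, here is a minimal Java sketch of how a pass-through cache might honor it. Everything in it is an assumption for illustration: ToyPhysicalRel, ToyNode, and ToyDriver are hypothetical stand-ins and do not reproduce Calcite's planner internals; only the enforceDerive() name comes from the proposal above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for the proposed interface; not Calcite's API.
interface ToyPhysicalRel {
    String name();
    boolean enforceDerive();
}

final class ToyNode implements ToyPhysicalRel {
    private final String name;
    private final boolean enforceDerive;

    ToyNode(String name, boolean enforceDerive) {
        this.name = name;
        this.enforceDerive = enforceDerive;
    }

    @Override public String name() { return name; }
    @Override public boolean enforceDerive() { return enforceDerive; }
}

// Hypothetical driver that keeps a cache of nodes produced by pass-through.
final class ToyDriver {
    private final Set<ToyPhysicalRel> passThroughCache = new HashSet<>();

    // Called for every node emitted from pass-through. Nodes that ask for
    // derivation are deliberately kept out of the cache.
    void onPassThrough(ToyPhysicalRel node) {
        if (!node.enforceDerive()) {
            passThroughCache.add(node);
        }
    }

    // Cached nodes are skipped; everything else takes part in "derive".
    List<String> deriveCandidates(List<? extends ToyPhysicalRel> nodes) {
        List<String> result = new ArrayList<>();
        for (ToyPhysicalRel node : nodes) {
            if (!passThroughCache.contains(node)) {
                result.add(node.name());
            }
        }
        return result;
    }
}
```

In this sketch a node created with enforceDerive set to true stays eligible for derivation, so it can carry the parent's request until the child traits are known.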
>
> On Sat, May 29, 2021 at 11:59, Vladimir Ozerov <ppoze...@gmail.com> wrote:
>
> > Hi Haisheng, Jinpeng
> >
> > I think we are more or less on the same page:
> >
> > 1. The current implementation of Apache Calcite may generate wasteful alternatives because rules lack the optimization context.
> > 2. But the actual impact on efficiency is not clear.
> >
> > The (2) is essential to understand whether my efforts make any practical sense. And so far, I have only a vague common sense and some simple examples in mind, which is not sufficient to make any claims.
> >
> > Nevertheless, I've checked the source code of the original Columbia optimizer. I was wrong in my original claim that Columbia doesn't pass optimization context to rules. It does [1]. The context consists of required traits and cost budget. In Apache Calcite terms, the context is passed to both "RelRule.matches" and "RelRule.onMatch", so that the rule may decide on the optimization strategy based on the parent request. This is exactly what I was trying to achieve in my system with some hacks around derive/passThrough.
> >
> > Regarding the example with join, my proposal is not likely to make any difference because the tables are not co-located on the join key, and hence join may emit several distributions. Consider a different situation: the data is already co-located. Without the context, I will emit both 1-phase and 2-phase aggregates because I do not know which distributions are available below. With the context available, I can collect and propagate promising optimization requests from Aggregate rules (1-phase, 2-phase), then wait for input optimization and check what is returned. If only [dist=a] is returned, I can skip the 2-phase aggregate completely.
> > Aggregate[group=a]
> >   Join[foo.a=bar.b]
> >     Input(foo, dist=a)
> >     Input(bar, dist=b)
> >
> > Another possible use case is join on several keys.
> > By issuing a context-aware optimization request [dist a1] from Aggregate to Join, we can establish tight cost bounds on Aggregate and Join equivalence groups very early so that all other options (broadcasts, sharding in [a1,a2], ...) would be pruned without even entering MEMO.
> > Aggregate[group=a1]
> >   Join[foo.a1=bar.b1 AND foo.a2=bar.b2]
> >     Input(foo, dist=a1)
> >     Input(bar, dist=b2)
> >
> > As far as Jinpeng's example with logical multi-phase aggregates - I think this is a great example of why logical split might be useful. Thank you for that. This reminded me about another concerning use case. Consider an Aggregate on top of a UnionAll:
> > LogicalAggregate[group=a, COUNT(b)]
> >   UnionAll
> >     Input1
> >     Input2
> >
> > With Calcite rules, we may push the aggregate down:
> > LogicalAggregate[group=a, SUM(COUNT)]
> >   UnionAll
> >     LogicalAggregate[group=a, COUNT(b)] // <-- Possible exchange here
> >       Input1
> >     LogicalAggregate[group=a, COUNT(b)] // <-- Possible exchange here
> >       Input2
> >
> > In my optimizer, all logical aggregates are treated in the same way. So if Input1 is not sharded by [a], I will generate an exchange. However, if we apply your suggestion, we may first split the logical aggregate into two tagged logical aggregates:
> > LogicalAggregate[group=a, SUM(COUNT), type=global]
> >   LogicalAggregate[group=a, COUNT(b), type=local]
> >     UnionAll
> >       Input1
> >       Input2
> >
> > Then we may implement a transformation rule that pushes down only pre-aggregates. As a result, the bottom aggregates will be converted into single-phase physical aggregates, leading to a much better plan.
> > LogicalAggregate[group=a, SUM(COUNT), type=global]
> >   UnionAll
> >     LogicalAggregate[group=a, COUNT(b), type=local] // <-- No exchange
> >       Input1
> >     LogicalAggregate[group=a, COUNT(b), type=local] // <-- No exchange
> >       Input2
> >
> > So I agree with you that logical optimization might be very useful.
> > The main practical concern is the complexity. We essentially introduce new logical operators that cannot be used by the existing Apache Calcite logical rule library in the general case.
> >
> > Regards,
> > Vladimir.
> >
> > [1] https://github.com/yongwen/columbia/blob/master/header/rules.h#L383-L397
> >
> > On Sat, May 29, 2021 at 04:30, Jinpeng Wu <wjpabc...@gmail.com> wrote:
> >
> >> Hi, Vladimir.
> >>
> >> As another topic, it is highly recommended that you split the aggregation in logical stages, not only for traits related matters. It is true that you need to annotate the node with different flags or subclasses and it's a large refactor. But after that, you may find much bigger benefits.
> >>
> >> The most important benefit is aggregation pushing down. For example, the query:
> >>
> >> select t1.value, agg(t2.value) from t1 join t2 on t1.key = t2.key;
> >>
> >> You may be able to generate such a plan:
> >>
> >> PhysicalAggregatePhase(group=t1.value, method=agg_final(t2.value))
> >>   Exchange(dist = t1.value)
> >>     Join (t1.key = t2.key)
> >>       Exchange(dist = t1.key)
> >>         scan(t1)
> >>       Exchange(dist = t2.key)
> >>         PhysicalAggregationPhase(group = t2.key, f_partial(a))
> >>           scan(t2)
> >>
> >> The pushed "PhysicalAggregationPhase(group = t2.key, f_partial(a))" may be able to reduce the input data size of the exchange operation dramatically.
> >>
> >> There has been lots of research on aggregation push down. But partial aggregate pushing down could achieve much more benefits:
> >> 1. Unlike pushing down a full aggregation, the partial aggregate requires no extra exchanges. So it could be a pure gain.
> >> 2. The pushing down can apply to any aggregation functions, including user-defined aggregation functions.
> >> 3. By introducing the middle phase (the 3-pass aggregation implementation).
> >> Aggregation can be split into any number of phases and partial aggregation can be pushed down through any number of joins, somewhat like:
> >>
> >> AggregatePhase(final)
> >>   Exchange
> >>     AggregatePhase(middle)
> >>       JOIN
> >>         Exchange
> >>           AggregatePhase(middle)
> >>             JOIN
> >>               Exchange
> >>                 AggregatePhase(middle)
> >>                 ...
> >>                   JOIN
> >>                     Exchange
> >>                       AggregatePhase(partial)
> >>                         TableScan
> >>                   ...
> >> Note that AggregatePhase(middle) could work in an adaptive manner: after processing some data, if it discovers no data reduction, it could just degenerate to a NOP operation and can be very lightweight.
> >>
> >> Thanks,
> >> Jinpeng Wu
> >>
> >>
> >> On Sat, May 29, 2021 at 8:32 AM Haisheng Yuan <hy...@apache.org> wrote:
> >>
> >> > > 2) Optimization requests are basically sent to RelSet-s, not RelSubset-s, as we make pairwise comparisons between the requested RelSubset and other subsets in the set [5][6].
> >> >
> >> > I agree with you. There could be some waste when the new delivered / required traitset is generated by "passThrough" / "derive", in which case we only need an enforcer between the pair of subsets, instead of pairing with all other required / delivered subsets in the RelSet. For example, in the MEMO group, we have 2 required traitsets:
> >> > 1) Hash[a] Sort[b]
> >> > 2) Hash[b] Sort[c]
> >> >
> >> > When we try to pass Hash[a] Sort[b] to one of the physical operators, say Project, we find that we can pass Hash[a] down to its child, and then we get a new physical Project with traitset Hash[a]. We only need an enforcer between Hash[a] and Hash[a]Sort[b], but currently in method "addConverters", we also generate an enforcer between Hash[a] and Hash[b]Sort[c], which is not actually what we want.
> >> >
> >> > I think it is definitely worth trying to optimize.
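The difference between the two pairing strategies in the Hash[a] example can be sketched as below. This is only an illustration under assumptions: EnforcerPairing and both method names are hypothetical, trait sets are plain strings, and nothing here mirrors the actual "addConverters" code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; trait sets are modeled as plain strings for brevity.
final class EnforcerPairing {

    // Strategy described as the current behavior: pair the newly delivered
    // trait set with every required trait set in the group.
    static List<String> pairWithAll(String delivered, List<String> required) {
        List<String> enforcers = new ArrayList<>();
        for (String request : required) {
            enforcers.add(delivered + " -> " + request);
        }
        return enforcers;
    }

    // Narrower strategy: pair the delivered trait set only with the request
    // that produced it through pass-through.
    static List<String> pairWithOrigin(String delivered, String originRequest) {
        List<String> enforcers = new ArrayList<>();
        enforcers.add(delivered + " -> " + originRequest);
        return enforcers;
    }

    public static void main(String[] args) {
        List<String> required = List.of("Hash[a] Sort[b]", "Hash[b] Sort[c]");
        System.out.println(pairWithAll("Hash[a]", required));
        System.out.println(pairWithOrigin("Hash[a]", required.get(0)));
    }
}
```

With two required trait sets the saving is a single enforcer, but the first strategy grows with the number of requests in the group while the second stays at one per pass-through result.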
> >> >
> >> > Regards,
> >> > Haisheng Yuan
> >> > On 2021/05/28 19:15:03, Haisheng Yuan <hy...@apache.org> wrote:
> >> > > Hi Vladimir,
> >> > >
> >> > > The top-down optimizer does NOT require the implementation rule to generate a 1 to 1 physical operator for a logical operator. As you can see, if you generate 2-phase physical aggregates for the logical aggregate in the implementation rule, it still works. Window is special because we can reshuffle the execution order of window functions, and that order makes a difference according to different parent physical property requests. A single converged physical Window operator caters for this speciality. However, as I said, I don't think it is a common scenario.
> >> > >
> >> > > > the whole decision of whether to go with 1-phase or 2-phase aggregate is a physical decision that should be made based on available (or assumed) input traits.
> >> > > What is the problem with generating both 1-phase and 2-phase aggregates and choosing the best one based on the cost?
> >> > >
> >> > > Let's see the following query:
> >> > > select a, min(b) from (select * from foo, bar where foo.a=bar.a) t group by a;
> >> > > Suppose foo is a randomly distributed fact table, and bar is a randomly distributed dimension table.
> >> > > Consider the following 2 plans:
> >> > > 1)
> >> > > PhysicalAggregate
> >> > > +-- HashJoin
> >> > >     +-- HashDistribute by a
> >> > >         +-- TableScan on foo
> >> > >     +-- HashDistribute by a
> >> > >         +-- TableScan on bar
> >> > >
> >> > > 2)
> >> > > PhysicalAggregate(global)
> >> > > +-- HashDistribute by a
> >> > >     +---- PhysicalAggregate(local)
> >> > >         +---- HashJoin
> >> > >             +-- TableScan on foo
> >> > >             +-- Broadcast
> >> > >                 +-- TableScan on bar
> >> > >
> >> > > Can you tell that the single phase aggregate plan is always better than the 2 phase aggregate plan?
> >> > >
> >> > > > Therefore, the typical way to optimize LogicalAggregate is to split in the physical phase (implementation rule, pass-through, derive). Practical systems like Dremio [1] and Flink [2] work this way.
> >> > > That Dremio and Flink work this way doesn't mean it is a good way. Greenplum Orca and Alibaba MaxCompute optimizer work in another way. In Flink and Dremio, they have HashAggPRule to generate 1 phase HashAgg and 2 phase HashAgg, and SortAggPRule to generate 1 phase SortAgg and 2 phase SortAgg. However, do you think there is a possibility that the global SortAgg combined with local HashAgg, or the global HashAgg combined with local SortAgg, may perform better in different cases? Are you going to generate all the 4 combinations in the implementation rule? There are some cases where we found we had better split the aggregate into a 3 phase aggregate [1], in which case, will the implementation rule generate 3 HashAggs or 3 SortAggs, or all the 6 combinations?
> >> > >
> >> > > In our system, we have 1 phase, 2 phase, 3 phase logical aggregate rules to transform the LogicalAggregate to another kind of logical aggregate(s) with phase info, say LogicalXXXAggregate, then our physical aggregate rules match this kind of node to generate HashAgg or StreamAgg. Of course, in the logical rules, we can add business logic to guess the possible traits delivered by child nodes to determine whether the rule definitely won't generate a better alternative and may decide to abort this transformation early. But I would rather let the cost model decide.
> >> > >
> >> > > Admittedly, the current top-down optimization is not pure on-demand request oriented, because it will always generate a physical request regardless of the parent nodes' trait request.
> >> > > For example, the following query in a non-distributed environment:
> >> > > select a, b, c, max(d) from foo group by a, b, c order by a desc;
> >> > >
> >> > > It will first generate a StreamAgg[a ASC, b ASC, c ASC] no matter what the parent node requires, then the "passThrough" tells StreamAgg that the parent requires [a DESC], and we get a StreamAgg[a DESC, b ASC, c ASC]. It would be ideal if we only generated StreamAgg[a DESC, b ASC, c ASC] by request, but I don't think that will make much difference; the bottleneck lies in the join order enumeration and the Project related operation.
> >> > >
> >> > > Regards,
> >> > > Haisheng Yuan
> >> > >
> >> > > [1] https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitDQA.cpp
> >> > >
> >> > > On 2021/05/28 09:17:45, Vladimir Ozerov <ppoze...@gmail.com> wrote:
> >> > > > Hi Jinpeng, Haisheng,
> >> > > >
> >> > > > Thank you for your inputs. I really appreciate that. Let me try to address some of your comments and share some experience with the implementation of optimizers for a distributed engine I am currently working with.
> >> > > >
> >> > > > First of all, I would argue that multiple logical operators do not have a 1-1 mapping to physical operators, and Window is not special here. For instance, LogicalAggregate doesn't have 1-1 mapping to physical aggregates because the physical implementation can be either 1-phase or 2-phase. It doesn't matter that the 2-phase aggregate is a composition of two 1-phase aggregates: the whole decision of whether to go with 1-phase or 2-phase aggregate is a physical decision that should be made based on available (or assumed) input traits.
> >> > > >
> >> > > > Consider the following logical tree:
> >> > > > LogicalAggregate[group=$0, agg=SUM($1)]
> >> > > >   Input
> >> > > >
> >> > > > If I do the split on the logical phase with a separate transformation rule, I will get the following tree:
> >> > > > LogicalAggregate[group=$0, agg=SUM($1)]
> >> > > >   LogicalAggregate[group=$0, agg=SUM($1)]
> >> > > >     Input
> >> > > >
> >> > > > Now we have an infinite loop because the rule takes one aggregate and produces two aggregates. To fix that, we may extend the LogicalAggregate with some flag or so. But this (1) potentially breaks other LogicalAggregate optimizations (e.g., transpose with other operators), and (2) breaks the whole idea of the logical operators because the execution phase (pre-aggregate or final aggregate) is a property of a concrete backend, not a property of relational algebra. Therefore, the typical way to optimize LogicalAggregate is to split in the physical phase (implementation rule, pass-through, derive). Practical systems like Dremio [1] and Flink [2] work this way.
> >> > > >
> >> > > > That said, as an optimizer developer, I need the flexibility to emit any physical trees for the given logical operator, and 1-1 mapping cannot be assumed. Calcite's API allows for that, and I am not aware of formal documentation or guidelines that discourage that.
> >> > > >
> >> > > > Now the question is when exactly to emit the operators. Normally, we produce operators from rules. As discussed above, if the logical operator may produce different physical trees depending on input traits, the recommendation is to emit all combinations, even though we do not know whether there would be good inputs for those alternatives.
> >> > > > This contradicts the idea of the guided top-down search, where we explore the search space in response to a concrete optimization request, rather than with a pessimistic assumption that a certain plan might be required in the future.
> >> > > >
> >> > > > I found a way to mitigate this problem partially. Funny, my solution is almost similar to what Haisheng proposed for the Window operator.
> >> > > > 1. For every logical operator, I emit a single physical operator from the implementation rule, maintaining the exact 1-1 mapping. The emitted operators (1) have a special flag "template" which makes their cost infinite, (2) never expose or demand non-default traits except for convention, (3) have OMAKASE derivation mode.
> >> > > > 2. When the input is optimized, "derive" is called on the template, which produces the concrete physical tree, which is not necessarily 1-1 to the original logical node.
> >> > > >
> >> > > > Before rule:
> >> > > > LogicalAggregate[group=$0, agg=SUM($1)]
> >> > > >   LogicalInput
> >> > > >
> >> > > > After rule:
> >> > > > PhysicalAggregate[group=$0, agg=SUM($1), template=true, cost=infinite]
> >> > > >   LogicalInput
> >> > > >
> >> > > > After "derive" if the input is not sharded on $0:
> >> > > > PhysicalAggregate[group=$0, agg=SUM($1)]
> >> > > >   Exchange
> >> > > >     PhysicalAggregate[group=$0, agg=SUM($1)]
> >> > > >       PhysicalInputNotSharded
> >> > > >
> >> > > > After "derive" if the input is sharded on $0:
> >> > > > PhysicalAggregate[group=$0, agg=SUM($1)]
> >> > > >   PhysicalInputNotSharded
> >> > > >
> >> > > > This approach allows me to avoid the generation of unnecessary alternatives by delaying the optimization to the derive phase.
> >> > > > The aggregate split is implemented in rules in Dremio/Flink, but in my case, this logic migrates to "derive".
> >> > > >
> >> > > > This solution worked well for the whole TPC-DS suite until we wanted to optimize combinations of operators rather than individual operators. A good example is TPC-DS query 1 [3]. During the logical optimization, we get the following logical tree, which is exactly the case that I demonstrated at the beginning of this mail thread:
> >> > > > G1: Aggregate(groupBy=[ctr_store_sk])
> >> > > > G2: Aggregate(groupBy=[ctr_customer_sk, ctr_store_sk])
> >> > > >
> >> > > > And this is where I got stuck. I need to do a simple thing - propagate an optimization request from G1 to G2, informing G2 that it should consider the distribution [ctr_store_sk]. I can deliver that request to my physical template in G2 through "convert". But the problem is that the current Calcite implementation doesn't allow me to satisfy this request later on in the derivation stage. Instead, I am forced to emit the final execution tree from the "passThrough" method, which will not be notified at the derivation stage. I prepared a scheme [4] that demonstrates the problem.
> >> > > >
> >> > > > It feels that I almost achieved what I need. The last step is to ensure that "derive" is called on the newly created template. And this is where I think I reach the inflexibility of the current top-down optimizer implementation.
> >> > > > The current design forces us to define all possible structures of physical operators in advance, but I want to delay the decision to the derive stage when input traits are known because these traits are essential to make the proper physical decisions.
> >> > > >
> >> > > > There are some similarities with Haisheng's proposal about the Window operator. We also maintain the 1-1 correspondence between the logical operator and a physical template. However, Haisheng's proposal is basically heuristic, as we split optimization into two phases (implementation, post-processing). It is impossible to properly calculate the cost of the Window operator because we do not know which exchanges would be needed before the post-processing. In my case, we do the proper cost estimation within a single expanded MEMO.
> >> > > >
> >> > > > Now switching to theoretical considerations. We may make several observations from the previous discussion:
> >> > > > 1) Our ideas converge to the solution where every logical operator has a single corresponding physical operator, which is later expanded into more alternatives.
> >> > > > 2) Optimization requests are basically sent to RelSet-s, not RelSubset-s, as we make pairwise comparisons between the requested RelSubset and other subsets in the set [5][6].
> >> > > > 3) Irrespective of the design, the complete exploration requires multiple invocations of some implementation logic for different combinations of required traits and available input traits.
> >> > > >
> >> > > > These observations led me to think that maybe trait propagation through some dedicated nodes (templates in my case and Haisheng's Window proposal, or pessimistically emitted physical nodes in the previous Jinpeng/Haisheng proposal) is not the ideal design, at least for some cases.
> >> > > >
> >> > > > From the design standpoint, we propagate traits top-down and bottom-up across equivalence groups, not individual RelSubset-s or RelNode-s. Currently, we ignore the optimization context when optimizing the group (except for the cost pruning). Rules emit partially constructed nodes since neither parent requirements nor child traits are available to the rule.
> >> > > >
> >> > > > Instead, there could exist a true guided top-down optimization flow when the "guided" term applies to rules as well:
> >> > > > 1. Pass-through: RelSet receives an optimization request and chooses appropriate implementation rules to fire. A rule receives optimization requests, constructs optimization requests for children (adjusting traits, optimization budget, etc.), then sends these requests down. The process is repeated recursively until we either reach the bottom node or some set that is already optimized for this request.
> >> > > > 3. Derive: given the now known input traits, emit appropriate physical nodes from the rule. Then notify the parent. Repeat the process recursively.
> >> > > >
> >> > > > For common use cases, this design would require the same logic, which is currently split between rules, "derive" and "passThrough", just the code location will be different, as everything now converges to the rule.
> >> > > > But for the advanced use cases, that approach may allow for more flexible optimization patterns, like for these two chained aggregates.
> >> > > >
> >> > > > I'll try to implement both solutions - (1) emit multiple nodes from a physical rule, and (2) enable derivation for some nodes emitted from "passThrough", and share the results here.
> >> > > >
> >> > > > Regards,
> >> > > > Vladimir.
> >> > > >
> >> > > > [1] https://github.com/dremio/dremio-oss/blob/8e85901e7222c81ccad3436ba9b63485503472ac/sabot/kernel/src/main/java/com/dremio/exec/planner/physical/HashAggPrule.java#L123-L146
> >> > > > [2] https://github.com/apache/flink/blob/release-1.13.0/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/batch/BatchPhysicalHashAggRule.scala#L101-L147
> >> > > > [3] https://github.com/Agirish/tpcds/blob/master/query1.sql
> >> > > > [4] https://docs.google.com/document/d/1u7V4bNp51OjytKnS2kzD_ikHLiNUPbhjZfRjkTfdsDA/edit
> >> > > > [5] https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/RelSet.java#L196
> >> > > > [6] https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L203
> >> > > >
> >> > > > On Fri, May 28, 2021 at 00:10, Haisheng Yuan <hy...@apache.org> wrote:
> >> > > >
> >> > > > > Getting back to your window query example:
> >> > > > >
> >> > > > > > Consider the Window function:
> >> > > > > > SELECT
> >> > > > > >   AGG1 over (partition by a),
> >> > > > > >   AGG2 over (partition by b),
> >> > > > > >   AGG3 over (partition by c),
> >> > > > > >   ...
> >> > > > > > FROM input
> >> > > > >
> >> > > > > Window is quite special because the logical vs physical operator count is not 1 to 1; generally we generate a physical window operator for each window function with a different partition column. That determines that once the physical operators are created, their order can't be changed. Hence your proposal of passing required traits to the physical rule can mitigate the problem.
> >> > > > >
> >> > > > > But things would be much easier if we define a different physical window operator.
> >> > > > > For the above query, we can generate the *Single* physical window operator like this:
> >> > > > > PhysicalWindow[AGG1 over (partition by a), AGG2 over (partition by b), AGG3 over (partition by c)]
> >> > > > > or PhysicalWindow(a, b, c) for brevity.
> >> > > > > How do we define the physical properties for it?
> >> > > > > The operator delivers hash distribution on the first window partition column a, but requires its child input to be hash distributed by its last window partition column c.
> >> > > > >
> >> > > > > If the parent operator requests hash distribution on b, or c, the window operator will be called on the "passthrough" method and generate PhysicalWindow(b, a, c), or PhysicalWindow(c, a, b). After the final plan is generated, during post processing, we can replace the window operator with multiple layers of nested window operators, and insert Exchange operators if necessary. But frankly speaking, I haven't seen any use cases of this kind in production.
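The reordering performed by the single PhysicalWindow on pass-through can be illustrated with a small Java sketch. It is a simplification under assumptions: WindowPassThrough and its method are hypothetical, and the window operator is reduced to an ordered list of partition columns where the first one is the delivered hash distribution.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical helper; a window operator is modeled only by the order of its
// partition columns, e.g. PhysicalWindow(a, b, c) becomes [a, b, c].
final class WindowPassThrough {

    // Moves the requested partition column to the front so that the operator
    // delivers hash distribution on it. The remaining columns keep their
    // relative order. Returns empty if the column is not a partition column.
    static Optional<List<String>> passThrough(List<String> partitionOrder, String requested) {
        if (!partitionOrder.contains(requested)) {
            return Optional.empty();
        }
        List<String> reordered = new ArrayList<>();
        reordered.add(requested);
        for (String column : partitionOrder) {
            if (!column.equals(requested)) {
                reordered.add(column);
            }
        }
        return Optional.of(reordered);
    }

    public static void main(String[] args) {
        List<String> window = List.of("a", "b", "c");
        System.out.println(passThrough(window, "b")); // request hash distribution on b
        System.out.println(passThrough(window, "c")); // request hash distribution on c
    }
}
```

For PhysicalWindow(a, b, c), a request on b yields the order (b, a, c) and a request on c yields (c, a, b), matching the two alternatives named in the message.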
> >> > > > >
> >> > > > > Regarding the rule alternative you proposed:
> >> > > > > > class PhysicalAggregateRule extends PhysicalRule {
> >> > > > > >   void onMatch(RelOptRuleCall call, *RelTraitSet requiredTraits*) {...
> >> > > > >
> >> > > > > Consider the following plan:
> >> > > > > InnerJoin (on a)
> >> > > > > +-- Agg (on b)
> >> > > > > +-- Scan
> >> > > > >
> >> > > > > For the inner join, we can generate sort merge join and hash join.
> >> > > > > The sort merge join can request the following traits to Agg:
> >> > > > > 1) Singleton
> >> > > > > 2) hash distribution on a, sorted by a
> >> > > > > The hash join can request the following traits to Agg:
> >> > > > > 1) Singleton
> >> > > > > 2) hash distribution on a
> >> > > > > 3) any distribution
> >> > > > > 4) broadcast distribution
> >> > > > >
> >> > > > > The PhysicalAggregateRule will be called and executed 5 times, while generating the same physical aggregate candidates, unless we pass a whole list of required traits to the physical rule, which I prototyped some time ago with the exact idea.
> >> > > > >
> >> > > > > Regards,
> >> > > > > Haisheng Yuan
> >> > > > >
> >> > > > > On 2021/05/27 17:44:23, Haisheng Yuan <hy...@apache.org> wrote:
> >> > > > > > > In distributed systems, an implementation rule may produce different physical operators depending on the input traits. Examples are Aggregate, Sort, Window.
> >> > > > > >
> >> > > > > > No, in most cases, physical operators are generated regardless of the input, because the input traits are not known yet. Window might be an exception.
> >> > > > > >
> >> > > > > > > Since input traits are not known when the rule is fired, we must generate *all possible combinations* of physical operators that we may need.
> >> > > > > > > For LogicalAggregate, we must generate 1-phase and 2-phase alternatives. For LogicalSort, we also have 1-phase and 2-phase alternatives. Etc.
> >> > > > > >
> >> > > > > > IMHO, 1 phase and 2 phase are just different logical alternatives, that is also why I call it a logical rule to split the aggregate into a 2 phase aggregate. But HashAggregate and StreamAggregate are indeed the different physical alternatives for a LogicalAggregate.
> >> > > > > >
> >> > > > > >
> >> > > > > > > Unlike Aggregate or Sort, which may have only 1 or 2 phases, certain logical operators may have many physical alternatives. Consider the Window function:......
> >> > > > > >
> >> > > > > > In the window implementation rule, when building the physical operator for a Window that has multiple window functions but with different partition columns, we can infer the possible traits that can be delivered by input operators by creating your own RelMetaData, hence generate multiple window combinations with a certain order, but not an exhaustive enumeration. In fact, the window ordering problem exists in every different kind of optimizer.
> >> > > > > >
> >> > > > > > > As input traits are not known when the rule is fired, the nodes emitted from the implementation rules most likely would not be used in the final plan.
> >> > > > > >
> >> > > > > > That is quite normal, any operator generated by an implementation rule might not be used in the final plan, because there may be tens of thousands of alternatives, and we only choose the one with the lowest cost.

> For example, I can create a physical aggregate that demands non-strict distribution {a,b} from its input, meaning that both [a,b] and [b,a] are ok. However, in the final plan, we are obligated to have a strict distribution - either [a, b] in that order, or [b, a] in that order - otherwise, physical operators like Join and Union will not work.

It depends on your own satisfaction model and how you coordinate property requirements among child operators. Unlike the Orca optimizer, where there is exact match, partial satisfying, orderless match etc, Calcite's default implementation always requires exact satisfaction. But we can still make use of "passThrough" and "derive" to achieve our goal, i.e. the aggregate generated by the implementation rule requires itself and its child to deliver distribution on [a,b], but if the "derive" method tells Aggregate that [b,a] is available, it can generate another option to require [b,a] instead.

> In distributed engines, the nodes emitted from rules are basically "templates" that must be replaced with normal nodes.

There is no difference between distributed and non-distributed engines when dealing with this. In the Orca and CockroachDB optimizers, the nodes emitted from rules are operators without physical properties; the optimizer then requests physical properties in a top-down manner, either recursively, with a stack, or with a state machine. Calcite is quite different.
When the physical operator is generated by an implementation rule, the physical operator must have its own traits and, at the same time, the traits that it expects its child operators to deliver. So in Calcite, they are not "templates". The difference has been there since Calcite's inception.

Regards,
Haisheng Yuan

On 2021/05/27 08:59:33, Vladimir Ozerov <ppoze...@gmail.com> wrote:
Hi Haisheng,

Thank you for your inputs. They are really helpful. Let me summarize your feedback in my own words to verify that I understand it correctly.

1. In distributed systems, an implementation rule may produce different physical operators depending on the input traits. Examples are Aggregate, Sort, Window.
2. Since input traits are not known when the rule is fired, we must generate *all possible combinations* of physical operators that we may need. For LogicalAggregate, we must generate 1-phase and 2-phase alternatives. For LogicalSort, we also have 1-phase and 2-phase alternatives. Etc.
3. If all combinations are generated, it is expected that "passThrough" and "derive" would be just trivial replacements of traits for most cases. This is why "passThroughTraits" and "deriveTraits" are recommended. A notable exception is TableScan that may emit alternative indexes in response to the pass-through requests.

If my understanding is correct, then there are several issues with this approach still.

1. Unlike Aggregate or Sort, which may have only 1 or 2 phases, certain logical operators may have many physical alternatives. Consider the Window function:
SELECT
  AGG1 over (partition by a),
  AGG2 over (partition by b),
  AGG3 over (partition by c),
  ...
FROM input

To calculate each aggregate, we need to re-shuffle the input based on the partition key. The key question is the order of reshuffling. If the input is sharded by [a], I want to calculate AGG1 locally and then re-shuffle the input to calculate other aggregates. For the remaining AGG2 and AGG3, the order is also important. If the parent demands sharding by [b], then the proper sequence is b-c-a:
1: Window[AGG2 over (partition by b)] // SHARDED[b]
2:   Window[AGG3 over (partition by c)] // SHARDED[c]
3:     Window[AGG1 over (partition by a)] // SHARDED[a]
4:       Input // SHARDED[a]

But if the parent demands [c], the proper sequence is c-b-a. Since we do not know real distributions when the rule is fired, we must emit all the permutations to ensure that no optimization opportunity is missed.
But with complex window aggregate, this might be impractical because we will emit lots of unnecessary nodes.

2. As input traits are not known when the rule is fired, the nodes emitted from the implementation rules most likely would not be used in the final plan. For example, I can create a physical aggregate that demands non-strict distribution {a,b} from its input, meaning that both [a,b] and [b,a] are ok. However, in the final plan, we are obligated to have a strict distribution - either [a, b] in that order, or [b, a] in that order - otherwise, physical operators like Join and Union will not work. In distributed engines, the nodes emitted from rules are basically "templates" that must be replaced with normal nodes.

Does this reasoning make any sense? If yes, it means that the current approach forces us to produce many unnecessary nodes to explore the full search space. The question is whether alternative approaches could better fit the requirements of the distributed engine? This is a purely theoretical question. I am currently looking deeper at CockroachDB. They have very different architecture: no separation between logical and physical nodes, physical properties are completely decoupled from nodes, usage of recursion instead of the stack, etc.

Regards,
Vladimir.
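
The window-ordering concern in the message above can be made concrete with a small, self-contained sketch. It is not Calcite code: the class `WindowOrderSketch` and its methods are hypothetical names introduced here for illustration only. It shows (a) how many orderings a rule would have to emit if it enumerated every permutation of n partition keys, and (b) how a single ordering could be picked once both the parent's required key and the input's sharding key are known, assuming the simple heuristic "parent's key on top, input's key at the bottom".

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration only; none of these names exist in Apache Calcite. */
final class WindowOrderSketch {

  /** Number of window orderings for n distinct partition keys: n! */
  static long permutationCount(int n) {
    long result = 1;
    for (int i = 2; i <= n; i++) {
      result *= i;
    }
    return result;
  }

  /**
   * Returns the partition keys from the top window operator to the bottom one.
   * Assumed heuristic: the key demanded by the parent goes on top, the key the
   * input is already sharded by goes to the bottom, the rest keep their order.
   */
  static List<String> order(List<String> keys, String parentKey, String inputKey) {
    List<String> middle = new ArrayList<>(keys);
    middle.remove(parentKey);
    middle.remove(inputKey);

    List<String> result = new ArrayList<>();
    if (keys.contains(parentKey)) {
      result.add(parentKey);
    }
    result.addAll(middle);
    if (keys.contains(inputKey) && !inputKey.equals(parentKey)) {
      result.add(inputKey);
    }
    return result;
  }

  public static void main(String[] args) {
    List<String> keys = List.of("a", "b", "c");
    System.out.println(permutationCount(keys.size()));
    System.out.println(order(keys, "b", "a"));
    System.out.println(order(keys, "c", "a"));
  }
}
```

With keys [a, b, c] and an input sharded by [a], a parent demand of [b] gives the b-c-a sequence from the message and a parent demand of [c] gives c-b-a, while blind enumeration would have to emit all 3! = 6 orderings.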

Thu, May 27, 2021 at 03:19, Haisheng Yuan <hy...@apache.org>:

Another point I would like to mention is that it is not recommended to override the methods "passThrough" and "derive" directly; override "passThroughTraits" and "deriveTraits" instead, so that we can make sure only the same type of physical node is created and no nested relnodes or additional RelSets are created, unless you know you have to create a different type of node. For example, if the table foo has a btree index on column a, and the parent relnode is requesting ordering on column a, then we may consider overriding "passThrough" of TableScan to return an IndexScan instead of a TableScan.

Regards,
Haisheng Yuan

On 2021/05/26 22:45:20, Haisheng Yuan <hy...@apache.org> wrote:
Hi Vladimir,

1. You need a logical rule to split the aggregate into a local aggregate and global aggregate, for example:
https://github.com/greenplum-db/gporca/blob/master/libgpopt/src/xforms/CXformSplitGbAgg.cpp
Only implementation rules can convert a logical node to a physical node or multiple physical nodes.
After physical implementation, you have 2 physical alternatives:
1) single phase global physical aggregate,
2) 2 phase physical aggregate with local and global aggregate.
It should be up to the cost to decide which one to choose.

2. Given a desired traitset from the parent node, the current relnode only needs to generate a single relnode after passing down the traitset. Given a traitset delivered by the child node, the current relnode only derives a single relnode. Quite unlike other optimizers, in Calcite's top-down optimizer, you don't need to worry about issuing multiple optimization requests to inputs, which is handled by the Calcite framework secretly. i.e.
SELECT a, b, min(c) from foo group by a, b;
In many other optimizers, we probably need to ask the aggregate to issue 3 distribution requests for the tablescan on foo, which are
1) hash distributed by a,
2) hash distributed by b,
3) hash distributed by a, b
However in Calcite's top-down optimizer, your physical implementation rule for the global aggregate only needs to generate a single physical node with hash distribution by a, b. In case the table foo happens to be distributed by a, or b, the derive() method will tell you there is an opportunity. This is where Calcite's top-down optimizer excels over other optimizers, because this can dramatically reduce the search space while keeping the optimal optimization opportunity.

3. This is by design.
Nodes produced from "passThrough" and "derive" are just sibling physical nodes with different traitsets; we only need the initial physical nodes after implementation to avoid unnecessary operations. The fundamental reason is, unlike the Orca optimizer where physical node and physical property are separate things, Calcite's logical/physical nodes contain a traitset. With regard to the latter question, can you give an example?

Regards,
Haisheng Yuan

On 2021/05/26 20:11:57, Vladimir Ozerov <ppoze...@gmail.com> wrote:
Hi,

I tried to optimize a certain combination of operators for the distributed engine and got stuck with the trait propagation in the top-down engine. I want to ask the community for advice on whether the problem is solvable with the current Apache Calcite implementation or not.

Consider the following logical tree:
3: LogicalAggregate[group=[a], F2(c)]
2:   LogicalAggregate[group=[a,b], F1(c)]
1:     LogicalScan[t]

Consider that these two aggregates cannot be merged or simplified for whatever reason. We have only a set of physical rules to translate this logical tree to a physical tree.
Also, there could be any number of other operators between these two aggregates. We omit them for clarity, assuming that the distribution is not destroyed.

In the distributed environment, non-collocated aggregates are often implemented in two phases: local pre-aggregation and final aggregation, with an exchange in between. Consider that the Scan operator is hash distributed by some key other than [a] or [b]. If we optimize operators without considering the whole plan, we may optimize each operator independently, which would give us the following plan:
3: PhysicalAggregate[group=[a], F2_phase2(c)]              // HASH_DISTRIBUTED [a]
3:   Exchange[a]                                           // HASH_DISTRIBUTED [a]
3:     PhysicalAggregate[group=[a], F2_phase1(c)]          // HASH_DISTRIBUTED [a,b]
2:       PhysicalAggregate[group=[a,b], F1_phase2(c)]      // HASH_DISTRIBUTED [a,b]
2:         Exchange[a, b]                                  // HASH_DISTRIBUTED [a,b]
2:           PhysicalAggregate[group=[a,b], F1_phase1(c)]  // HASH_DISTRIBUTED [d]
1:             PhysicalScan[t]                             // HASH_DISTRIBUTED [d]

This plan is not optimal, because we re-hash inputs twice.
A better plan that we want to get:
3: PhysicalAggregate[group=[a], F2(c)]                 // HASH_DISTRIBUTED [a]
2:   PhysicalAggregate[group=[a,b], F1_phase2(c)]      // HASH_DISTRIBUTED [a]
2:     Exchange[a]                                     // HASH_DISTRIBUTED [a]
2:       PhysicalAggregate[group=[a,b], F1_phase1(c)]  // HASH_DISTRIBUTED [d]
1:         PhysicalScan[t]                             // HASH_DISTRIBUTED [d]

In this case, we take advantage of the fact that the distribution [a] is compatible with [a,b]. Therefore we may enforce only [a], instead of doing [a,b] and then [a]. Since exchange operators are very expensive, this optimization may bring a significant boost to the query engine. Now the question - how do we reach that state? Intuitively, a pass-through is exactly what we need. We may pass the optimization request from top aggregate to bottom aggregate to find physical implementations sharded by [a]. But the devil is in the details - when and how exactly to pass this request?

Typically, we have a conversion rule that converts a logical aggregate to a physical aggregate.
We may invoke "convert" on the input to initiate the pass-through:

RelNode convert(...) {
    return new PhysicalAggregate(
        convert(input, HASH_DISTRIBUTED[a])
    )
}

The first problem - we cannot create the normal physical aggregate here because we do not know input traits yet. The final decision whether to do a one-phase or two-phase aggregate can be made only in the "PhysicalNode.derive" method when concrete input traits are resolved. Therefore the converter rule should create a kind of "template" physical operator, which would be used to construct the final operator(s) when input traits are resolved. AFAIU Enumerable works similarly: we create operators with virtually arbitrary traits taken from logical nodes in the conversion rules. We only later do create normal nodes in the derive() methods.

The second problem - our top aggregate doesn't actually need the HASH_DISTRIBUTED[a] input. Instead, it may accept inputs with any distribution. What we really need is to inform the input (bottom aggregate) that it should look for additional implementations that satisfy HASH_DISTRIBUTED[a].
Therefore, enforcing a specific distribution on the input using the "convert" method is not what we need because this conversion might enforce unnecessary exchanges.

The third problem - derivation. Consider that we delivered the optimization request to the bottom aggregate. As an implementor, what am I supposed to do in this method? I cannot return the final aggregate from here because the real input traits are not derived yet. Therefore, I can only return another template, hoping that the "derive" method will be called on it. However, this will not happen because trait derivation is skipped on the nodes emitted from pass-through. See "DeriveTrait.perform" [1].

BottomAggregate {
    RelNode passThrough(distribution=HASH_DISTRIBUTED[a]) {
        // ???
    }
}

I feel that I am either going in the wrong direction, or some gaps in the product disallow such optimization. So I would like to ask the community to assist with the following questions:
1. In the top-down optimizer, how should we convert a logical node to a physical node, provided that "derive" is not called yet?
I have a gut feeling that the trait propagation is currently not implemented to the full extent because based on Cascades paper I would expect that parent physical nodes are produced after the child physical nodes. But in our rules, this is not the case - some physical nodes are produced before the trait derivation.
2. How to propagate several optimization requests to inputs? We need either inputs with a specific distribution or inputs with an arbitrary distribution in the example above. It seems that to achieve that, I need to emit several alternative nodes with different requirements to inputs. Does it make sense?
3. Why are nodes produced from the "passThrough" method excluded from trait derivation? If this is by design, how can I preserve the optimization request to satisfy it on the derivation stage when input traits are available?

Regards,
Vladimir.

[1] https://github.com/apache/calcite/blob/calcite-1.26.0/core/src/main/java/org/apache/calcite/plan/volcano/TopDownRuleDriver.java#L828
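
The statement in the original question that "the distribution [a] is compatible with [a,b]" can be written down as a small check. The sketch below is not Calcite's trait API; `DistributionSketch` and `colocates` are hypothetical names used only for illustration. It assumes the usual reasoning for hash distributions: if the input is hashed on a non-empty subset of the grouping keys, all rows of one group share the same values for those keys and therefore land on the same shard, so the aggregate can run without another exchange.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical illustration only; this is not Calcite's RelDistribution API. */
final class DistributionSketch {

  /**
   * Returns true when an input hash-distributed on hashKeys keeps every group
   * of an aggregate on groupKeys within a single shard, i.e. when hashKeys is
   * a non-empty subset of groupKeys.
   */
  static boolean colocates(List<String> hashKeys, Set<String> groupKeys) {
    return !hashKeys.isEmpty() && groupKeys.containsAll(hashKeys);
  }

  public static void main(String[] args) {
    // HASH_DISTRIBUTED[a] is enough for group=[a,b] and for group=[a].
    System.out.println(colocates(List.of("a"), Set.of("a", "b")));
    System.out.println(colocates(List.of("a"), Set.of("a")));
    // HASH_DISTRIBUTED[a,b] does not help an aggregate on [a] alone.
    System.out.println(colocates(List.of("a", "b"), Set.of("a")));
    // HASH_DISTRIBUTED[d] (the scan's distribution) helps neither aggregate.
    System.out.println(colocates(List.of("d"), Set.of("a", "b")));
  }
}
```

Under this check a single Exchange[a] serves both aggregates in the desired plan, while Exchange[a,b] would serve only the bottom one, which is why the naive plan needs a second exchange.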