I'm not sure how you got the conclusion that LoptOptmizeJoinRule would not
produce bushy tree join plan.  I just tried with tpch Q5 and Q10 on the
sample dataset, and seems that the plans that I got are not left-deep join
tree. ( I could not upload an image to show the visualized plan for those
two queries).

My impression is that LoptOptimizeJoinRule would produce bushy tree and
left-deep join tree.

For example, here is the plan for Q5. Operator 00-07 HashJoin has two
inputs from two HashJoins.

00-00    Screen : rowType = RecordType(ANY n_name, ANY revenue):
rowcount = 6017.5, cumulative cost = {358148.75 rows, 4407383.86090918
cpu, 0.0 io, 0.0 network, 1250857.6 memory}, id = 10943
00-01      Project(n_name=[$0], revenue=[$1]) : rowType =
RecordType(ANY n_name, ANY revenue): rowcount = 6017.5, cumulative
cost = {357547.0 rows, 4406782.11090918 cpu, 0.0 io, 0.0 network,
1250857.6 memory}, id = 10942
00-02        SelectionVectorRemover : rowType = RecordType(ANY n_name,
ANY revenue): rowcount = 6017.5, cumulative cost = {357547.0 rows,
4406782.11090918 cpu, 0.0 io, 0.0 network, 1250857.6 memory}, id =
10941
00-03          Sort(sort0=[$1], dir0=[DESC]) : rowType =
RecordType(ANY n_name, ANY revenue): rowcount = 6017.5, cumulative
cost = {351529.5 rows, 4400764.61090918 cpu, 0.0 io, 0.0 network,
1250857.6 memory}, id = 10940
00-04            HashAgg(group=[{0}], revenue=[SUM($1)]) : rowType =
RecordType(ANY n_name, ANY revenue): rowcount = 6017.5, cumulative
cost = {345512.0 rows, 4098567.0 cpu, 0.0 io, 0.0 network, 1154577.6
memory}, id = 10939
00-05              Project(n_name=[$11], $f1=[*($7, -(1, $8))]) :
rowType = RecordType(ANY n_name, ANY $f1): rowcount = 60175.0,
cumulative cost = {285337.0 rows, 2895067.0 cpu, 0.0 io, 0.0 network,
95497.6 memory}, id = 10938
00-06                Project(c_custkey=[$7], c_nationkey=[$8],
o_custkey=[$4], o_orderkey=[$5], o_orderdate=[$6], l_orderkey=[$0],
l_suppkey=[$1], l_extendedprice=[$2], l_discount=[$3], s_suppkey=[$9],
s_nationkey=[$10], n_name=[$11], n_nationkey=[$12], n_regionkey=[$13],
r_regionkey=[$14], r_name=[$15]) : rowType = RecordType(ANY c_custkey,
ANY c_nationkey, ANY o_custkey, ANY o_orderkey, ANY o_orderdate, ANY
l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_discount, ANY
s_suppkey, ANY s_nationkey, ANY n_name, ANY n_nationkey, ANY
n_regionkey, ANY r_regionkey, ANY r_name): rowcount = 60175.0,
cumulative cost = {225162.0 rows, 2654367.0 cpu, 0.0 io, 0.0 network,
95497.6 memory}, id = 10937
00-07                  HashJoin(condition=[AND(=($1, $9), =($8,
$10))], joinType=[inner]) : rowType = RecordType(ANY l_orderkey, ANY
l_suppkey, ANY l_extendedprice, ANY l_discount, ANY o_custkey, ANY
o_orderkey, ANY o_orderdate, ANY c_custkey, ANY c_nationkey, ANY
s_suppkey, ANY s_nationkey, ANY n_name, ANY n_nationkey, ANY
n_regionkey, ANY r_regionkey, ANY r_name): rowcount = 60175.0,
cumulative cost = {225162.0 rows, 2654367.0 cpu, 0.0 io, 0.0 network,
95497.6 memory}, id = 10936
00-09                    HashJoin(condition=[=($5, $0)],
joinType=[inner]) : rowType = RecordType(ANY l_orderkey, ANY
l_suppkey, ANY l_extendedprice, ANY l_discount, ANY o_custkey, ANY
o_orderkey, ANY o_orderdate, ANY c_custkey, ANY c_nationkey): rowcount
= 60175.0, cumulative cost = {164600.0 rows, 1206550.0 cpu, 0.0 io,
0.0 network, 92400.0 memory}, id = 10928
00-13                      Scan(groupscan=[ParquetGroupScan
[entries=[ReadEntryWithPath [path=classpath:/tpch/lineitem.parquet]],
selectionRoot=classpath:/tpch/lineitem.parquet, numFiles=1,
usedMetadataFile=false, columns=[`l_orderkey`, `l_suppkey`,
`l_extendedprice`, `l_discount`]]]) : rowType = RecordType(ANY
l_orderkey, ANY l_suppkey, ANY l_extendedprice, ANY l_discount):
rowcount = 60175.0, cumulative cost = {60175.0 rows, 240700.0 cpu, 0.0
io, 0.0 network, 0.0 memory}, id = 10922
00-12                      HashJoin(condition=[=($3, $0)],
joinType=[inner]) : rowType = RecordType(ANY o_custkey, ANY
o_orderkey, ANY o_orderdate, ANY c_custkey, ANY c_nationkey): rowcount
= 3750.0, cumulative cost = {40500.0 rows, 213750.0 cpu, 0.0 io, 0.0
network, 26400.000000000004 memory}, id = 10927
00-17                        SelectionVectorRemover : rowType =
RecordType(ANY o_custkey, ANY o_orderkey, ANY o_orderdate): rowcount =
3750.0, cumulative cost = {33750.0 rows, 153750.0 cpu, 0.0 io, 0.0
network, 0.0 memory}, id = 10925
00-19                          Filter(condition=[AND(>=($2,
1997-01-01), <($2, 1998-01-01 00:00:00))]) : rowType = RecordType(ANY
o_custkey, ANY o_orderkey, ANY o_orderdate): rowcount = 3750.0,
cumulative cost = {30000.0 rows, 150000.0 cpu, 0.0 io, 0.0 network,
0.0 memory}, id = 10924
00-21                            Scan(groupscan=[ParquetGroupScan
[entries=[ReadEntryWithPath [path=classpath:/tpch/orders.parquet]],
selectionRoot=classpath:/tpch/orders.parquet, numFiles=1,
usedMetadataFile=false, columns=[`o_custkey`, `o_orderkey`,
`o_orderdate`]]]) : rowType = RecordType(ANY o_custkey, ANY
o_orderkey, ANY o_orderdate): rowcount = 15000.0, cumulative cost =
{15000.0 rows, 45000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id =
10923
00-16                        Scan(groupscan=[ParquetGroupScan
[entries=[ReadEntryWithPath [path=classpath:/tpch/customer.parquet]],
selectionRoot=classpath:/tpch/customer.parquet, numFiles=1,
usedMetadataFile=false, columns=[`c_custkey`, `c_nationkey`]]]) :
rowType = RecordType(ANY c_custkey, ANY c_nationkey): rowcount =
1500.0, cumulative cost = {1500.0 rows, 3000.0 cpu, 0.0 io, 0.0
network, 0.0 memory}, id = 10926
00-08                    HashJoin(condition=[=($1, $3)],
joinType=[inner]) : rowType = RecordType(ANY s_suppkey, ANY
s_nationkey, ANY n_name, ANY n_nationkey, ANY n_regionkey, ANY
r_regionkey, ANY r_name): rowcount = 100.0, cumulative cost = {287.0
rows, 2017.0 cpu, 0.0 io, 0.0 network, 457.6000000000001 memory}, id =
10935
00-11                      Scan(groupscan=[ParquetGroupScan
[entries=[ReadEntryWithPath [path=classpath:/tpch/supplier.parquet]],
selectionRoot=classpath:/tpch/supplier.parquet, numFiles=1,
usedMetadataFile=false, columns=[`s_suppkey`, `s_nationkey`]]]) :
rowType = RecordType(ANY s_suppkey, ANY s_nationkey): rowcount =
100.0, cumulative cost = {100.0 rows, 200.0 cpu, 0.0 io, 0.0 network,
0.0 memory}, id = 10929
00-10                      HashJoin(condition=[=($2, $3)],
joinType=[inner]) : rowType = RecordType(ANY n_name, ANY n_nationkey,
ANY n_regionkey, ANY r_regionkey, ANY r_name): rowcount = 25.0,
cumulative cost = {62.0 rows, 417.0 cpu, 0.0 io, 0.0 network, 17.6
memory}, id = 10934
00-15                        Scan(groupscan=[ParquetGroupScan
[entries=[ReadEntryWithPath [path=classpath:/tpch/nation.parquet]],
selectionRoot=classpath:/tpch/nation.parquet, numFiles=1,
usedMetadataFile=false, columns=[`n_name`, `n_nationkey`,
`n_regionkey`]]]) : rowType = RecordType(ANY n_name, ANY n_nationkey,
ANY n_regionkey): rowcount = 25.0, cumulative cost = {25.0 rows, 75.0
cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 10930
00-14                        SelectionVectorRemover : rowType =
RecordType(ANY r_regionkey, ANY r_name): rowcount = 1.0, cumulative
cost = {11.0 rows, 34.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id =
10933
00-18                          Filter(condition=[=($1, 'EUROPE')]) :
rowType = RecordType(ANY r_regionkey, ANY r_name): rowcount = 1.0,
cumulative cost = {10.0 rows, 33.0 cpu, 0.0 io, 0.0 network, 0.0
memory}, id = 10932
00-20                            Scan(groupscan=[ParquetGroupScan
[entries=[ReadEntryWithPath [path=classpath:/tpch/region.parquet]],
selectionRoot=classpath:/tpch/region.parquet, numFiles=1,
usedMetadataFile=false, columns=[`r_regionkey`, `r_name`]]]) : rowType
= RecordType(ANY r_regionkey, ANY r_name): rowcount = 5.0, cumulative
cost = {5.0 rows, 10.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id =
10931


On Mon, May 27, 2019 at 8:23 PM weijie tong <[email protected]> wrote:

> Thanks for the answer. The blog[1] from hive shows that a optimal bushy
> tree plan could give a better query performance.At the bushy join case, it
> will make the more build side of hash join nodes works parallel  also with
> reduced intermediate data size.  To the worry about plan time cost, most
> bushy join query optimization use the heuristic planner [2] to identify the
> pattern matches the bushy join to reduce the tree space(That's also what
> calcite does).  I wonder whether we can replace the   LoptOptimizeJoinRule
> with MultiJoinOptimizeBushyRule.
>
> [1]
>
> https://hortonworks.com/blog/hive-0-14-cost-based-optimizer-cbo-technical-overview/
> [2] http://www.vldb.org/pvldb/vol9/p1401-chen.pdf
>
> On Tue, May 28, 2019 at 5:48 AM Paul Rogers <[email protected]>
> wrote:
>
> > Hi All,
> >
> > Weijie, do you have some example plans that would appear to be
> > sub-optimal, and would be improved with a bushy join plan? What
> > characteristic of the query or schema causes the need for a busy plan?
> >
> > FWIW, Impala uses a compromise approach: it evaluates left-deep plans,
> > then will "flip" a join if the build side turns out to be larger than the
> > probe side. This may just be an artifact of Impala's cost model which is
> > designed for star schemas, looks only one step ahead, and struggles with
> > queries that do not fit the pattern. (Impala especially struggles with
> > multi-key joins and correlated filters on joined tables.) But, since the
> > classic data warehouse use case tends to have simple star schemas, the
> > Impala approach works pretty well in practice. (Turns out that Snowflake,
> > in their paper, claims to do something similar. [1])
> >
> > On the other hand, it might be that Calcite, because it uses a true cost
> > model, already produces optimal plans and the join-flip trick is
> > unnecessary.
> >
> > A case where this trick seemed to help is the idea of joining two fact
> > tables, each of which is filtered via dimension tables. Making something
> up:
> >
> > - join on itemid
> >   - join on sales.stateid = state.id
> >     - state table where state.name = "CA"
> >     - sales
> >  - join on returns.reasonId = reason.id
> >     - reason table where reason.name = "defective"
> >     - returns
> >
> >
> > That is, we have large fact tables for sales and returns. We filter both
> > using a dimension table. Then, we join the (greatly reduced) fact data
> sets
> > on the item ID. A left-deep play will necessarily be less efficient
> because
> > of the need to move an entire fact set though a join. (Though the JPPD
> > feature might reduce the cost by filtering early.)
> >
> >
> > In any event, it would be easy to experiment with this idea in Drill.
> > Drill already has several post-Calcite rule sets. It might be fairly easy
> > to add one that implements the join-flip case. Running this experiment
> on a
> > test workload would identify if the rule is ever needed, and if it is
> > triggered, if the result improves performance.
> >
> >
> > Thanks,
> > - Paul
> >
> > [1] http://info.snowflake.net/rs/252-RFO-227/images/Snowflake_SIGMOD.pdf
> >
> >
> >     On Monday, May 27, 2019, 2:04:29 PM PDT, Aman Sinha <
> > [email protected]> wrote:
> >
> >  Hi Weijie,
> > As you might imagine Busy joins have pros and cons compared to Left-deep
> > only plans:  The main pro is that they enumerate a lot more plan choices
> > such that the planner is likely to find the optimal join order.  On the
> > other hand, there are significant cons: (a) by enumerating more join
> > orders, they would substantially increase planning time (depending on the
> > number of tables).  (b) the size of the intermediate results produced by
> > the join must be accurately estimated in order to avoid situations where
> > hash join build side turns out to be orders of magnitude more than
> > estimated.  This could happen easily in big data systems where statistics
> > are constantly changing due to new data ingestion and even running
> ANALYZE
> > continuously is not feasible.
> > That said, it is not a bad idea to experiment with such plans with say
> more
> > than 5 table joins and compare with left-deep plans.
> >
> > Aman
> >
> > On Mon, May 27, 2019 at 7:00 AM weijie tong <[email protected]>
> > wrote:
> >
> > > Hi all:
> > >  Does anyone know why we don't support bushy join in the query plan
> > > generation while hep planner is enabled. The codebase shows the fact
> that
> > > the PlannerPhase.JOIN_PLANNING use the LoptOptimizeJoinRule not
> calcite's
> > > MultiJoinOptimizeBushyRule.
> > >
> >
>

Reply via email to