alamb commented on code in PR #5770: URL: https://github.com/apache/arrow-datafusion/pull/5770#discussion_r1152250301
##########
benchmarks/expected-plans/q17.txt:
##########
@@ -1,55 +1,49 @@
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan
|
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) /
Float64(7) AS avg_yearly
|
-| | Aggregate: groupBy=[[]],
aggr=[[SUM(lineitem.l_extendedprice)]]
|
-| | Projection: lineitem.l_extendedprice
|
-| | Filter: CAST(lineitem.l_quantity AS Decimal128(30,
15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) AND
__scalar_sq_1.l_partkey = lineitem.l_partkey |
-| | Projection: lineitem.l_partkey, lineitem.l_quantity,
lineitem.l_extendedprice, __scalar_sq_1.l_partkey, __scalar_sq_1.__value
|
-| | Inner Join: part.p_partkey =
__scalar_sq_1.l_partkey
|
-| | Filter: part.p_partkey = lineitem.l_partkey AND
lineitem.l_partkey = part.p_partkey
|
-| | Inner Join: lineitem.l_partkey =
part.p_partkey
|
-| | TableScan: lineitem projection=[l_partkey,
l_quantity, l_extendedprice]
|
-| | Projection: part.p_partkey
|
-| | Filter: part.p_brand = Utf8("Brand#23")
AND part.p_container = Utf8("MED BOX")
|
-| | TableScan: part projection=[p_partkey,
p_brand, p_container]
|
-| | SubqueryAlias: __scalar_sq_1
|
-| | Projection: lineitem.l_partkey, Float64(0.2) *
CAST(AVG(lineitem.l_quantity) AS Float64) AS __value
|
-| | Aggregate: groupBy=[[lineitem.l_partkey]],
aggr=[[AVG(lineitem.l_quantity)]]
|
-| | TableScan: lineitem projection=[l_partkey,
l_quantity]
|
-| physical_plan | ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0
AS Float64) / 7 as avg_yearly]
|
-| | AggregateExec: mode=Final, gby=[],
aggr=[SUM(lineitem.l_extendedprice)]
|
-| | CoalescePartitionsExec
|
-| | AggregateExec: mode=Partial, gby=[],
aggr=[SUM(lineitem.l_extendedprice)]
|
-| | ProjectionExec: expr=[l_extendedprice@2 as
l_extendedprice]
|
-| | CoalesceBatchesExec: target_batch_size=8192
|
-| | FilterExec: CAST(l_quantity@1 AS Decimal128(30,
15)) < CAST(__value@4 AS Decimal128(30, 15)) AND l_partkey@3 = l_partkey@0
|
-| | ProjectionExec: expr=[l_partkey@0 as
l_partkey, l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice,
l_partkey@4 as l_partkey, __value@5 as __value] |
-| | CoalesceBatchesExec: target_batch_size=8192
|
-| | HashJoinExec: mode=Partitioned,
join_type=Inner, on=[(Column { name: "p_partkey", index: 3 }, Column { name:
"l_partkey", index: 0 })] |
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | RepartitionExec:
partitioning=Hash([Column { name: "p_partkey", index: 3 }], 2),
input_partitions=2 |
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | FilterExec: p_partkey@3 =
l_partkey@0 AND l_partkey@0 = p_partkey@3
|
-| | RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=2
|
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | HashJoinExec:
mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 },
Column { name: "p_partkey", index: 0 })] |
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | RepartitionExec:
partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2),
input_partitions=0 |
-| | MemoryExec:
partitions=0, partition_sizes=[]
|
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | RepartitionExec:
partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2),
input_partitions=2 |
-| | RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=0
|
-| | ProjectionExec:
expr=[p_partkey@0 as p_partkey]
|
-| |
CoalesceBatchesExec: target_batch_size=8192
|
-| | FilterExec:
p_brand@1 = Brand#23 AND p_container@2 = MED BOX
|
-| | MemoryExec:
partitions=0, partition_sizes=[]
|
-| | ProjectionExec: expr=[l_partkey@0 as
l_partkey, 0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) as __value]
|
-| | AggregateExec: mode=FinalPartitioned,
gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]
|
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | RepartitionExec:
partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2),
input_partitions=2 |
-| | RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=0
|
-| | AggregateExec: mode=Partial,
gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]
|
-| | MemoryExec: partitions=0,
partition_sizes=[]
|
-| |
|
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan
|
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) /
Float64(7) AS avg_yearly
|
+| | Aggregate: groupBy=[[]],
aggr=[[SUM(lineitem.l_extendedprice)]]
|
+| | Projection: lineitem.l_extendedprice
|
+| | Inner Join: part.p_partkey = __scalar_sq_1.l_partkey
Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) <
CAST(__scalar_sq_1.__value AS Decimal128(30, 15))
|
Review Comment:
This is a better plan because it removes the redundant filter
```
Filter: part.p_partkey = lineitem.l_partkey AND lineitem.l_partkey = part.p_partkey
```
which the earlier joins already apply, right?
##########
datafusion/core/src/physical_plan/planner.rs:
##########
@@ -952,21 +952,21 @@ impl DefaultPhysicalPlanner {
let join_filter = match filter {
Some(expr) => {
- // Extract columns from filter expression
+ // Extract columns from filter expression and
saved in a HashSet
let cols = expr.to_columns()?;
- // Collect left & right field indices
+ // Collect left & right field indices, the field
indices are sorted in ascending order
Review Comment:
If the sort order matters for later stages, can you add a note on the
rationale, so the comment explains why the sorting is important in
addition to noting that the output is sorted?
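To illustrate the kind of rationale such a comment could record, here is a minimal sketch, not the planner's actual code. It assumes the motivation is that iterating a `HashSet` yields elements in an unspecified order, so sorting the collected indices gives a stable result; the PR does not confirm this. `ColumnRef` and `sorted_field_indices` are hypothetical names introduced only for this example.

```rust
use std::collections::HashSet;

/// Hypothetical stand-in for a column reference (not DataFusion's `Column`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ColumnRef {
    pub name: String,
    pub index: usize,
}

/// Collect the field indices of the referenced columns in ascending order.
///
/// `HashSet` iteration order is unspecified, so without the sort the
/// resulting index list could differ from run to run. Sorting makes the
/// output deterministic (assumed rationale, for illustration only).
pub fn sorted_field_indices(cols: &HashSet<ColumnRef>) -> Vec<usize> {
    let mut indices: Vec<usize> = cols.iter().map(|c| c.index).collect();
    indices.sort_unstable();
    indices
}
```

Calling `sorted_field_indices` on a set holding columns at indices 3, 0 and 2 returns the indices in ascending order regardless of how the set happens to iterate.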
##########
benchmarks/expected-plans/q17.txt:
##########
@@ -1,55 +1,49 @@
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| plan_type | plan
|
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
-| logical_plan | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) /
Float64(7) AS avg_yearly
|
-| | Aggregate: groupBy=[[]],
aggr=[[SUM(lineitem.l_extendedprice)]]
|
-| | Projection: lineitem.l_extendedprice
|
-| | Filter: CAST(lineitem.l_quantity AS Decimal128(30,
15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) AND
__scalar_sq_1.l_partkey = lineitem.l_partkey |
-| | Projection: lineitem.l_partkey, lineitem.l_quantity,
lineitem.l_extendedprice, __scalar_sq_1.l_partkey, __scalar_sq_1.__value
|
-| | Inner Join: part.p_partkey =
__scalar_sq_1.l_partkey
|
-| | Filter: part.p_partkey = lineitem.l_partkey AND
lineitem.l_partkey = part.p_partkey
|
-| | Inner Join: lineitem.l_partkey =
part.p_partkey
|
-| | TableScan: lineitem projection=[l_partkey,
l_quantity, l_extendedprice]
|
-| | Projection: part.p_partkey
|
-| | Filter: part.p_brand = Utf8("Brand#23")
AND part.p_container = Utf8("MED BOX")
|
-| | TableScan: part projection=[p_partkey,
p_brand, p_container]
|
-| | SubqueryAlias: __scalar_sq_1
|
-| | Projection: lineitem.l_partkey, Float64(0.2) *
CAST(AVG(lineitem.l_quantity) AS Float64) AS __value
|
-| | Aggregate: groupBy=[[lineitem.l_partkey]],
aggr=[[AVG(lineitem.l_quantity)]]
|
-| | TableScan: lineitem projection=[l_partkey,
l_quantity]
|
-| physical_plan | ProjectionExec: expr=[CAST(SUM(lineitem.l_extendedprice)@0
AS Float64) / 7 as avg_yearly]
|
-| | AggregateExec: mode=Final, gby=[],
aggr=[SUM(lineitem.l_extendedprice)]
|
-| | CoalescePartitionsExec
|
-| | AggregateExec: mode=Partial, gby=[],
aggr=[SUM(lineitem.l_extendedprice)]
|
-| | ProjectionExec: expr=[l_extendedprice@2 as
l_extendedprice]
|
-| | CoalesceBatchesExec: target_batch_size=8192
|
-| | FilterExec: CAST(l_quantity@1 AS Decimal128(30,
15)) < CAST(__value@4 AS Decimal128(30, 15)) AND l_partkey@3 = l_partkey@0
|
-| | ProjectionExec: expr=[l_partkey@0 as
l_partkey, l_quantity@1 as l_quantity, l_extendedprice@2 as l_extendedprice,
l_partkey@4 as l_partkey, __value@5 as __value] |
-| | CoalesceBatchesExec: target_batch_size=8192
|
-| | HashJoinExec: mode=Partitioned,
join_type=Inner, on=[(Column { name: "p_partkey", index: 3 }, Column { name:
"l_partkey", index: 0 })] |
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | RepartitionExec:
partitioning=Hash([Column { name: "p_partkey", index: 3 }], 2),
input_partitions=2 |
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | FilterExec: p_partkey@3 =
l_partkey@0 AND l_partkey@0 = p_partkey@3
|
-| | RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=2
|
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | HashJoinExec:
mode=Partitioned, join_type=Inner, on=[(Column { name: "l_partkey", index: 0 },
Column { name: "p_partkey", index: 0 })] |
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | RepartitionExec:
partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2),
input_partitions=0 |
-| | MemoryExec:
partitions=0, partition_sizes=[]
|
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | RepartitionExec:
partitioning=Hash([Column { name: "p_partkey", index: 0 }], 2),
input_partitions=2 |
-| | RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=0
|
-| | ProjectionExec:
expr=[p_partkey@0 as p_partkey]
|
-| |
CoalesceBatchesExec: target_batch_size=8192
|
-| | FilterExec:
p_brand@1 = Brand#23 AND p_container@2 = MED BOX
|
-| | MemoryExec:
partitions=0, partition_sizes=[]
|
-| | ProjectionExec: expr=[l_partkey@0 as
l_partkey, 0.2 * CAST(AVG(lineitem.l_quantity)@1 AS Float64) as __value]
|
-| | AggregateExec: mode=FinalPartitioned,
gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]
|
-| | CoalesceBatchesExec:
target_batch_size=8192
|
-| | RepartitionExec:
partitioning=Hash([Column { name: "l_partkey", index: 0 }], 2),
input_partitions=2 |
-| | RepartitionExec:
partitioning=RoundRobinBatch(2), input_partitions=0
|
-| | AggregateExec: mode=Partial,
gby=[l_partkey@0 as l_partkey], aggr=[AVG(lineitem.l_quantity)]
|
-| | MemoryExec: partitions=0,
partition_sizes=[]
|
-| |
|
-+---------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
\ No newline at end of file
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| plan_type | plan
|
++---------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+| logical_plan | Projection: CAST(SUM(lineitem.l_extendedprice) AS Float64) /
Float64(7) AS avg_yearly
|
+| | Aggregate: groupBy=[[]],
aggr=[[SUM(lineitem.l_extendedprice)]]
|
+| | Projection: lineitem.l_extendedprice
|
+| | Inner Join: part.p_partkey = __scalar_sq_1.l_partkey
Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) <
CAST(__scalar_sq_1.__value AS Decimal128(30, 15))
|
Review Comment:
It also pushes the filter
```
Filter: CAST(lineitem.l_quantity AS Decimal128(30, 15)) < CAST(__scalar_sq_1.__value AS Decimal128(30, 15)) AND __scalar_sq_1.l_partkey = lineitem.l_partkey
```
into the Join, which seems like a win to me (it avoids generating join output that would only be filtered out afterwards).
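The difference can be sketched with a toy nested-loop join. This is not DataFusion code. The row layout `(key, value)` and the function names `join_then_filter` and `join_with_filter` are assumptions made for this example. Both variants return the same rows, but the first materializes every key match before filtering, while the second evaluates the predicate inside the join and emits only the surviving rows.

```rust
/// Joined row: (key, left value, right value).
pub type Joined = (u32, f64, f64);

/// Join on key first, then filter the materialized output.
/// Returns (rows emitted by the join, rows kept after the filter).
pub fn join_then_filter(left: &[(u32, f64)], right: &[(u32, f64)]) -> (usize, Vec<Joined>) {
    let mut joined: Vec<Joined> = Vec::new();
    for &(lk, qty) in left {
        for &(rk, limit) in right {
            if lk == rk {
                joined.push((lk, qty, limit));
            }
        }
    }
    let emitted = joined.len();
    let kept = joined.into_iter().filter(|&(_, q, l)| q < l).collect();
    (emitted, kept)
}

/// Evaluate the filter as part of the join condition.
/// Returns (rows emitted by the join, rows kept); the two counts are equal.
pub fn join_with_filter(left: &[(u32, f64)], right: &[(u32, f64)]) -> (usize, Vec<Joined>) {
    let mut out: Vec<Joined> = Vec::new();
    for &(lk, qty) in left {
        for &(rk, limit) in right {
            // Rows failing the predicate are never materialized.
            if lk == rk && qty < limit {
                out.push((lk, qty, limit));
            }
        }
    }
    (out.len(), out)
}
```

With three left rows that all match on key but only one that passes `qty < limit`, the first variant emits three intermediate rows and the second emits one.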
