jiangjiangtian opened a new issue, #12413:
URL: https://github.com/apache/gluten/issues/12413
### Backend
VL (Velox)
### Bug description
Suppose we want to execute the following SQL (the query is only illustrative
and has no business meaning):
```SQL
CREATE OR REPLACE TEMP VIEW employees AS
SELECT * FROM VALUES
  (1, 'Alice', 'Sales,Computer', 1000),
  (2, 'Bob', 'Marketing,Sales', 2000),
  (3, 'Charlie', 'Marketing,Trading', 3000)
AS employees(emp_id, emp_name, dept_names, sale);

set spark.sql.adaptive.enabled=false;

EXPLAIN WITH base AS (
  SELECT * FROM employees
  LATERAL VIEW explode(split(dept_names, ',')) AS dept_name
)
SELECT * FROM
(
  SELECT emp_id, emp_name, COUNT(*)
  FROM (
    SELECT emp_id, emp_name, dept_names, sale
    FROM base
    GROUP BY 1, 2, 3, 4
  )
  GROUP BY 1, 2
) a
JOIN
(
  SELECT emp_id, SUM(sale)
  FROM (
    SELECT emp_id, emp_name, dept_names, sale
    FROM base
    GROUP BY 1, 2, 3, 4
  )
  GROUP BY 1
) b
ON a.emp_id = b.emp_id;
```
The plan is:
```
== Physical Plan ==
VeloxColumnarToRow
+- ^(6) BroadcastHashJoinExecTransformer [emp_id#78], [emp_id#86], Inner, BuildRight, false
   :- ^(6) HashAggregateTransformer(keys=[emp_id#78, emp_name#79], functions=[count(1)], isStreamingAgg=false)
   :  +- ^(6) InputIteratorTransformer[emp_id#78, emp_name#79, count#101L]
   :     +- ColumnarExchange hashpartitioning(emp_id#78, emp_name#79, 2000), ENSURE_REQUIREMENTS, [emp_id#78, emp_name#79, count#101L], [plan_id=1889], [shuffle_writer_type=hash], [OUTPUT] List(emp_id:IntegerType, emp_name:StringType, count:LongType)
   :        +- VeloxResizeBatches 1024, 2147483647, 10485760
   :           +- ^(2) ProjectExecTransformer [hash(emp_id#78, emp_name#79, 42) AS hash_partition_key#129, emp_id#78, emp_name#79, count#101L]
   :              +- ^(2) FlushableHashAggregateTransformer(keys=[emp_id#78, emp_name#79], functions=[partial_count(1)], isStreamingAgg=false)
   :                 +- ^(2) ProjectExecTransformer [emp_id#78, emp_name#79]
   :                    +- ^(2) HashAggregateTransformer(keys=[emp_id#78, emp_name#79, dept_names#80, sale#81], functions=[], isStreamingAgg=false)
   :                       +- ^(2) InputIteratorTransformer[emp_id#78, emp_name#79, dept_names#80, sale#81]
   :                          +- ColumnarExchange hashpartitioning(emp_id#78, emp_name#79, dept_names#80, sale#81, 2000), ENSURE_REQUIREMENTS, [emp_id#78, emp_name#79, dept_names#80, sale#81], [plan_id=1880], [shuffle_writer_type=hash], [OUTPUT] List(emp_id:IntegerType, emp_name:StringType, dept_names:StringType, sale:IntegerType)
   :                             +- VeloxResizeBatches 1024, 2147483647, 10485760
   :                                +- ^(1) ProjectExecTransformer [hash(emp_id#78, emp_name#79, dept_names#80, sale#81, 42) AS hash_partition_key#128, emp_id#78, emp_name#79, dept_names#80, sale#81]
   :                                   +- ^(1) FlushableHashAggregateTransformer(keys=[emp_id#78, emp_name#79, dept_names#80, sale#81], functions=[], isStreamingAgg=false)
   :                                      +- ^(1) ProjectExecTransformer [emp_id#78, emp_name#79, dept_names#80, sale#81]
   :                                         +- ^(1) GenerateExecTransformer explode(split(dept_names#80, ,, -1) AS _pre_0#104), [emp_id#78, emp_name#79, dept_names#80, sale#81], false, [dept_name#95]
   :                                            +- ^(1) ProjectExecTransformer [emp_id#78, emp_name#79, dept_names#80, sale#81, split(dept_names#80, ,, -1) AS _pre_0#104]
   :                                               +- ^(1) InputIteratorTransformer[emp_id#78, emp_name#79, dept_names#80, sale#81]
   :                                                  +- RowToVeloxColumnar
   :                                                     +- LocalTableScan [emp_id#78, emp_name#79, dept_names#80, sale#81]
   +- ^(6) InputIteratorTransformer[emp_id#86, sum(sale)#99L]
      +- ColumnarBroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=1916]
         +- ^(5) HashAggregateTransformer(keys=[emp_id#86], functions=[sum(sale#89)], isStreamingAgg=false)
            +- ^(5) InputIteratorTransformer[emp_id#86, sum#103L]
               +- ColumnarExchange hashpartitioning(emp_id#86, 2000), ENSURE_REQUIREMENTS, [emp_id#86, sum#103L], [plan_id=1911], [shuffle_writer_type=hash], [OUTPUT] List(emp_id:IntegerType, sum:LongType)
                  +- VeloxResizeBatches 1024, 2147483647, 10485760
                     +- ^(4) ProjectExecTransformer [hash(emp_id#86, 42) AS hash_partition_key#131, emp_id#86, sum#103L]
                        +- ^(4) FlushableHashAggregateTransformer(keys=[emp_id#86], functions=[partial_sum(sale#89)], isStreamingAgg=false)
                           +- ^(4) ProjectExecTransformer [emp_id#86, sale#89]
                              +- ^(4) HashAggregateTransformer(keys=[emp_id#86, emp_name#87, dept_names#88, sale#89], functions=[], isStreamingAgg=false)
                                 +- ^(4) InputIteratorTransformer[emp_id#86, emp_name#87, dept_names#88, sale#89]
                                    +- ColumnarExchange hashpartitioning(emp_id#86, emp_name#87, dept_names#88, sale#89, 2000), ENSURE_REQUIREMENTS, [emp_id#86, emp_name#87, dept_names#88, sale#89], [plan_id=1902], [shuffle_writer_type=hash], [OUTPUT] List(emp_id:IntegerType, emp_name:StringType, dept_names:StringType, sale:IntegerType)
                                       +- VeloxResizeBatches 1024, 2147483647, 10485760
                                          +- ^(3) ProjectExecTransformer [hash(emp_id#86, emp_name#87, dept_names#88, sale#89, 42) AS hash_partition_key#130, emp_id#86, emp_name#87, dept_names#88, sale#89]
                                             +- ^(3) FlushableHashAggregateTransformer(keys=[emp_id#86, emp_name#87, dept_names#88, sale#89], functions=[], isStreamingAgg=false)
                                                +- ^(3) ProjectExecTransformer [emp_id#86, emp_name#87, dept_names#88, sale#89]
                                                   +- ^(3) GenerateExecTransformer explode(split(dept_names#88, ,, -1) AS _pre_1#105), [emp_id#86, emp_name#87, dept_names#88, sale#89], false, [dept_name#96]
                                                      +- ^(3) ProjectExecTransformer [emp_id#86, emp_name#87, dept_names#88, sale#89, split(dept_names#88, ,, -1) AS _pre_1#105]
                                                         +- ^(3) InputIteratorTransformer[emp_id#86, emp_name#87, dept_names#88, sale#89]
                                                            +- RowToVeloxColumnar
                                                               +- LocalTableScan [emp_id#86, emp_name#87, dept_names#88, sale#89]
```
The plan is correct, but it misses the exchange-reuse optimization. The two
ColumnarExchange nodes (plan_id=1880 and plan_id=1902) are structurally
identical, yet neither is reused: no ReusedExchange appears in the plan.
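To make the expected behavior concrete, here is a minimal Python sketch of reuse keyed by canonical form. It is a toy model, not Spark or Gluten code: the `Exchange`, `ReusedExchange`, and `reuse_exchanges` names are hypothetical, and the `canonical` string merely stands in for a canonicalized sub-plan. It assumes the reuse rule keeps the first exchange seen for each canonical form and replaces later matches with a reference to it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Exchange:
    plan_id: int
    canonical: str  # hypothetical stand-in for the canonicalized sub-plan


@dataclass(frozen=True)
class ReusedExchange:
    original: Exchange


def reuse_exchanges(exchanges):
    """Replace every exchange whose canonical form was already seen."""
    seen = {}
    result = []
    for ex in exchanges:
        first = seen.setdefault(ex.canonical, ex)
        result.append(ex if first is ex else ReusedExchange(first))
    return result


# Identical canonical forms: the second exchange is replaced by a reference.
same = reuse_exchanges([
    Exchange(1880, "explode(split(#2)) AS #0"),
    Exchange(1902, "explode(split(#2)) AS #0"),
])

# Canonical forms that still differ by exprId: nothing is reused.
different = reuse_exchanges([
    Exchange(1880, "explode(split(#2) AS #104)"),
    Exchange(1902, "explode(split(#2) AS #105)"),
])
```

In this model the lookup is an exact match on the canonical form, so any leftover per-query identifier is enough to defeat reuse.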
The root cause is the Alias expression in the generator of
GenerateExecTransformer. In the plan above, the first branch has:
```
GenerateExecTransformer explode(split(dept_names#80, ,, -1) AS _pre_0#104),
...
```
while the second branch has:
```
GenerateExecTransformer explode(split(dept_names#88, ,, -1) AS _pre_1#105),
...
```
The Alias (AS _pre_0#104 / AS _pre_1#105) is nested inside the generator
expression. When canonicalizing a plan, Spark (`QueryPlan.doCanonicalize`)
assigns position-based ExprIds only to top-level Alias nodes in the plan's
expressions. Since the generator (e.g., Explode) is not an Alias, it falls
into the `case other` branch, which calls
[normalizeExpressions](https://github.com/apache/spark/blob/branch-3.5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala#L650-L668).
However, normalizeExpressions only replaces the exprId of AttributeReference;
it does not modify the exprId of a nested Alias. As a result, the Alias inside
the generator retains its original globally unique exprId (e.g., #104 vs #105).
Since Alias equality compares exprId, the two structurally identical sub-plans
produce different canonicalized forms, preventing ReuseExchange from reusing
the exchange.
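The mechanism can be reproduced with a small toy model in Python. This is a sketch under stated assumptions, not Spark's implementation: `Attr`, `Alias`, `Call`, `normalize_attrs`, `canonicalize`, and `strip_nested_alias` are hypothetical names that only mimic the behavior described above. Stripping the nested Alias is shown as one possible direction for a fix, not necessarily the one the project will adopt.

```python
from dataclasses import dataclass
from typing import Tuple, Union


@dataclass(frozen=True)
class Attr:
    expr_id: int


@dataclass(frozen=True)
class Alias:
    child: "Expr"
    expr_id: int


@dataclass(frozen=True)
class Call:
    name: str
    args: Tuple["Expr", ...]


Expr = Union[Attr, Alias, Call]


def normalize_attrs(e, inputs):
    """Rewrite only Attr ids to their ordinal in `inputs` (toy normalizeExpressions)."""
    if isinstance(e, Attr):
        return Attr(inputs.index(e.expr_id))
    if isinstance(e, Alias):
        # The nested Alias keeps its globally unique exprId.
        return Alias(normalize_attrs(e.child, inputs), e.expr_id)
    return Call(e.name, tuple(normalize_attrs(a, inputs) for a in e.args))


def canonicalize(exprs, inputs):
    """Assign positional ids to top-level Aliases only."""
    out, next_id = [], 0
    for e in exprs:
        if isinstance(e, Alias):
            out.append(Alias(normalize_attrs(e.child, inputs), next_id))
            next_id += 1
        else:
            out.append(normalize_attrs(e, inputs))
    return tuple(out)


def strip_nested_alias(e):
    """Hypothetical fix: drop Alias wrappers before canonicalizing."""
    if isinstance(e, Alias):
        return strip_nested_alias(e.child)
    if isinstance(e, Call):
        return Call(e.name, tuple(strip_nested_alias(a) for a in e.args))
    return e


def generator(attr_id, alias_id):
    # explode(split(dept_names) AS _pre_N)
    return Call("explode", (Alias(Call("split", (Attr(attr_id),)), alias_id),))


left_inputs, right_inputs = [78, 79, 80, 81], [86, 87, 88, 89]
left, right = generator(80, 104), generator(88, 105)

# Nested Alias: the canonical forms still differ (#104 vs #105).
nested_equal = canonicalize([left], left_inputs) == canonicalize([right], right_inputs)

# Top-level Alias: positional ids make the two sides equal.
top_equal = canonicalize([left.args[0]], left_inputs) == canonicalize([right.args[0]], right_inputs)

# With the nested Alias stripped, the generators canonicalize identically.
fixed_equal = (canonicalize([strip_nested_alias(left)], left_inputs)
               == canonicalize([strip_nested_alias(right)], right_inputs))
```

In this model `nested_equal` is false while `top_equal` and `fixed_equal` are true, which matches the analysis: only the exprId of the nested Alias separates the two branches.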
### Gluten version
_No response_
### Spark version
None
### Spark configurations
3.5
### System information
_No response_
### Relevant logs
```bash
```