NEUpanning opened a new issue, #8184:
URL: https://github.com/apache/incubator-gluten/issues/8184
### Backend
VL (Velox)
### Bug description
### Describe the issue
**Reproducing SQL:**
```sql
CREATE OR REPLACE TEMP VIEW temp_table AS
SELECT * FROM VALUES
(1, 'a'), (1, 'b'), (1, 'c'),
(2, 'd'), (2, 'e'), (2, 'f'),
(3, 'g'), (3, 'h'), (3, 'i')
AS t(id, value);
SELECT id, collect_list(value) AS values_list
FROM (
SELECT * FROM
(SELECT id, value
FROM temp_table
DISTRIBUTE BY rand()) -- Forces a shuffle
DISTRIBUTE BY id SORT BY id, value
) t
GROUP BY id;
```
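For reference, a hedged sketch of the same reproduction in the DataFrame API (assuming a `SparkSession` named `spark` with `spark.implicits._` in scope; `repartition(rand())` plays the role of `DISTRIBUTE BY rand()`):

```scala
import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  (1, "a"), (1, "b"), (1, "c"),
  (2, "d"), (2, "e"), (2, "f"),
  (3, "g"), (3, "h"), (3, "i")
).toDF("id", "value")

df.repartition(rand())                 // DISTRIBUTE BY rand() -- forces a shuffle
  .repartition(col("id"))              // DISTRIBUTE BY id
  .sortWithinPartitions("id", "value") // SORT BY id, value
  .groupBy("id")
  .agg(collect_list("value").as("values_list"))
  .show()
```

On vanilla Spark the `sortWithinPartitions` guarantees each partition is sorted before aggregation, which is what makes the `collect_list` output ordered.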
**Results:**
- The **vanilla** result is deterministic and values_list is sorted:
```
id values_list
1 ["a", "b", "c"]
2 ["d", "e", "f"]
3 ["g", "h", "i"]
```
- The **gluten** result is non-deterministic and values_list is not sorted, e.g.:
```
id values_list
1 ["a", "c", "b"]
3 ["g", "i", "h"]
2 ["f", "e", "d"]
```
The gluten physical plan:
```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   VeloxColumnarToRowExec
   +- ^(9) HashAggregateTransformer(keys=[id#0], functions=[velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, values_list#22])
      +- ^(9) HashAggregateTransformer(keys=[id#0], functions=[partial_velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, buffer#29])
         +- ^(9) InputIteratorTransformer[id#0, value#1]
            +- ShuffleQueryStage 1
               +- ColumnarExchange hashpartitioning(id#0, 20), REPARTITION_WITH_NUM, [id#0, value#1], [id=#1293], [id=#1293], [OUTPUT] List(id:IntegerType, value:StringType), [OUTPUT] List(id:IntegerType, value:StringType)
                  +- VeloxAppendBatches 3276
                     +- ^(8) ProjectExecTransformer [hash(id#0, 42) AS hash_partition_key#31, id#0, value#1]
                        +- ^(8) InputIteratorTransformer[id#0, value#1, _nondeterministic#24]
                           +- ShuffleQueryStage 0
                              +- ColumnarExchange hashpartitioning(_nondeterministic#24, 20), REPARTITION_WITH_NUM, [id#0, value#1, _nondeterministic#24], [id=#1239], [id=#1239], [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType), [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType)
                                 +- VeloxAppendBatches 3276
                                    +- ^(7) ProjectExecTransformer [hash(_nondeterministic#24, 42) AS hash_partition_key#30, id#0, value#1, _nondeterministic#24]
                                       +- ^(7) InputIteratorTransformer[id#0, value#1, _nondeterministic#24]
                                          +- RowToVeloxColumnar
                                             +- LocalTableScan [id#0, value#1, _nondeterministic#24]
+- == Initial Plan ==
   SortAggregate(key=[id#0], functions=[velox_collect_list(value#1)], output=[id#0, values_list#22])
   +- SortAggregate(key=[id#0], functions=[partial_velox_collect_list(value#1)], output=[id#0, buffer#29])
      +- Sort [id#0 ASC NULLS FIRST, value#1 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#0, 20), REPARTITION_WITH_NUM, [id=#1211]
            +- Project [id#0, value#1]
               +- Exchange hashpartitioning(_nondeterministic#24, 20), REPARTITION_WITH_NUM, [id=#1209]
                  +- LocalTableScan [id#0, value#1, _nondeterministic#24]
```
Even though collect_list is documented as non-deterministic, some ETL tasks in our production environment depend on this ordering behavior of vanilla Spark.
### Root cause for this issue
From the gluten plan we can see that the Sort operator has been removed. This change appears to be due to this code snippet: [code link](https://github.com/apache/incubator-gluten/blob/branch-1.2/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala#L438).
```scala
class ReplaceSingleNode() extends LogLevelUtil with Logging {
  def doReplace(p: SparkPlan): SparkPlan = {
    // ...
    case plan: SortAggregateExec =>
      logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
      HashAggregateExecBaseTransformer.from(plan)(SortUtils.dropPartialSort)
    // ...
  }
}

object SortUtils {
  def dropPartialSort(plan: SparkPlan): SparkPlan = plan match {
    case RewrittenNodeWall(p) => RewrittenNodeWall(dropPartialSort(p))
    case PartialSortLike(child) => child
    // from pre/post project-pulling
    case ProjectLike(PartialSortLike(ProjectLike(child))) if plan.outputSet == child.outputSet =>
      child
    case ProjectLike(PartialSortLike(child)) =>
      plan.withNewChildren(Seq(child))
    case _ => plan
  }
}
```
I'm wondering why the partial sort needs to be removed for
SortAggregateExec. Would it be possible to retain the partial sort operator?
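For illustration only, a minimal sketch of one possible direction: guard `SortUtils.dropPartialSort` so that aggregates containing order-sensitive functions keep their sort. The `orderSensitive` check and the function-name set below are hypothetical, not existing Gluten code:

```scala
// Hypothetical sketch, not actual Gluten code: only drop the partial sort
// when no aggregate function is sensitive to input row order.
case plan: SortAggregateExec =>
  logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
  // collect_list/first/last produce results that depend on the order in
  // which rows arrive, so removing the Sort changes observable output.
  val orderSensitive = plan.aggregateExpressions.exists { ae =>
    Set("collect_list", "first", "last").contains(ae.aggregateFunction.prettyName)
  }
  val rewrite: SparkPlan => SparkPlan =
    if (orderSensitive) identity else SortUtils.dropPartialSort
  HashAggregateExecBaseTransformer.from(plan)(rewrite)
```

This would preserve vanilla Spark's observable ordering for order-sensitive aggregates while keeping the existing optimization for everything else.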
### Spark version
None
### Spark configurations
_No response_
### System information
_No response_
### Relevant logs
_No response_
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]