NEUpanning opened a new issue, #8184:
URL: https://github.com/apache/incubator-gluten/issues/8184

   ### Backend
   
   VL (Velox)
   
   ### Bug description
   
   ### Describe the issue
   **Reproducing SQL:**
   
   ```sql
   CREATE OR REPLACE TEMP VIEW temp_table AS
   SELECT * FROM VALUES
     (1, 'a'), (1, 'b'), (1, 'c'),
     (2, 'd'), (2, 'e'), (2, 'f'),
     (3, 'g'), (3, 'h'), (3, 'i')
   AS t(id, value);
   
   SELECT id, collect_list(value) AS values_list
   FROM (
     SELECT * FROM
     (SELECT id, value
      FROM temp_table
      DISTRIBUTE BY rand())  -- Forces a shuffle
     DISTRIBUTE BY id SORT BY id, value
   ) t
   GROUP BY id;
   ```
   
   **Results:**
   
   - The **vanilla** result is deterministic and values_list is sorted:
     ```
     id   values_list
     1    ["a", "b", "c"]
     2    ["d", "e", "f"]
     3    ["g", "h", "i"]
     ```
   
   - The **gluten** result is non-deterministic and values_list is not sorted, 
e.g. :
     ```
     id   values_list
     1    ["a", "c", "b"]
     3    ["g", "i", "h"]
     2    ["f", "e", "d"]
     ```
    The gluten physical plan:
   ```
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      VeloxColumnarToRowExec
      +- ^(9) HashAggregateTransformer(keys=[id#0], 
functions=[velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, 
values_list#22])
         +- ^(9) HashAggregateTransformer(keys=[id#0], 
functions=[partial_velox_collect_list(value#1)], isStreamingAgg=false, 
output=[id#0, buffer#29])
            +- ^(9) InputIteratorTransformer[id#0, value#1]
               +- ShuffleQueryStage 1
                  +- ColumnarExchange hashpartitioning(id#0, 20), 
REPARTITION_WITH_NUM, [id#0, value#1], [id=#1293], [id=#1293], [OUTPUT] 
List(id:IntegerType, value:StringType), [OUTPUT] List(id:IntegerType, 
value:StringType)
                     +- VeloxAppendBatches 3276
                        +- ^(8) ProjectExecTransformer [hash(id#0, 42) AS 
hash_partition_key#31, id#0, value#1]
                           +- ^(8) InputIteratorTransformer[id#0, value#1, 
_nondeterministic#24]
                              +- ShuffleQueryStage 0
                                 +- ColumnarExchange 
hashpartitioning(_nondeterministic#24, 20), REPARTITION_WITH_NUM, [id#0, 
value#1, _nondeterministic#24], [id=#1239], [id=#1239], [OUTPUT] 
List(id:IntegerType, value:StringType, _nondeterministic:DoubleType), [OUTPUT] 
List(id:IntegerType, value:StringType, _nondeterministic:DoubleType)
                                    +- VeloxAppendBatches 3276
                                       +- ^(7) ProjectExecTransformer 
[hash(_nondeterministic#24, 42) AS hash_partition_key#30, id#0, value#1, 
_nondeterministic#24]
                                          +- ^(7) 
InputIteratorTransformer[id#0, value#1, _nondeterministic#24]
                                             +- RowToVeloxColumnar
                                                +- LocalTableScan [id#0, 
value#1, _nondeterministic#24]
   +- == Initial Plan ==
      SortAggregate(key=[id#0], functions=[velox_collect_list(value#1)], 
output=[id#0, values_list#22])
      +- SortAggregate(key=[id#0], 
functions=[partial_velox_collect_list(value#1)], output=[id#0, buffer#29])
         +- Sort [id#0 ASC NULLS FIRST, value#1 ASC NULLS FIRST], false, 0
            +- Exchange hashpartitioning(id#0, 20), REPARTITION_WITH_NUM, 
[id=#1211]
               +- Project [id#0, value#1]
                  +- Exchange hashpartitioning(_nondeterministic#24, 20), 
REPARTITION_WITH_NUM, [id=#1209]
                     +- LocalTableScan [id#0, value#1, _nondeterministic#24]
   ```
   
   Even though the collect_list function is non-deterministic, as stated in the 
documentation, some ETL tasks in our production environment depend on this 
behavior in vanilla Spark.
   
   ### Root cause for this issue
   
   We can see the Sort operator is removed through the gluten plan. This change 
appears to be due to this code snippet: [code 
link](https://github.com/apache/incubator-gluten/blob/branch-1.2/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala#L438).
   ```scala
   class ReplaceSingleNode() extends LogLevelUtil with Logging {
   
       def doReplace(p: SparkPlan): SparkPlan = {
   // ....
       case plan: SortAggregateExec =>
         logDebug(s"Columnar Processing for ${plan.getClass} is currently 
supported.")
         HashAggregateExecBaseTransformer.from(plan)(SortUtils.dropPartialSort)
   // ...
   }
   object SortUtils {
     def dropPartialSort(plan: SparkPlan): SparkPlan = plan match {
       case RewrittenNodeWall(p) => RewrittenNodeWall(dropPartialSort(p))
       case PartialSortLike(child) => child
       // from pre/post project-pulling
       case ProjectLike(PartialSortLike(ProjectLike(child))) if plan.outputSet 
== child.outputSet =>
         child
       case ProjectLike(PartialSortLike(child)) => 
plan.withNewChildren(Seq(child))
       case _ => plan
     }
   }          
             
   ```
    I'm  wondering why the partial sort needs to be removed for 
SortAggregateExec. Would it be possible to retain the partial sort operator?
   
   ### Spark version
   
   None
   
   ### Spark configurations
   
   _No response_
   
   ### System information
   
   _No response_
   
   ### Relevant logs
   
   _No response_


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to