NEUpanning commented on issue #8184:
URL: 
https://github.com/apache/incubator-gluten/issues/8184#issuecomment-2534703555

   > I think we could preserve the sort as long as vanilla Spark plan has it with a hash agg.
   
   @zhztheplayer I see this feature is implemented in the Gluten 1.2 branch, but
   the main branch doesn't include it for some reason. In this issue, the
   CollectList function is replaced by the VeloxCollectList function during the
   logical optimization phase. Here is the Spark plan:
   ```
   == Parsed Logical Plan ==
   Aggregate [id#0], [id#0, collect_list(value#1, 0, 0) AS values_list#13]
   +- SubqueryAlias t
      +- Project [id#0, value#1]
         +- RepartitionByExpression [_nondeterministic#15], 20
            +- Project [id#0, value#1, rand(5386921442550703776) AS _nondeterministic#15]
               +- Project [id#0, value#1]
                  +- SubqueryAlias temp_table
                     +- Project [id#0, value#1]
                        +- SubqueryAlias t
                           +- LocalRelation [id#0, value#1]
   
   == Analyzed Logical Plan ==
   id: int, values_list: array<string>
   Aggregate [id#0], [id#0, collect_list(value#1, 0, 0) AS values_list#13]
   +- SubqueryAlias t
      +- Project [id#0, value#1]
         +- RepartitionByExpression [_nondeterministic#15], 20
            +- Project [id#0, value#1, rand(5386921442550703776) AS _nondeterministic#15]
               +- Project [id#0, value#1]
                  +- SubqueryAlias temp_table
                     +- Project [id#0, value#1]
                        +- SubqueryAlias t
                           +- LocalRelation [id#0, value#1]
   
   == Optimized Logical Plan ==
   Aggregate [id#0], [id#0, velox_collect_list(value#1) AS values_list#13]
   +- Project [id#0, value#1]
      +- RepartitionByExpression [_nondeterministic#15], 20
         +- LocalRelation [id#0, value#1, _nondeterministic#15]
   
   == Physical Plan ==
   AdaptiveSparkPlan isFinalPlan=true
   +- == Final Plan ==
      VeloxColumnarToRowExec
      +- ^(3) HashAggregateTransformer(keys=[id#0], functions=[velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, values_list#13])
         +- ^(3) InputIteratorTransformer[id#0, buffer#20]
            +- CustomShuffleReader coalesced
               +- ShuffleQueryStage 1
                  +- ColumnarExchange hashpartitioning(id#0, 20), ENSURE_REQUIREMENTS, [id#0, buffer#20], [id=#1048], [id=#1048], [OUTPUT] List(id:IntegerType, buffer:ArrayType(StringType,false)), [OUTPUT] List(id:IntegerType, buffer:ArrayType(StringType,false))
                     +- VeloxAppendBatches 3276
                        +- ^(2) ProjectExecTransformer [hash(id#0, 42) AS hash_partition_key#22, id#0, buffer#20]
                           +- ^(2) FlushableHashAggregateTransformer(keys=[id#0], functions=[partial_velox_collect_list(value#1)], isStreamingAgg=false, output=[id#0, buffer#20])
                              +- ^(2) ProjectExecTransformer [id#0, value#1]
                                 +- ^(2) InputIteratorTransformer[id#0, value#1, _nondeterministic#15]
                                    +- ShuffleQueryStage 0
                                       +- ColumnarExchange hashpartitioning(_nondeterministic#15, 20), REPARTITION_WITH_NUM, [id#0, value#1, _nondeterministic#15], [id=#966], [id=#966], [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType), [OUTPUT] List(id:IntegerType, value:StringType, _nondeterministic:DoubleType)
                                          +- VeloxAppendBatches 3276
                                             +- ^(1) ProjectExecTransformer [hash(_nondeterministic#15, 42) AS hash_partition_key#21, id#0, value#1, _nondeterministic#15]
                                                +- ^(1) InputIteratorTransformer[id#0, value#1, _nondeterministic#15]
                                                   +- RowToVeloxColumnar
                                                      +- LocalTableScan [id#0, value#1, _nondeterministic#15]
   +- == Initial Plan ==
      SortAggregate(key=[id#0], functions=[velox_collect_list(value#1)], output=[id#0, values_list#13])
      +- Sort [id#0 ASC NULLS FIRST], false, 0
         +- Exchange hashpartitioning(id#0, 20), ENSURE_REQUIREMENTS, [id=#936]
            +- SortAggregate(key=[id#0], functions=[partial_velox_collect_list(value#1)], output=[id#0, buffer#20])
               +- Sort [id#0 ASC NULLS FIRST], false, 0
                  +- Project [id#0, value#1]
                     +- Exchange hashpartitioning(_nondeterministic#15, 20), REPARTITION_WITH_NUM, [id=#928]
                        +- LocalTableScan [id#0, value#1, _nondeterministic#15]
   ```
   As the Initial Plan shows, this replacement leads Spark to choose
   SortAggregateExec instead of ObjectHashAggregateExec as the aggregate
   operator. So I think the sort should also be preserved when the aggregate
   expressions include VeloxCollectList or VeloxCollectSet.
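
To make the operator choice concrete, here is a rough Python model of how Spark's planner picks among its three aggregate operators as I understand it: hash aggregate when every aggregation buffer field is a mutable fixed-width type, object hash aggregate when a function is a `TypedImperativeAggregate` and object hash aggregation is enabled, and sort aggregate otherwise. This is a simplified sketch, not Spark or Gluten code. The names `AggFunction`, `MUTABLE_BUFFER_TYPES` and `choose_aggregate_operator` are made up for illustration, the type list is abbreviated, and the description of `velox_collect_list` (an `ArrayType` buffer, not a `TypedImperativeAggregate`) is an assumption inferred from the plan above rather than checked against the source.

```python
from dataclasses import dataclass
from typing import Sequence, Tuple

# Abbreviated, illustrative list of buffer types that can be updated in place.
MUTABLE_BUFFER_TYPES = {
    "boolean", "byte", "short", "int", "long",
    "float", "double", "date", "timestamp", "decimal",
}


@dataclass(frozen=True)
class AggFunction:
    """Hypothetical description of an aggregate function for this sketch."""
    name: str
    buffer_types: Tuple[str, ...]  # types of the aggregation buffer fields
    typed_imperative: bool         # True if it is a TypedImperativeAggregate


def choose_aggregate_operator(functions: Sequence[AggFunction],
                              object_hash_enabled: bool = True) -> str:
    """Simplified model of the planner's aggregate operator selection."""
    all_mutable = all(
        t in MUTABLE_BUFFER_TYPES for f in functions for t in f.buffer_types
    )
    if all_mutable:
        return "HashAggregateExec"
    if object_hash_enabled and any(f.typed_imperative for f in functions):
        return "ObjectHashAggregateExec"
    return "SortAggregateExec"


# Vanilla collect_list: typed imperative, buffer serialized as binary.
collect_list = AggFunction("collect_list", ("binary",), True)
# Assumption from the plan above: array buffer, not typed imperative.
velox_collect_list = AggFunction("velox_collect_list", ("array<string>",), False)

print(choose_aggregate_operator([collect_list]))        # ObjectHashAggregateExec
print(choose_aggregate_operator([velox_collect_list]))  # SortAggregateExec
```

Under these assumptions the vanilla function lands on ObjectHashAggregateExec while the replaced one falls through to SortAggregateExec, which matches the Initial Plan above and is why a check that only looks for a hash aggregate in the vanilla plan would miss this case.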
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
