alamb commented on code in PR #7129:
URL: https://github.com/apache/arrow-datafusion/pull/7129#discussion_r1279180964


##########
datafusion/core/tests/sqllogictests/test_files/groupby.slt:
##########
@@ -2689,13 +2703,12 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 --SortExec: expr=[country@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[country@0 as country, 
FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as 
fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS 
LAST]@2 as fv2]
 ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
---------SortExec: expr=[ts@1 ASC NULLS LAST]
-----------CoalesceBatchesExec: target_batch_size=8192
-------------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
---------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)], 
ordering_mode=None
-----------------SortExec: expr=[ts@1 ASC NULLS LAST]
-------------------RepartitionExec: partitioning=RoundRobinBatch(8), 
input_partitions=1
---------------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([country@0], 8), 
input_partitions=8
+------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]

Review Comment:
   I am not sure this new plan is correct either, as the data is not necessarily 
sorted for the final grouping. The old plan also seems wrong.
   
   My reasoning is that for an aggregate like this:
   ```
   FIRST_VALUE(amount ORDER BY ts ASC)
   ```
   
   the input to the group operator has to be sorted on `ts`.
   
   However, I don't see how the ordering by `ts` is preserved after the first 
grouping phase (`AggregateExec: mode=Partial`):
   
   ```
   ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
    ** I think the order by `ts` is not preserved here, so the data is not 
ordered by ts for the final grouping **
   ------------AggregateExec: mode=Partial, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
   
   ```
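
To make the concern concrete, here is a toy model in Python (not DataFusion code). It assumes, purely for illustration, that each partial state carries only the chosen `amount` and not the `ts` it came from; the rows and the helper names `partial_first_value` and `final_first_value` are hypothetical.

```python
# Hypothetical rows (country, ts, amount), standing in for sales_global.
rows = [
    ("FRA", 1, 50.0),
    ("FRA", 2, 200.0),
    ("TUR", 1, 75.0),
    ("TUR", 3, 100.0),
    ("FRA", 0, 30.0),
]


def partial_first_value(partition):
    """Partial phase: per group, keep the amount with the smallest ts.

    Assumption for this sketch: the ts itself is dropped from the state.
    """
    best = {}
    for country, ts, amount in partition:
        if country not in best or ts < best[country][0]:
            best[country] = (ts, amount)
    return {country: amount for country, (_, amount) in best.items()}


def final_first_value(partial_states):
    """Final phase: with no ts available, keep whichever state arrives first."""
    out = {}
    for state in partial_states:
        for country, amount in state.items():
            out.setdefault(country, amount)
    return out


# Round-robin the rows over two partitions, like RoundRobinBatch.
partitions = [rows[0::2], rows[1::2]]
states = [partial_first_value(p) for p in partitions]

# The answer depends on the order in which partial states reach the final phase.
result_a = final_first_value(states)
result_b = final_first_value(list(reversed(states)))
print(result_a, result_b)
```

Under these assumptions the two arrival orders disagree on the first value for `FRA`, which is the kind of nondeterminism I am worried about.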
   
   Maybe the right solution would be to do single-phase grouping whenever any of 
the aggregates has an `ORDER BY` clause:
   
   ```
   AggregateExec: mode=Final, gby=[country@0 as country], 
aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)] ** no 
partial group by **
     SortExec: expr=[ts@1 ASC NULLS LAST]
       RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
         MemoryExec: partitions=1, partition_sizes=[1]
   ```
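
As a rough sketch of what the single-phase shape computes (again a toy Python model, not DataFusion code; the rows and the name `single_phase_first_last` are hypothetical): sort once on `ts`, then group in a single pass, so the first and last row seen per group are the first and last by `ts`.

```python
# Hypothetical rows (country, ts, amount), standing in for sales_global.
rows = [
    ("FRA", 1, 50.0),
    ("FRA", 2, 200.0),
    ("TUR", 1, 75.0),
    ("TUR", 3, 100.0),
    ("FRA", 0, 30.0),
]


def single_phase_first_last(rows):
    """Sort on ts (the SortExec step), then aggregate in one pass.

    Returns {country: (first_amount, last_amount)} in ts order.
    """
    ordered = sorted(rows, key=lambda row: row[1])
    first, last = {}, {}
    for country, _ts, amount in ordered:
        first.setdefault(country, amount)  # only the earliest row sets this
        last[country] = amount             # every later row overwrites this
    return {country: (first[country], last[country]) for country in first}


result = single_phase_first_last(rows)
print(result)
```

The trade-off, as far as I can tell, is that this gives up the data reduction a partial aggregation would otherwise provide before the final grouping.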
   
   🤔 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.