mustafasrepo commented on code in PR #7129: URL: https://github.com/apache/arrow-datafusion/pull/7129#discussion_r1279277814
########## datafusion/core/tests/sqllogictests/test_files/groupby.slt: ##########

```diff
@@ -2689,13 +2703,12 @@ SortPreservingMergeExec: [country@0 ASC NULLS LAST]
 --SortExec: expr=[country@0 ASC NULLS LAST]
 ----ProjectionExec: expr=[country@0 as country, FIRST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as fv1, LAST_VALUE(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@2 as fv2]
 ------AggregateExec: mode=FinalPartitioned, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
---------SortExec: expr=[ts@1 ASC NULLS LAST]
-----------CoalesceBatchesExec: target_batch_size=8192
-------------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
---------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)], ordering_mode=None
-----------------SortExec: expr=[ts@1 ASC NULLS LAST]
-------------------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
---------------------MemoryExec: partitions=1, partition_sizes=[1]
+--------CoalesceBatchesExec: target_batch_size=8192
+----------RepartitionExec: partitioning=Hash([country@0], 8), input_partitions=8
+------------AggregateExec: mode=Partial, gby=[country@0 as country], aggr=[FIRST_VALUE(sales_global.amount), LAST_VALUE(sales_global.amount)]
```

Review Comment:

Consider the table below:

| amount | ts |
| ------ | -- |
| 12 | 1 |
| 11 | 2 |
| 13 | 3 |
| 12 | 4 |
| 11 | 5 |
| 13 | 6 |
| 12 | 7 |
| 11 | 8 |
| 13 | 9 |

Also assume we have 3 partitions, receiving the following data:

Partition 1:

| amount | ts |
| ------ | -- |
| 11 | 2 |
| 11 | 5 |
| 11 | 8 |

Partition 2:

| amount | ts |
| ------ | -- |
| 12 | 1 |
| 12 | 4 |
| 12 | 7 |

Partition 3:

| amount | ts |
| ------ | -- |
| 13 | 3 |
| 13 | 6 |
| 13 | 9 |

AggregatePartial would produce the following values, one per partition: (11, 2); (12, 1); (13, 3). The first value in each pair is the `first_value` for that partition.
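A minimal Rust sketch of the partial phase described above, assuming a single group and plain `i64` columns instead of Arrow arrays. The `Row` type and the `partial_first_value` / `partial_states` helpers are hypothetical illustrations, not DataFusion's API. Each partition emits a `(first_value, ts)` pair as its partial state; for simplicity the sketch looks for the smallest `ts` directly instead of relying on pre-sorted input.

```rust
/// One input row: the aggregated value and its ordering key.
/// Hypothetical simplification: a single group, `i64` columns.
#[derive(Clone, Copy, Debug)]
pub struct Row {
    pub amount: i64,
    pub ts: i64,
}

/// Partial FIRST_VALUE(amount ORDER BY ts ASC) for one partition.
/// Returns the partial state `(first_value, ts_of_first_value)`,
/// or `None` for an empty partition.
pub fn partial_first_value(partition: &[Row]) -> Option<(i64, i64)> {
    partition
        .iter()
        // Smallest `ts` wins; on ties, the earliest row is kept.
        .min_by_key(|r| r.ts)
        // Keep the ordering key alongside the value: the final phase needs it.
        .map(|r| (r.amount, r.ts))
}

/// Run the partial phase over every partition, producing one state per
/// non-empty partition.
pub fn partial_states(partitions: &[Vec<Row>]) -> Vec<(i64, i64)> {
    partitions
        .iter()
        .filter_map(|p| partial_first_value(p))
        .collect()
}
```

With the three partitions from the example, `partial_states` yields `[(11, 2), (12, 1), (13, 3)]`, i.e. the pairs listed above.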
The second value is its corresponding `ts` value. In this case, AggregateFinal would receive the following batch:

| amount | `ts` of amount (partial result) |
| ------ | ------------------------------- |
| 11 | 2 |
| 12 | 1 |
| 13 | 3 |

During the `merge_batch` method of `first_value`, the first value is calculated by considering the `ts` values corresponding to `amount` for each partition. In our case, since the requirement is `ts ASC`, the first value should come from the row that has the smallest `ts` (in our case 1). Hence the result will be 12.

Please note that `ts` at the final input and `ts` at the partial input do not correspond to the same column. `ts` at the final aggregation input comes from the `state` of the partial aggregation result. In short, we delegate the responsibility of sorting to the `merge_batch` algorithm, because the column on which sorting would be done is no longer valid for the final aggregation.

> Maybe the right solution would be to do a single phase grouping when any of the aggregates have an ORDER BY clause

This would certainly work. However, I wanted to use the existing parallelization as much as possible. Hence, I wanted to make the aggregators work in Partial and Final modes.
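To illustrate the final merge, here is a hypothetical Rust sketch (not DataFusion's actual `merge_batch`, which operates on Arrow arrays and accumulator state): the final step picks the partial state with the smallest `ts` carried inside that state. `FirstValueState`, `merge_first_value` and `single_phase_first_value` are illustrative names; the single-phase reference over all nine rows is included only to check that both approaches agree.

```rust
/// Partial state for FIRST_VALUE(amount ORDER BY ts ASC): the partition's
/// first value plus the `ts` it came from.
/// Hypothetical simplification: a single group, `i64` columns.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct FirstValueState {
    pub value: i64,
    pub ts: i64,
}

/// Final-phase merge: keep the state whose carried `ts` is smallest.
/// The ordering key comes from the state itself, not from any column of the
/// original input, so no sort on the original `ts` column is needed here.
pub fn merge_first_value(states: &[FirstValueState]) -> Option<i64> {
    states.iter().min_by_key(|s| s.ts).map(|s| s.value)
}

/// Single-phase reference: FIRST_VALUE over all `(amount, ts)` rows at once.
pub fn single_phase_first_value(rows: &[(i64, i64)]) -> Option<i64> {
    rows.iter()
        .min_by_key(|&&(_, ts)| ts)
        .map(|&(amount, _)| amount)
}
```

For the states (11, 2), (12, 1), (13, 3), `merge_first_value` returns `Some(12)`, matching `single_phase_first_value` over the full table. The two-phase version keeps the parallel partial aggregation; the cost is that the state must carry the ordering key.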
