This is an automated email from the ASF dual-hosted git repository.

jakevin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f3fb08f46 Relax check during aggregate partial mode. (#7101)
9f3fb08f46 is described below

commit 9f3fb08f46cf8560428d37e42cd8df95fc185000
Author: Mustafa Akur <[email protected]>
AuthorDate: Thu Jul 27 09:06:01 2023 +0300

    Relax check during aggregate partial mode. (#7101)
---
 .../src/physical_plan/aggregates/order/partial.rs  |  5 +-
 .../tests/sqllogictests/test_files/groupby.slt     | 54 +++++++++++++++++++++-
 2 files changed, 56 insertions(+), 3 deletions(-)

diff --git a/datafusion/core/src/physical_plan/aggregates/order/partial.rs 
b/datafusion/core/src/physical_plan/aggregates/order/partial.rs
index ac32c69fd5..019e61ef26 100644
--- a/datafusion/core/src/physical_plan/aggregates/order/partial.rs
+++ b/datafusion/core/src/physical_plan/aggregates/order/partial.rs
@@ -108,9 +108,10 @@ impl GroupOrderingPartial {
         ordering: &[PhysicalSortExpr],
     ) -> Result<Self> {
         assert!(!order_indices.is_empty());
-        assert_eq!(order_indices.len(), ordering.len());
+        assert!(order_indices.len() <= ordering.len());
 
-        let fields = ordering
+        // Take only the leading section of the ordering, which consists of the group-by expressions.
+        let fields = ordering[0..order_indices.len()]
             .iter()
             .map(|sort_expr| {
                 Ok(SortField::new_with_options(
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt 
b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index d65433c4fb..de57956f0e 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -1922,7 +1922,59 @@ SELECT DISTINCT + col1 FROM tab2 AS cor0 GROUP BY 
cor0.col1
 59
 61
 
-
+# The query below should run successfully with multiple partitions.
+query II
+SELECT l.col0, LAST_VALUE(r.col1 ORDER BY r.col0) as last_col1
+FROM tab0 as l
+JOIN tab0 as r
+ON l.col0 = r.col0
+GROUP BY l.col0, l.col1, l.col2
+ORDER BY l.col0;
+----
+26 0
+43 81
+83 0
+
+# Assert that the above query indeed runs with multiple partitions:
+# physical plan for this query should contain RepartitionExecs.
+# Aggregation should be in two stages, Partial + FinalPartitioned stages.
+query TT
+EXPLAIN SELECT l.col0, LAST_VALUE(r.col1 ORDER BY r.col0) as last_col1
+FROM tab0 as l
+JOIN tab0 as r
+ON l.col0 = r.col0
+GROUP BY l.col0, l.col1, l.col2
+ORDER BY l.col0;
+----
+logical_plan
+Sort: l.col0 ASC NULLS LAST
+--Projection: l.col0, LAST_VALUE(r.col1) ORDER BY [r.col0 ASC NULLS LAST] AS 
last_col1
+----Aggregate: groupBy=[[l.col0, l.col1, l.col2]], aggr=[[LAST_VALUE(r.col1) 
ORDER BY [r.col0 ASC NULLS LAST]]]
+------Inner Join: l.col0 = r.col0
+--------SubqueryAlias: l
+----------TableScan: tab0 projection=[col0, col1, col2]
+--------SubqueryAlias: r
+----------TableScan: tab0 projection=[col0, col1]
+physical_plan
+SortPreservingMergeExec: [col0@0 ASC NULLS LAST]
+--SortExec: expr=[col0@0 ASC NULLS LAST]
+----ProjectionExec: expr=[col0@0 as col0, LAST_VALUE(r.col1) ORDER BY [r.col0 
ASC NULLS LAST]@3 as last_col1]
+------AggregateExec: mode=FinalPartitioned, gby=[col0@0 as col0, col1@1 as 
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)]
+--------SortExec: expr=[col0@3 ASC NULLS LAST]
+----------CoalesceBatchesExec: target_batch_size=8192
+------------RepartitionExec: partitioning=Hash([col0@0, col1@1, col2@2], 4), 
input_partitions=4
+--------------AggregateExec: mode=Partial, gby=[col0@0 as col0, col1@1 as 
col1, col2@2 as col2], aggr=[LAST_VALUE(r.col1)], ordering_mode=PartiallyOrdered
+----------------SortExec: expr=[col0@3 ASC NULLS LAST]
+------------------CoalesceBatchesExec: target_batch_size=8192
+--------------------HashJoinExec: mode=Partitioned, join_type=Inner, 
on=[(col0@0, col0@0)]
+----------------------CoalesceBatchesExec: target_batch_size=8192
+------------------------RepartitionExec: partitioning=Hash([col0@0], 4), 
input_partitions=4
+--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+----------------------------MemoryExec: partitions=1, partition_sizes=[3]
+----------------------CoalesceBatchesExec: target_batch_size=8192
+------------------------RepartitionExec: partitioning=Hash([col0@0], 4), 
input_partitions=4
+--------------------------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1
+----------------------------MemoryExec: partitions=1, partition_sizes=[3]
 
 # Columns in the table are a,b,c,d. Source is CsvExec which is ordered by
 # a,b,c column. Column a has cardinality 2, column b has cardinality 4.

Reply via email to