This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 55930fbffe fix: fix index bug and add test to check it (#7124)
55930fbffe is described below

commit 55930fbffe34611e42676c52a31f7e90ccd77ca5
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Jul 28 19:23:35 2023 +0300

    fix: fix index bug and add test to check it (#7124)
    
    * Fix bug, add test
    
    * Add new test
---
 .../tests/sqllogictests/test_files/groupby.slt     | 68 ++++++++++++++++++++++
 .../physical-expr/src/aggregate/first_last.rs      |  4 +-
 2 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index b2677679c8..5db2ad9280 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -3348,3 +3348,71 @@ ORDER BY l.sn
 2 75 3
 3 200 4
 4 100 5
+
+# create a table
+statement ok
+CREATE TABLE FOO (x int, y int) AS VALUES (1, 2), (2, 3), (1, 3);
+
+# make sure that query runs in multi partitions
+statement ok
+set datafusion.execution.target_partitions = 8;
+
+query I
+SELECT LAST_VALUE(x)
+FROM FOO;
+----
+1
+
+query II
+SELECT x, LAST_VALUE(x)
+FROM FOO
+GROUP BY x
+ORDER BY x;
+----
+1 1
+2 2
+
+query II
+SELECT y, LAST_VALUE(x)
+FROM FOO
+GROUP BY y
+ORDER BY y;
+----
+2 1
+3 1
+
+# plan of the query above should contain partial
+# and final aggregation stages
+query TT
+EXPLAIN SELECT LAST_VALUE(x)
+  FROM FOO;
+----
+logical_plan
+Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(foo.x)]]
+--TableScan: foo projection=[x]
+physical_plan
+AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(foo.x)]
+--CoalescePartitionsExec
+----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(foo.x)]
+------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
+
+query I
+SELECT FIRST_VALUE(x)
+FROM FOO;
+----
+1
+
+# similarly plan of the above query should
+# contain partial and final aggregation stages.
+query TT
+EXPLAIN SELECT FIRST_VALUE(x)
+  FROM FOO;
+----
+logical_plan
+Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(foo.x)]]
+--TableScan: foo projection=[x]
+physical_plan
+AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(foo.x)]
+--CoalescePartitionsExec
+----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(foo.x)]
+------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index f322419a7b..656f30a135 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -202,7 +202,7 @@ impl Accumulator for FirstValueAccumulator {
         let is_set_flags = &states[last_idx];
         let flags = is_set_flags.as_boolean();
         let mut filtered_first_vals = vec![];
-        for state in states.iter().take(last_idx - 1) {
+        for state in states.iter().take(last_idx) {
             filtered_first_vals.push(compute::filter(state, flags)?)
         }
         self.update_batch(&filtered_first_vals)
@@ -387,7 +387,7 @@ impl Accumulator for LastValueAccumulator {
         let is_set_flags = &states[last_idx];
         let flags = is_set_flags.as_boolean();
         let mut filtered_first_vals = vec![];
-        for state in states.iter().take(last_idx - 1) {
+        for state in states.iter().take(last_idx) {
             filtered_first_vals.push(compute::filter(state, flags)?)
         }
         self.update_batch(&filtered_first_vals)

Reply via email to