This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 55930fbffe fix: fix index bug and add test to check it (#7124)
55930fbffe is described below
commit 55930fbffe34611e42676c52a31f7e90ccd77ca5
Author: Mustafa Akur <[email protected]>
AuthorDate: Fri Jul 28 19:23:35 2023 +0300
fix: fix index bug and add test to check it (#7124)
* Fix bug, add test
* Add new test
---
.../tests/sqllogictests/test_files/groupby.slt | 68 ++++++++++++++++++++++
.../physical-expr/src/aggregate/first_last.rs | 4 +-
2 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/datafusion/core/tests/sqllogictests/test_files/groupby.slt b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
index b2677679c8..5db2ad9280 100644
--- a/datafusion/core/tests/sqllogictests/test_files/groupby.slt
+++ b/datafusion/core/tests/sqllogictests/test_files/groupby.slt
@@ -3348,3 +3348,71 @@ ORDER BY l.sn
2 75 3
3 200 4
4 100 5
+
+# create a table
+statement ok
+CREATE TABLE FOO (x int, y int) AS VALUES (1, 2), (2, 3), (1, 3);
+
+# make sure that query runs in multi partitions
+statement ok
+set datafusion.execution.target_partitions = 8;
+
+query I
+SELECT LAST_VALUE(x)
+FROM FOO;
+----
+1
+
+query II
+SELECT x, LAST_VALUE(x)
+FROM FOO
+GROUP BY x
+ORDER BY x;
+----
+1 1
+2 2
+
+query II
+SELECT y, LAST_VALUE(x)
+FROM FOO
+GROUP BY y
+ORDER BY y;
+----
+2 1
+3 1
+
+# plan of the query above should contain partial
+# and final aggregation stages
+query TT
+EXPLAIN SELECT LAST_VALUE(x)
+ FROM FOO;
+----
+logical_plan
+Aggregate: groupBy=[[]], aggr=[[LAST_VALUE(foo.x)]]
+--TableScan: foo projection=[x]
+physical_plan
+AggregateExec: mode=Final, gby=[], aggr=[LAST_VALUE(foo.x)]
+--CoalescePartitionsExec
+----AggregateExec: mode=Partial, gby=[], aggr=[LAST_VALUE(foo.x)]
+------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
+
+query I
+SELECT FIRST_VALUE(x)
+FROM FOO;
+----
+1
+
+# similarly plan of the above query should
+# contain partial and final aggregation stages.
+query TT
+EXPLAIN SELECT FIRST_VALUE(x)
+ FROM FOO;
+----
+logical_plan
+Aggregate: groupBy=[[]], aggr=[[FIRST_VALUE(foo.x)]]
+--TableScan: foo projection=[x]
+physical_plan
+AggregateExec: mode=Final, gby=[], aggr=[FIRST_VALUE(foo.x)]
+--CoalescePartitionsExec
+----AggregateExec: mode=Partial, gby=[], aggr=[FIRST_VALUE(foo.x)]
+------MemoryExec: partitions=8, partition_sizes=[1, 0, 0, 0, 0, 0, 0, 0]
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index f322419a7b..656f30a135 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -202,7 +202,7 @@ impl Accumulator for FirstValueAccumulator {
let is_set_flags = &states[last_idx];
let flags = is_set_flags.as_boolean();
let mut filtered_first_vals = vec![];
- for state in states.iter().take(last_idx - 1) {
+ for state in states.iter().take(last_idx) {
filtered_first_vals.push(compute::filter(state, flags)?)
}
self.update_batch(&filtered_first_vals)
@@ -387,7 +387,7 @@ impl Accumulator for LastValueAccumulator {
let is_set_flags = &states[last_idx];
let flags = is_set_flags.as_boolean();
let mut filtered_first_vals = vec![];
- for state in states.iter().take(last_idx - 1) {
+ for state in states.iter().take(last_idx) {
filtered_first_vals.push(compute::filter(state, flags)?)
}
self.update_batch(&filtered_first_vals)