This is an automated email from the ASF dual-hosted git repository.
jayzhan pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new ab1de2c570 Enhance LastValueAccumulator logic and add SQL logic tests
for last_value function (#13980)
ab1de2c570 is described below
commit ab1de2c5707c137978889564b2e4c1b45849ff0f
Author: Jay Zhan <[email protected]>
AuthorDate: Sat Jan 4 06:47:51 2025 +0800
Enhance LastValueAccumulator logic and add SQL logic tests for last_value
function (#13980)
- Updated LastValueAccumulator to include requirement satisfaction check
before updating the last value.
- Added SQL logic tests to verify the behavior of the last_value function
with merge batches and ensure correct aggregation in various scenarios.
---
datafusion/functions-aggregate/src/first_last.rs | 1 +
datafusion/sqllogictest/test_files/aggregate.slt | 66 ++++++++++++++++++++++++
2 files changed, 67 insertions(+)
diff --git a/datafusion/functions-aggregate/src/first_last.rs b/datafusion/functions-aggregate/src/first_last.rs
index 9ad55d91a6..8ef139ae61 100644
--- a/datafusion/functions-aggregate/src/first_last.rs
+++ b/datafusion/functions-aggregate/src/first_last.rs
@@ -651,6 +651,7 @@ impl Accumulator for LastValueAccumulator {
// Either there is no existing value, or there is a newer (latest)
// version in the new data:
if !self.is_set
+ || self.requirement_satisfied
|| compare_rows(&self.orderings, last_ordering, &sort_options)?.is_lt()
{
// Update with last value in the state. Note that we should exclude the
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index cd62e56253..ffc441d317 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -6137,3 +6137,69 @@ SELECT v1 FROM t1 WHERE ((count(v1) % 1) << 1) > 0;
statement ok
DROP TABLE t1;
+
+# Test last function with merge batch
+query II
+with A as (
+ select 1 as id, 10 as foo
+ UNION ALL
+ select 1, 10
+ UNION ALL
+ select 1, 10
+ UNION ALL
+ select 1, 10
+ UNION ALL
+ select 1, 10
+ ---- The order is non-deterministic, keep the value the same
+) select last_value(a.foo), sum(distinct 1) from A a group by a.id;
+----
+10 1
+
+# It has only AggregateExec with FinalPartitioned mode, so `merge_batch` is used
+# If the plan is changed, whether the `merge_batch` is used should be verified to ensure the test coverage
+query TT
+explain with A as (
+ select 1 as id, 2 as foo
+ UNION ALL
+ select 1, 4
+ UNION ALL
+ select 1, 5
+ UNION ALL
+ select 1, 3
+ UNION ALL
+ select 1, 2
+) select last_value(a.foo order by a.foo), sum(distinct 1) from A a group by a.id;
+----
+logical_plan
+01)Projection: last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))
+02)--Aggregate: groupBy=[[a.id]], aggr=[[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))]]
+03)----SubqueryAlias: a
+04)------SubqueryAlias: a
+05)--------Union
+06)----------Projection: Int64(1) AS id, Int64(2) AS foo
+07)------------EmptyRelation
+08)----------Projection: Int64(1) AS id, Int64(4) AS foo
+09)------------EmptyRelation
+10)----------Projection: Int64(1) AS id, Int64(5) AS foo
+11)------------EmptyRelation
+12)----------Projection: Int64(1) AS id, Int64(3) AS foo
+13)------------EmptyRelation
+14)----------Projection: Int64(1) AS id, Int64(2) AS foo
+15)------------EmptyRelation
+physical_plan
+01)ProjectionExec: expr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST]@1 as last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))@2 as sum(DISTINCT Int64(1))]
+02)--AggregateExec: mode=FinalPartitioned, gby=[id@0 as id], aggr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))], ordering_mode=Sorted
+03)----CoalesceBatchesExec: target_batch_size=8192
+04)------RepartitionExec: partitioning=Hash([id@0], 4), input_partitions=5
+05)--------AggregateExec: mode=Partial, gby=[id@0 as id], aggr=[last_value(a.foo) ORDER BY [a.foo ASC NULLS LAST], sum(DISTINCT Int64(1))], ordering_mode=Sorted
+06)----------UnionExec
+07)------------ProjectionExec: expr=[1 as id, 2 as foo]
+08)--------------PlaceholderRowExec
+09)------------ProjectionExec: expr=[1 as id, 4 as foo]
+10)--------------PlaceholderRowExec
+11)------------ProjectionExec: expr=[1 as id, 5 as foo]
+12)--------------PlaceholderRowExec
+13)------------ProjectionExec: expr=[1 as id, 3 as foo]
+14)--------------PlaceholderRowExec
+15)------------ProjectionExec: expr=[1 as id, 2 as foo]
+16)--------------PlaceholderRowExec
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]