This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 0c5c97b22a fix(functions-aggregate): drain CORR state vectors for streaming aggregation (#19669)
0c5c97b22a is described below
commit 0c5c97b22a5286966be0c0ef104484d9e962515f
Author: Geoffrey Claude <[email protected]>
AuthorDate: Sun Jan 11 08:36:07 2026 +0100
fix(functions-aggregate): drain CORR state vectors for streaming aggregation (#19669)
## Which issue does this PR close?
- N/A
## Rationale for this change
This change addresses a failure in the `CORR` aggregate function when
running in streaming mode. The `CorrelationGroupsAccumulator`
(introduced in [PR
#13581](https://github.com/apache/datafusion/pull/13581)) was failing to
drain its state vectors during `EmitTo::First` calls, causing internal
state to persist across emissions. This led to memory leaks, incorrect
results for subsequent groups, and "length mismatch" errors because the
internal vector sizes diverged from the number of emitted groups.
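For intuition, here is a minimal standalone sketch of the failure mode (illustrative only; the types and state are simplified stand-ins, not DataFusion's actual accumulator):
```rust
/// Simplified stand-in for DataFusion's `EmitTo`, for illustration only.
enum EmitTo {
    All,
    First(usize),
}

struct ToyAccumulator {
    count: Vec<u64>, // one entry per group seen so far
}

impl ToyAccumulator {
    /// Buggy pattern: copies the first `n` entries but never removes them,
    /// so already-emitted groups linger in `self.count`.
    fn evaluate_buggy(&mut self, emit_to: EmitTo) -> Vec<u64> {
        let n = match emit_to {
            EmitTo::All => self.count.len(),
            EmitTo::First(n) => n,
        };
        self.count[..n].to_vec()
    }

    /// Fixed pattern: drains the emitted prefix, keeping only unemitted groups.
    fn evaluate_fixed(&mut self, emit_to: EmitTo) -> Vec<u64> {
        let n = match emit_to {
            EmitTo::All => self.count.len(),
            EmitTo::First(n) => n,
        };
        self.count.drain(..n).collect()
    }
}

fn main() {
    let mut acc = ToyAccumulator { count: vec![2, 2, 2] };
    let _ = acc.evaluate_buggy(EmitTo::First(1));
    assert_eq!(acc.count.len(), 3); // stale: group 1 was never removed

    let mut acc = ToyAccumulator { count: vec![2, 2, 2] };
    let _ = acc.evaluate_fixed(EmitTo::First(1));
    assert_eq!(acc.count.len(), 2); // drained correctly
}
```
Under streaming aggregation the operator emits completed groups repeatedly via `EmitTo::First`, so the stale entries compound until the accumulator's internal lengths disagree with the number of groups being emitted.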
### Reproducer
```sql
# Setup data
CREATE TABLE stream_test (
g INT,
x DOUBLE,
y DOUBLE
) AS VALUES
(1, 1.0, 1.0), (1, 2.0, 2.0),
(2, 1.0, 5.0), (2, 2.0, 5.0),
(3, 1.0, 1.0), (3, 2.0, 2.0);
# Trigger streaming aggregation via sorted subquery
SELECT
g,
CORR(x, y)
FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000)
GROUP BY g
ORDER BY g;
```
**Before**: `DataFusion error: Arrow error: Invalid argument error: all
columns in a record batch must have the same length`
**After**:
```
1 1
2 NULL
3 1
```
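To see why group 2 is `NULL`, a quick check against the standard sample-correlation formula:
```
corr = (n·Σxy − Σx·Σy) / sqrt((n·Σxx − (Σx)²) · (n·Σyy − (Σy)²))
```
For group 2, `x = {1, 2}` and `y = {5, 5}`, so `n·Σyy − (Σy)² = 2·50 − 100 = 0`: the denominator vanishes, the correlation is undefined, and `CORR` reports `NULL` (matching PostgreSQL's behavior, as noted in the code comments below).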
## What changes are included in this PR?
This PR is structured into two commits: the first adds a failing test
case to demonstrate the issue, and the second implements the fix.
The accumulator now uses `emit_to.take_needed()` in both `evaluate` and
`state` to properly consume the emitted portions of the state vectors.
Additionally, the `size()` implementation has been updated to report memory based on vector capacity; the previous `size_of_val(&self.count)` calls measured only each `Vec`'s stack header (pointer, length, capacity), not its heap allocation.
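For reference, `EmitTo::take_needed` drains the emitted prefix and leaves the remainder in place. Below is a simplified standalone sketch of those semantics (the real type lives in DataFusion; this is not its exact source):
```rust
/// Simplified stand-in for DataFusion's `EmitTo`, for illustration only.
enum EmitTo {
    All,
    First(usize),
}

impl EmitTo {
    /// Removes and returns the entries being emitted, leaving the
    /// not-yet-emitted groups in `v`.
    fn take_needed<T>(&self, v: &mut Vec<T>) -> Vec<T> {
        match self {
            EmitTo::All => std::mem::take(v),
            EmitTo::First(n) => {
                let rest = v.split_off(*n); // groups kept for later
                std::mem::replace(v, rest) // first `n` groups, drained
            }
        }
    }
}

fn main() {
    let emit = EmitTo::First(2);
    let mut counts = vec![10_u64, 20, 30, 40];
    let emitted = emit.take_needed(&mut counts);
    assert_eq!(emitted, vec![10, 20]); // consumed by this emission
    assert_eq!(counts, vec![30, 40]); // retained for future emissions
}
```
Because every state vector is drained the same way, the vector lengths after an emission always equal the number of groups still buffered, which keeps the emitted Arrow arrays equal in length.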
## Are these changes tested?
Yes, a new test case in `aggregate.slt` triggers streaming aggregation
via an ordered subquery. This test previously crashed with an Arrow
length mismatch error and now produces correct results.
## Are there any user-facing changes?
Yes, SQL queries that trigger streaming aggregation using `CORR`
(typically those with specific ordering requirements) will now succeed
instead of failing with a length mismatch error.
---
datafusion/functions-aggregate/src/correlation.rs | 62 ++---
datafusion/sqllogictest/test_files/aggregate.slt | 281 ++++++++++++++++++++++
2 files changed, 315 insertions(+), 28 deletions(-)
diff --git a/datafusion/functions-aggregate/src/correlation.rs b/datafusion/functions-aggregate/src/correlation.rs
index 538311dfa2..119f861a57 100644
--- a/datafusion/functions-aggregate/src/correlation.rs
+++ b/datafusion/functions-aggregate/src/correlation.rs
@@ -411,11 +411,15 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
}
fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
- let n = match emit_to {
- EmitTo::All => self.count.len(),
- EmitTo::First(n) => n,
- };
-
+ // Drain the state vectors for the groups being emitted
+ let counts = emit_to.take_needed(&mut self.count);
+ let sum_xs = emit_to.take_needed(&mut self.sum_x);
+ let sum_ys = emit_to.take_needed(&mut self.sum_y);
+ let sum_xys = emit_to.take_needed(&mut self.sum_xy);
+ let sum_xxs = emit_to.take_needed(&mut self.sum_xx);
+ let sum_yys = emit_to.take_needed(&mut self.sum_yy);
+
+ let n = counts.len();
let mut values = Vec::with_capacity(n);
let mut nulls = NullBufferBuilder::new(n);
@@ -427,14 +431,13 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
// result should be `Null` (according to PostgreSQL's behavior).
// - However, if any of the accumulated values contain NaN, the result should
// be NaN regardless of the count (even for single-row groups).
- //
for i in 0..n {
- let count = self.count[i];
- let sum_x = self.sum_x[i];
- let sum_y = self.sum_y[i];
- let sum_xy = self.sum_xy[i];
- let sum_xx = self.sum_xx[i];
- let sum_yy = self.sum_yy[i];
+ let count = counts[i];
+ let sum_x = sum_xs[i];
+ let sum_y = sum_ys[i];
+ let sum_xy = sum_xys[i];
+ let sum_xx = sum_xxs[i];
+ let sum_yy = sum_yys[i];
// If BOTH sum_x AND sum_y are NaN, then both input values are NaN → return NaN
// If only ONE of them is NaN, then only one input value is NaN → return NULL
@@ -470,18 +473,21 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
}
fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
- let n = match emit_to {
- EmitTo::All => self.count.len(),
- EmitTo::First(n) => n,
- };
+ // Drain the state vectors for the groups being emitted
+ let count = emit_to.take_needed(&mut self.count);
+ let sum_x = emit_to.take_needed(&mut self.sum_x);
+ let sum_y = emit_to.take_needed(&mut self.sum_y);
+ let sum_xy = emit_to.take_needed(&mut self.sum_xy);
+ let sum_xx = emit_to.take_needed(&mut self.sum_xx);
+ let sum_yy = emit_to.take_needed(&mut self.sum_yy);
Ok(vec![
- Arc::new(UInt64Array::from(self.count[0..n].to_vec())),
- Arc::new(Float64Array::from(self.sum_x[0..n].to_vec())),
- Arc::new(Float64Array::from(self.sum_y[0..n].to_vec())),
- Arc::new(Float64Array::from(self.sum_xy[0..n].to_vec())),
- Arc::new(Float64Array::from(self.sum_xx[0..n].to_vec())),
- Arc::new(Float64Array::from(self.sum_yy[0..n].to_vec())),
+ Arc::new(UInt64Array::from(count)),
+ Arc::new(Float64Array::from(sum_x)),
+ Arc::new(Float64Array::from(sum_y)),
+ Arc::new(Float64Array::from(sum_xy)),
+ Arc::new(Float64Array::from(sum_xx)),
+ Arc::new(Float64Array::from(sum_yy)),
])
}
@@ -537,12 +543,12 @@ impl GroupsAccumulator for CorrelationGroupsAccumulator {
}
fn size(&self) -> usize {
- size_of_val(&self.count)
- + size_of_val(&self.sum_x)
- + size_of_val(&self.sum_y)
- + size_of_val(&self.sum_xy)
- + size_of_val(&self.sum_xx)
- + size_of_val(&self.sum_yy)
+ self.count.capacity() * size_of::<u64>()
+ + self.sum_x.capacity() * size_of::<f64>()
+ + self.sum_y.capacity() * size_of::<f64>()
+ + self.sum_xy.capacity() * size_of::<f64>()
+ + self.sum_xx.capacity() * size_of::<f64>()
+ + self.sum_yy.capacity() * size_of::<f64>()
}
}
diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt
index 3c962a0f87..a5f3ef0413 100644
--- a/datafusion/sqllogictest/test_files/aggregate.slt
+++ b/datafusion/sqllogictest/test_files/aggregate.slt
@@ -8387,3 +8387,284 @@ ORDER BY grp, id;
statement ok
DROP TABLE string_agg_window_test;
+
+# Enable streaming aggregation by limiting partitions and ensuring sorted input
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+# Setup data
+statement ok
+CREATE TABLE stream_test (
+ g INT,
+ x DOUBLE,
+ y DOUBLE,
+ i INT,
+ b BOOLEAN,
+ s VARCHAR
+) AS VALUES
+(1, 1.0, 1.0, 1, true, 'a'), (1, 2.0, 2.0, 2, true, 'b'),
+(2, 1.0, 5.0, 3, false, 'c'), (2, 2.0, 5.0, 4, true, 'd'),
+(3, 1.0, 1.0, 7, false, 'e'), (3, 2.0, 2.0, 8, false, 'f');
+
+# Test comprehensive aggregates with streaming
+# This verifies that CORR and other aggregates work together in a streaming plan (ordering_mode=Sorted)
+
+# Basic Aggregates
+query TT
+EXPLAIN SELECT
+ g,
+ COUNT(*),
+ SUM(x),
+ AVG(x),
+ MEAN(x),
+ MIN(x),
+ MAX(y),
+ BIT_AND(i),
+ BIT_OR(i),
+ BIT_XOR(i),
+ BOOL_AND(b),
+ BOOL_OR(b),
+ MEDIAN(x),
+ GROUPING(g),
+ VAR(x),
+ VAR_SAMP(x),
+ VAR_POP(x),
+ VAR_SAMPLE(x),
+ VAR_POPULATION(x),
+ STDDEV(x),
+ STDDEV_SAMP(x),
+ STDDEV_POP(x)
+FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000)
+GROUP BY g
+ORDER BY g;
+----
+logical_plan
+01)Sort: stream_test.g ASC NULLS LAST
+02)--Projection: stream_test.g, count(Int64(1)) AS count(*), sum(stream_test.x), avg(stream_test.x), avg(stream_test.x) AS mean(stream_test.x), min(stream_test.x), max(stream_test.y), bit_and(stream_test.i), bit_or(stream_test.i), bit_xor(stream_test.i), bool_and(stream_test.b), bool_or(stream_test.b), median(stream_test.x), Int32(0) AS grouping(stream_test.g), var(stream_test.x), var(stream_test.x) AS var_samp(stream_test.x), var_pop(stream_test.x), var(stream_test.x) AS var_sample(stre [...]
+03)----Aggregate: groupBy=[[stream_test.g]], aggr=[[count(Int64(1)), sum(stream_test.x), avg(stream_test.x), min(stream_test.x), max(stream_test.y), bit_and(stream_test.i), bit_or(stream_test.i), bit_xor(stream_test.i), bool_and(stream_test.b), bool_or(stream_test.b), median(stream_test.x), var(stream_test.x), var_pop(stream_test.x), stddev(stream_test.x), stddev_pop(stream_test.x)]]
+04)------Sort: stream_test.g ASC NULLS LAST, fetch=10000
+05)--------TableScan: stream_test projection=[g, x, y, i, b]
+physical_plan
+01)ProjectionExec: expr=[g@0 as g, count(Int64(1))@1 as count(*), sum(stream_test.x)@2 as sum(stream_test.x), avg(stream_test.x)@3 as avg(stream_test.x), avg(stream_test.x)@3 as mean(stream_test.x), min(stream_test.x)@4 as min(stream_test.x), max(stream_test.y)@5 as max(stream_test.y), bit_and(stream_test.i)@6 as bit_and(stream_test.i), bit_or(stream_test.i)@7 as bit_or(stream_test.i), bit_xor(stream_test.i)@8 as bit_xor(stream_test.i), bool_and(stream_test.b)@9 as bool_and(stream_test.b [...]
+02)--AggregateExec: mode=Single, gby=[g@0 as g], aggr=[count(Int64(1)), sum(stream_test.x), avg(stream_test.x), min(stream_test.x), max(stream_test.y), bit_and(stream_test.i), bit_or(stream_test.i), bit_xor(stream_test.i), bool_and(stream_test.b), bool_or(stream_test.b), median(stream_test.x), var(stream_test.x), var_pop(stream_test.x), stddev(stream_test.x), stddev_pop(stream_test.x)], ordering_mode=Sorted
+03)----SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST], preserve_partitioning=[false]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query IIRRRRRIIIBBRIRRRRRRRR
+SELECT
+ g,
+ COUNT(*),
+ SUM(x),
+ AVG(x),
+ MEAN(x),
+ MIN(x),
+ MAX(y),
+ BIT_AND(i),
+ BIT_OR(i),
+ BIT_XOR(i),
+ BOOL_AND(b),
+ BOOL_OR(b),
+ MEDIAN(x),
+ GROUPING(g),
+ VAR(x),
+ VAR_SAMP(x),
+ VAR_POP(x),
+ VAR_SAMPLE(x),
+ VAR_POPULATION(x),
+ STDDEV(x),
+ STDDEV_SAMP(x),
+ STDDEV_POP(x)
+FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000)
+GROUP BY g
+ORDER BY g;
+----
+1 2 3 1.5 1.5 1 2 0 3 3 true true 1.5 0 0.5 0.5 0.25 0.5 0.25 0.707106781187 0.707106781187 0.5
+2 2 3 1.5 1.5 1 5 0 7 7 false true 1.5 0 0.5 0.5 0.25 0.5 0.25 0.707106781187 0.707106781187 0.5
+3 2 3 1.5 1.5 1 2 0 15 15 false false 1.5 0 0.5 0.5 0.25 0.5 0.25 0.707106781187 0.707106781187 0.5
+
+# Ordered Aggregates (by x)
+query TT
+EXPLAIN SELECT
+ g,
+ ARRAY_AGG(x ORDER BY x),
+ ARRAY_AGG(DISTINCT x ORDER BY x),
+ FIRST_VALUE(x ORDER BY x),
+ LAST_VALUE(x ORDER BY x),
+ NTH_VALUE(x, 1 ORDER BY x)
+FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000)
+GROUP BY g
+ORDER BY g;
+----
+logical_plan
+01)Sort: stream_test.g ASC NULLS LAST
+02)--Aggregate: groupBy=[[stream_test.g]], aggr=[[array_agg(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], array_agg(DISTINCT stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], first_value(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], last_value(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], nth_value(stream_test.x, Int64(1)) ORDER BY [stream_test.x ASC NULLS LAST]]]
+03)----Sort: stream_test.g ASC NULLS LAST, fetch=10000
+04)------TableScan: stream_test projection=[g, x]
+physical_plan
+01)AggregateExec: mode=Single, gby=[g@0 as g], aggr=[array_agg(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], array_agg(DISTINCT stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], first_value(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], last_value(stream_test.x) ORDER BY [stream_test.x ASC NULLS LAST], nth_value(stream_test.x,Int64(1)) ORDER BY [stream_test.x ASC NULLS LAST]], ordering_mode=Sorted
+02)--SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST, x@1 ASC NULLS LAST], preserve_partitioning=[false]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+query I??RRR
+SELECT
+ g,
+ ARRAY_AGG(x ORDER BY x),
+ ARRAY_AGG(DISTINCT x ORDER BY x),
+ FIRST_VALUE(x ORDER BY x),
+ LAST_VALUE(x ORDER BY x),
+ NTH_VALUE(x, 1 ORDER BY x)
+FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000)
+GROUP BY g
+ORDER BY g;
+----
+1 [1.0, 2.0] [1.0, 2.0] 1 2 1
+2 [1.0, 2.0] [1.0, 2.0] 1 2 1
+3 [1.0, 2.0] [1.0, 2.0] 1 2 1
+
+# Ordered Aggregates (by s)
+query TT
+EXPLAIN SELECT
+ g,
+ ARRAY_AGG(s ORDER BY s),
+ STRING_AGG(s, '|' ORDER BY s),
+ STRING_AGG(DISTINCT s, '|' ORDER BY s)
+FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000)
+GROUP BY g
+ORDER BY g;
+----
+logical_plan
+01)Sort: stream_test.g ASC NULLS LAST
+02)--Aggregate: groupBy=[[stream_test.g]], aggr=[[array_agg(stream_test.s) ORDER BY [stream_test.s ASC NULLS LAST], string_agg(stream_test.s, Utf8("|")) ORDER BY [stream_test.s ASC NULLS LAST], string_agg(DISTINCT stream_test.s, Utf8("|")) ORDER BY [stream_test.s ASC NULLS LAST]]]
+03)----Sort: stream_test.g ASC NULLS LAST, fetch=10000
+04)------TableScan: stream_test projection=[g, s]
+physical_plan
+01)AggregateExec: mode=Single, gby=[g@0 as g], aggr=[array_agg(stream_test.s) ORDER BY [stream_test.s ASC NULLS LAST], string_agg(stream_test.s,Utf8("|")) ORDER BY [stream_test.s ASC NULLS LAST], string_agg(DISTINCT stream_test.s,Utf8("|")) ORDER BY [stream_test.s ASC NULLS LAST]], ordering_mode=Sorted
+02)--SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST, s@1 ASC NULLS LAST], preserve_partitioning=[false]
+03)----DataSourceExec: partitions=1, partition_sizes=[1]
+
+query I?TT
+SELECT
+ g,
+ ARRAY_AGG(s ORDER BY s),
+ STRING_AGG(s, '|' ORDER BY s),
+ STRING_AGG(DISTINCT s, '|' ORDER BY s)
+FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000)
+GROUP BY g
+ORDER BY g;
+----
+1 [a, b] a|b a|b
+2 [c, d] c|d c|d
+3 [e, f] e|f e|f
+
+# Statistical & Regression Aggregates
+query TT
+EXPLAIN SELECT
+ g,
+ CORR(x, y),
+ COVAR(x, y),
+ COVAR_SAMP(x, y),
+ COVAR_POP(x, y),
+ REGR_SXX(x, y),
+ REGR_SXY(x, y),
+ REGR_SYY(x, y),
+ REGR_AVGX(x, y),
+ REGR_AVGY(x, y),
+ REGR_COUNT(x, y),
+ REGR_SLOPE(x, y),
+ REGR_INTERCEPT(x, y),
+ REGR_R2(x, y)
+FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000)
+GROUP BY g
+ORDER BY g;
+----
+logical_plan
+01)Sort: stream_test.g ASC NULLS LAST
+02)--Projection: stream_test.g, corr(stream_test.x,stream_test.y), covar_samp(stream_test.x,stream_test.y) AS covar(stream_test.x,stream_test.y), covar_samp(stream_test.x,stream_test.y), covar_pop(stream_test.x,stream_test.y), regr_sxx(stream_test.x,stream_test.y), regr_sxy(stream_test.x,stream_test.y), regr_syy(stream_test.x,stream_test.y), regr_avgx(stream_test.x,stream_test.y), regr_avgy(stream_test.x,stream_test.y), regr_count(stream_test.x,stream_test.y), regr_slope(stream_test.x,st [...]
+03)----Aggregate: groupBy=[[stream_test.g]], aggr=[[corr(stream_test.x, stream_test.y), covar_samp(stream_test.x, stream_test.y), covar_pop(stream_test.x, stream_test.y), regr_sxx(stream_test.x, stream_test.y), regr_sxy(stream_test.x, stream_test.y), regr_syy(stream_test.x, stream_test.y), regr_avgx(stream_test.x, stream_test.y), regr_avgy(stream_test.x, stream_test.y), regr_count(stream_test.x, stream_test.y), regr_slope(stream_test.x, stream_test.y), regr_intercept(stream_test.x, strea [...]
+04)------Sort: stream_test.g ASC NULLS LAST, fetch=10000
+05)--------TableScan: stream_test projection=[g, x, y]
+physical_plan
+01)ProjectionExec: expr=[g@0 as g, corr(stream_test.x,stream_test.y)@1 as corr(stream_test.x,stream_test.y), covar_samp(stream_test.x,stream_test.y)@2 as covar(stream_test.x,stream_test.y), covar_samp(stream_test.x,stream_test.y)@2 as covar_samp(stream_test.x,stream_test.y), covar_pop(stream_test.x,stream_test.y)@3 as covar_pop(stream_test.x,stream_test.y), regr_sxx(stream_test.x,stream_test.y)@4 as regr_sxx(stream_test.x,stream_test.y), regr_sxy(stream_test.x,stream_test.y)@5 as regr_sx [...]
+02)--AggregateExec: mode=Single, gby=[g@0 as g], aggr=[corr(stream_test.x,stream_test.y), covar_samp(stream_test.x,stream_test.y), covar_pop(stream_test.x,stream_test.y), regr_sxx(stream_test.x,stream_test.y), regr_sxy(stream_test.x,stream_test.y), regr_syy(stream_test.x,stream_test.y), regr_avgx(stream_test.x,stream_test.y), regr_avgy(stream_test.x,stream_test.y), regr_count(stream_test.x,stream_test.y), regr_slope(stream_test.x,stream_test.y), regr_intercept(stream_test.x,stream_test.y [...]
+03)----SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST], preserve_partitioning=[false]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query IRRRRRRRRRIRRR
+SELECT
+ g,
+ CORR(x, y),
+ COVAR(x, y),
+ COVAR_SAMP(x, y),
+ COVAR_POP(x, y),
+ REGR_SXX(x, y),
+ REGR_SXY(x, y),
+ REGR_SYY(x, y),
+ REGR_AVGX(x, y),
+ REGR_AVGY(x, y),
+ REGR_COUNT(x, y),
+ REGR_SLOPE(x, y),
+ REGR_INTERCEPT(x, y),
+ REGR_R2(x, y)
+FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000)
+GROUP BY g
+ORDER BY g;
+----
+1 1 0.5 0.5 0.25 0.5 0.5 0.5 1.5 1.5 2 1 0 1
+2 NULL 0 0 0 0 0 0.5 5 1.5 2 NULL NULL NULL
+3 1 0.5 0.5 0.25 0.5 0.5 0.5 1.5 1.5 2 1 0 1
+
+# Approximate and Ordered-Set Aggregates
+query TT
+EXPLAIN SELECT
+ g,
+ APPROX_DISTINCT(i),
+ APPROX_MEDIAN(x),
+ PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY x),
+ QUANTILE_CONT(0.5) WITHIN GROUP (ORDER BY x),
+ APPROX_PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY x),
+ APPROX_PERCENTILE_CONT_WITH_WEIGHT(1.0, 0.5) WITHIN GROUP (ORDER BY x),
+ PERCENTILE_CONT(x, 0.5),
+ APPROX_PERCENTILE_CONT(x, 0.5),
+ APPROX_PERCENTILE_CONT_WITH_WEIGHT(x, 1.0, 0.5)
+FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000)
+GROUP BY g
+ORDER BY g;
+----
+logical_plan
+01)Sort: stream_test.g ASC NULLS LAST
+02)--Projection: stream_test.g, approx_distinct(stream_test.i), approx_median(stream_test.x), percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST] AS quantile_cont(stream_test.x,Float64(0.5)), approx_percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], approx_percentile_cont_with_weight(Float64(1),Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], percentile_cont( [...]
+03)----Aggregate: groupBy=[[stream_test.g]], aggr=[[approx_distinct(stream_test.i), approx_median(stream_test.x), percentile_cont(stream_test.x, Float64(0.5)) ORDER BY [stream_test.x ASC NULLS LAST], approx_percentile_cont(stream_test.x, Float64(0.5)) ORDER BY [stream_test.x ASC NULLS LAST], approx_percentile_cont_with_weight(stream_test.x, Float64(1), Float64(0.5)) ORDER BY [stream_test.x ASC NULLS LAST], percentile_cont(stream_test.x, Float64(0.5)), approx_percentile_cont(stream_test.x [...]
+04)------Sort: stream_test.g ASC NULLS LAST, fetch=10000
+05)--------TableScan: stream_test projection=[g, x, i]
+physical_plan
+01)ProjectionExec: expr=[g@0 as g, approx_distinct(stream_test.i)@1 as approx_distinct(stream_test.i), approx_median(stream_test.x)@2 as approx_median(stream_test.x), percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST]@3 as percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST]@3 as quantile_cont(stream_test.x,Float64(0.5)), approx_percentile_cont(Float64(0.5)) WITHIN GROUP [...]
+02)--AggregateExec: mode=Single, gby=[g@0 as g], aggr=[approx_distinct(stream_test.i), approx_median(stream_test.x), percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], approx_percentile_cont(Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], approx_percentile_cont_with_weight(Float64(1),Float64(0.5)) WITHIN GROUP [stream_test.x ASC NULLS LAST], percentile_cont(stream_test.x,Float64(0.5)), approx_percentile_cont(stream_test.x,Float64(0.5)), approx_percentil [...]
+03)----SortExec: TopK(fetch=10000), expr=[g@0 ASC NULLS LAST], preserve_partitioning=[false]
+04)------DataSourceExec: partitions=1, partition_sizes=[1]
+
+query IIRRRRRRRR
+SELECT
+ g,
+ APPROX_DISTINCT(i),
+ APPROX_MEDIAN(x),
+ PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY x),
+ QUANTILE_CONT(0.5) WITHIN GROUP (ORDER BY x),
+ APPROX_PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY x),
+ APPROX_PERCENTILE_CONT_WITH_WEIGHT(1.0, 0.5) WITHIN GROUP (ORDER BY x),
+ PERCENTILE_CONT(x, 0.5),
+ APPROX_PERCENTILE_CONT(x, 0.5),
+ APPROX_PERCENTILE_CONT_WITH_WEIGHT(x, 1.0, 0.5)
+FROM (SELECT * FROM stream_test ORDER BY g LIMIT 10000)
+GROUP BY g
+ORDER BY g;
+----
+1 2 1.5 1.5 1.5 1.5 1.5 1.5 1.5 1.5
+2 2 1.5 1.5 1.5 1.5 1.5 1.5 1.5 1.5
+3 2 1.5 1.5 1.5 1.5 1.5 1.5 1.5 1.5
+
+statement ok
+DROP TABLE stream_test;
+
+# Restore default target partitions
+statement ok
+set datafusion.execution.target_partitions = 4;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]