This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f65c166327b Windowed aggregates should update the aggregation value
based on final compute (#16244)
f65c166327b is described below
commit f65c166327b7fb126859d814c34e33d169a5c2b0
Author: Sree Charan Manamala <[email protected]>
AuthorDate: Fri Apr 12 11:58:33 2024 +0530
Windowed aggregates should update the aggregation value based on final
compute (#16244)
---
.../semantic/DefaultFramedOnHeapAggregatable.java | 30 ++++++++++++----------
.../wikipediaFinalComputedAggregation.sqlTest | 18 +++++++++++++
2 files changed, 34 insertions(+), 14 deletions(-)
diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java
index 8b34b62f06f..106fa9674a0 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java
@@ -318,10 +318,11 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
currentValue,
0
);
- Aggregator combiningAgg = aggFactory.getCombiningFactory().factorize(combiningFactory);
- combiningAgg.aggregate();
- return combiningAgg.get();
+ try (Aggregator combiningAgg = aggFactory.getCombiningFactory().factorize(combiningFactory)) {
+ combiningAgg.aggregate();
+ return aggFactory.finalizeComputation(combiningAgg.get());
+ }
}
/**
@@ -458,7 +459,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
if (rowIdProvider.get() < numRows) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
- results[i][resultStorageIndex] = aggs[i].get();
+ results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
aggs[i].close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
}
@@ -471,7 +472,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
- results[i][resultStorageIndex] = aggs[i].get();
+ results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
aggs[i].close();
// Use a combining aggregator to combine the result we just got with the result from the previous row
@@ -489,7 +490,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
combiningAgg.aggregate();
combiningFactory.increment();
combiningAgg.aggregate();
- results[i][resultStorageIndex] = combiningAgg.get();
+ results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(combiningAgg.get());
combiningAgg.close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
@@ -545,7 +546,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
if (rowIdProvider.get() >= 0) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
- results[i][resultStorageIndex] = aggs[i].get();
+ results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
aggs[i].close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
}
@@ -558,7 +559,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
for (int rowId = rowIdProvider.get(); rowId >= 0; --rowId) {
for (int i = 0; i < aggs.length; i++) {
aggs[i].aggregate();
- results[i][resultStorageIndex] = aggs[i].get();
+ results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggs[i].get());
aggs[i].close();
// Use a combining aggregator to combine the result we just got with the result from the previous row
@@ -576,7 +577,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
combiningAgg.aggregate();
combiningFactory.decrement();
combiningAgg.aggregate();
- results[i][resultStorageIndex] = combiningAgg.get();
+ results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(combiningAgg.get());
combiningAgg.close();
aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
@@ -667,7 +668,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
}
for (int i = 0; i < aggFactories.length; ++i) {
- results[i][resultStorageIndex] = aggregators[i][nextIndex].get();
+ results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
aggregators[i][nextIndex].close();
aggregators[i][nextIndex] = aggFactories[i].factorize(columnSelectorFactory);
}
@@ -706,7 +707,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
}
for (int i = 0; i < aggFactories.length; ++i) {
- results[i][resultStorageIndex] = aggregators[i][nextIndex].get();
+ results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
aggregators[i][nextIndex].close();
aggregators[i][nextIndex] = null;
}
@@ -719,7 +720,8 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
// End Phase 3, anything left in the window needs to be collected and put into our results
for (; nextIndex < windowSize; ++nextIndex) {
for (int i = 0; i < aggFactories.length; ++i) {
- results[i][resultStorageIndex] = aggregators[i][nextIndex].get();
+ results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
+ aggregators[i][nextIndex].close();
aggregators[i][nextIndex] = null;
}
++resultStorageIndex;
@@ -772,7 +774,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
if (rowId >= upperOffset) {
for (int i = 0; i < aggregators.length; ++i) {
- results[i][resultStorageIndex] = aggregators[i][startIndex].get();
+ results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][startIndex].get());
aggregators[i][startIndex].close();
aggregators[i][startIndex] = null;
}
@@ -790,7 +792,7 @@ public class DefaultFramedOnHeapAggregatable implements FramedOnHeapAggregatable
for (; startIndex < windowSize; ++startIndex) {
for (int i = 0; i < aggregators.length; ++i) {
- results[i][resultStorageIndex] = aggregators[i][startIndex].get();
+ results[i][resultStorageIndex] = aggFactories[i].finalizeComputation(aggregators[i][startIndex].get());
aggregators[i][startIndex].close();
aggregators[i][startIndex] = null;
}
diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaFinalComputedAggregation.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaFinalComputedAggregation.sqlTest
new file mode 100644
index 00000000000..c45dcb8b688
--- /dev/null
+++ b/sql/src/test/resources/calcite/tests/window/wikipediaFinalComputedAggregation.sqlTest
@@ -0,0 +1,18 @@
+type: "operatorValidation"
+
+sql: |
+ SELECT
+ countryName,
+ cityName,
+ channel,
+ string_agg(channel, '|') over (partition by cityName order by countryName) s
+ FROM wikipedia
+ WHERE countryName='Austria'
+ GROUP BY 1, 2, 3
+
+expectedResults:
+ - ["Austria",null,"#de.wikipedia","#de.wikipedia"]
+ - ["Austria","Horsching","#de.wikipedia","#de.wikipedia"]
+ - ["Austria","Vienna","#de.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"]
+ - ["Austria","Vienna","#es.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"]
+ - ["Austria","Vienna","#tr.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"]
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]