This is an automated email from the ASF dual-hosted git repository.

kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f65c166327b Windowed aggregates should update the aggregation value based on final compute (#16244)
f65c166327b is described below

commit f65c166327b7fb126859d814c34e33d169a5c2b0
Author: Sree Charan Manamala <[email protected]>
AuthorDate: Fri Apr 12 11:58:33 2024 +0530

    Windowed aggregates should update the aggregation value based on final compute (#16244)
---
 .../semantic/DefaultFramedOnHeapAggregatable.java  | 30 ++++++++++++----------
 .../wikipediaFinalComputedAggregation.sqlTest      | 18 +++++++++++++
 2 files changed, 34 insertions(+), 14 deletions(-)

diff --git a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java
index 8b34b62f06f..106fa9674a0 100644
--- a/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java
+++ b/processing/src/main/java/org/apache/druid/query/rowsandcols/semantic/DefaultFramedOnHeapAggregatable.java
@@ -318,10 +318,11 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
         currentValue,
         0
     );
-    Aggregator combiningAgg = 
aggFactory.getCombiningFactory().factorize(combiningFactory);
 
-    combiningAgg.aggregate();
-    return combiningAgg.get();
+    try (Aggregator combiningAgg = 
aggFactory.getCombiningFactory().factorize(combiningFactory)) {
+      combiningAgg.aggregate();
+      return aggFactory.finalizeComputation(combiningAgg.get());
+    }
   }
 
   /**
@@ -458,7 +459,7 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
     if (rowIdProvider.get() < numRows) {
       for (int i = 0; i < aggs.length; i++) {
         aggs[i].aggregate();
-        results[i][resultStorageIndex] = aggs[i].get();
+        results[i][resultStorageIndex] = 
aggFactories[i].finalizeComputation(aggs[i].get());
         aggs[i].close();
         aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
       }
@@ -471,7 +472,7 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
     for (int rowId = rowIdProvider.get(); rowId < numRows; ++rowId) {
       for (int i = 0; i < aggs.length; i++) {
         aggs[i].aggregate();
-        results[i][resultStorageIndex] = aggs[i].get();
+        results[i][resultStorageIndex] = 
aggFactories[i].finalizeComputation(aggs[i].get());
         aggs[i].close();
 
         // Use a combining aggregator to combine the result we just got with the result from the previous row
@@ -489,7 +490,7 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
         combiningAgg.aggregate();
         combiningFactory.increment();
         combiningAgg.aggregate();
-        results[i][resultStorageIndex] = combiningAgg.get();
+        results[i][resultStorageIndex] = 
aggFactories[i].finalizeComputation(combiningAgg.get());
         combiningAgg.close();
 
         aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
@@ -545,7 +546,7 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
     if (rowIdProvider.get() >= 0) {
       for (int i = 0; i < aggs.length; i++) {
         aggs[i].aggregate();
-        results[i][resultStorageIndex] = aggs[i].get();
+        results[i][resultStorageIndex] = 
aggFactories[i].finalizeComputation(aggs[i].get());
         aggs[i].close();
         aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
       }
@@ -558,7 +559,7 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
     for (int rowId = rowIdProvider.get(); rowId >= 0; --rowId) {
       for (int i = 0; i < aggs.length; i++) {
         aggs[i].aggregate();
-        results[i][resultStorageIndex] = aggs[i].get();
+        results[i][resultStorageIndex] = 
aggFactories[i].finalizeComputation(aggs[i].get());
         aggs[i].close();
 
         // Use a combining aggregator to combine the result we just got with the result from the previous row
@@ -576,7 +577,7 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
         combiningAgg.aggregate();
         combiningFactory.decrement();
         combiningAgg.aggregate();
-        results[i][resultStorageIndex] = combiningAgg.get();
+        results[i][resultStorageIndex] = 
aggFactories[i].finalizeComputation(combiningAgg.get());
         combiningAgg.close();
 
         aggs[i] = aggFactories[i].factorize(columnSelectorFactory);
@@ -667,7 +668,7 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
       }
 
       for (int i = 0; i < aggFactories.length; ++i) {
-        results[i][resultStorageIndex] = aggregators[i][nextIndex].get();
+        results[i][resultStorageIndex] = 
aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
         aggregators[i][nextIndex].close();
         aggregators[i][nextIndex] = 
aggFactories[i].factorize(columnSelectorFactory);
       }
@@ -706,7 +707,7 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
       }
 
       for (int i = 0; i < aggFactories.length; ++i) {
-        results[i][resultStorageIndex] = aggregators[i][nextIndex].get();
+        results[i][resultStorageIndex] = 
aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
         aggregators[i][nextIndex].close();
         aggregators[i][nextIndex] = null;
       }
@@ -719,7 +720,8 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
     // End Phase 3, anything left in the window needs to be collected and put into our results
     for (; nextIndex < windowSize; ++nextIndex) {
       for (int i = 0; i < aggFactories.length; ++i) {
-        results[i][resultStorageIndex] = aggregators[i][nextIndex].get();
+        results[i][resultStorageIndex] = 
aggFactories[i].finalizeComputation(aggregators[i][nextIndex].get());
+        aggregators[i][nextIndex].close();
         aggregators[i][nextIndex] = null;
       }
       ++resultStorageIndex;
@@ -772,7 +774,7 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
 
       if (rowId >= upperOffset) {
         for (int i = 0; i < aggregators.length; ++i) {
-          results[i][resultStorageIndex] = aggregators[i][startIndex].get();
+          results[i][resultStorageIndex] = 
aggFactories[i].finalizeComputation(aggregators[i][startIndex].get());
           aggregators[i][startIndex].close();
           aggregators[i][startIndex] = null;
         }
@@ -790,7 +792,7 @@ public class DefaultFramedOnHeapAggregatable implements 
FramedOnHeapAggregatable
 
     for (; startIndex < windowSize; ++startIndex) {
       for (int i = 0; i < aggregators.length; ++i) {
-        results[i][resultStorageIndex] = aggregators[i][startIndex].get();
+        results[i][resultStorageIndex] = 
aggFactories[i].finalizeComputation(aggregators[i][startIndex].get());
         aggregators[i][startIndex].close();
         aggregators[i][startIndex] = null;
       }
diff --git a/sql/src/test/resources/calcite/tests/window/wikipediaFinalComputedAggregation.sqlTest b/sql/src/test/resources/calcite/tests/window/wikipediaFinalComputedAggregation.sqlTest
new file mode 100644
index 00000000000..c45dcb8b688
--- /dev/null
+++ b/sql/src/test/resources/calcite/tests/window/wikipediaFinalComputedAggregation.sqlTest
@@ -0,0 +1,18 @@
+type: "operatorValidation"
+
+sql: |
+    SELECT
+      countryName,
+      cityName,
+      channel,
+      string_agg(channel, '|') over (partition by cityName order by countryName) s
+    FROM wikipedia
+    WHERE countryName='Austria'
+    GROUP BY 1, 2, 3
+
+expectedResults:
+  - ["Austria",null,"#de.wikipedia","#de.wikipedia"]
+  - ["Austria","Horsching","#de.wikipedia","#de.wikipedia"]
+  - ["Austria","Vienna","#de.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"]
+  - ["Austria","Vienna","#es.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"]
+  - ["Austria","Vienna","#tr.wikipedia","#de.wikipedia|#es.wikipedia|#tr.wikipedia"]
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to