This is an automated email from the ASF dual-hosted git repository.

xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b6e9f0388cc Finalize aggregate intermediate results in AggregationResultsBlock.getRows() for server-return-final (#18835)
b6e9f0388cc is described below

commit b6e9f0388ccbe12f29c8e366c39fd996da44e8a2
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Jun 23 00:16:35 2026 -0700

    Finalize aggregate intermediate results in AggregationResultsBlock.getRows() for server-return-final (#18835)
    
    A no-group-by aggregate whose data is colocated on a single server (e.g. partition-pruned
    to one partition on a strictReplicaGroup table) is planned by the v2 physical optimizer as a
    single-stage AGGREGATE_DIRECT that returns final results (SERVER_RETURN_FINAL_RESULT). For
    aggregations whose intermediate type differs from their final type (DISTINCTCOUNTHLLPLUS,
    DISTINCTCOUNT, ...), the leaf crashed during serialization with e.g.
    'HyperLogLogPlus cannot be cast to Long'.
    
    AggregationResultsBlock.getDataSchema() reports the final column types when
    isServerReturnFinalResult() is true, but getRows() returned the raw intermediate results
    without finalizing (only getDataTable() finalized). The MSE LeafOperator consumes
    getRows() + getDataSchema(), so an intermediate object was left in a column typed as its
    final type and failed on MAILBOX_SEND serialization. getRows() now finalizes via
    extractFinalResult() when isServerReturnFinalResult() is true, consistent with
    getDataTable() and the group-by path (GroupByCombineOperator already finalizes the table).
    
    The default (v1) MSE planner always splits no-group-by aggregates into LEAF+FINAL and is
    unaffected; group-by DIRECT aggregates already finalize in the combine operator.
---
 .../blocks/results/AggregationResultsBlock.java    | 15 ++++-
 .../queries/DirectAggregateObjectIntermediate.json | 69 ++++++++++++++++++++++
 2 files changed, 83 insertions(+), 1 deletion(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index c28e2a8faa2..907ea1eaf4b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -97,7 +97,20 @@ public class AggregationResultsBlock extends BaseResultsBlock {
 
   @Override
   public List<Object[]> getRows() {
-    return Collections.singletonList(_results.toArray());
+    if (!_queryContext.isServerReturnFinalResult()) {
+      return Collections.singletonList(_results.toArray());
+    }
+    // When the server is requested to return the final result (e.g. a single-server colocated DIRECT aggregate in the
+    // multi-stage engine), getDataSchema() reports the final column types. Finalize the intermediate results here so
+    // that the rows are consistent with the schema; otherwise an intermediate object (e.g. a HyperLogLogPlus) would be
+    // left in a column typed as its final type (e.g. LONG) and fail when the block is serialized. This mirrors the
+    // finalization done in getDataTable() and in GroupByCombineOperator for the group-by case.
+    int numColumns = _results.size();
+    Object[] row = new Object[numColumns];
+    for (int i = 0; i < numColumns; i++) {
+      row[i] = _aggregationFunctions[i].extractFinalResult(_results.get(i));
+    }
+    return Collections.singletonList(row);
   }
 
   @Override
diff --git a/pinot-query-runtime/src/test/resources/queries/DirectAggregateObjectIntermediate.json b/pinot-query-runtime/src/test/resources/queries/DirectAggregateObjectIntermediate.json
new file mode 100644
index 00000000000..1094dd1e847
--- /dev/null
+++ b/pinot-query-runtime/src/test/resources/queries/DirectAggregateObjectIntermediate.json
@@ -0,0 +1,69 @@
+{
+  "direct_aggregate_object_intermediate": {
+    "comments": "Regression test for the HLL->Long ClassCastException. When a 
no-group-by aggregate is colocated on a single server, the v2 physical 
optimizer produces an AGGREGATE_DIRECT leaf that returns final results 
(SERVER_RETURN_FINAL_RESULT). Aggregations whose intermediate type differs from 
their final type (e.g. DISTINCTCOUNTHLLPLUS -> HyperLogLogPlus/LONG, 
DISTINCTCOUNT -> Set/INT) must be finalized before the leaf serializes its 
output. A 'replicated' table forces single-serve [...]
+    "tables": {
+      "tbl": {
+        "replicated": true,
+        "schema": [
+          {"name": "amount", "type": "INT"},
+          {"name": "user_id", "type": "STRING"}
+        ],
+        "inputs": [
+          [10, "u1"],
+          [20, "u2"],
+          [30, "u1"],
+          [40, ""]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "SUM + DISTINCTCOUNTHLLPLUS FILTER (the reported failing shape)",
+        "sql": "SELECT SUM(amount) AS s, DISTINCTCOUNTHLLPLUS(user_id) FILTER (WHERE user_id <> '') AS dc FROM {tbl}",
+        "h2Sql": "SELECT SUM(amount) AS s, COUNT(DISTINCT CASE WHEN user_id <> '' THEN user_id END) AS dc FROM {tbl}"
+      },
+      {
+        "description": "Plain DISTINCTCOUNTHLLPLUS (no filter) still hits the same DIRECT path",
+        "sql": "SELECT DISTINCTCOUNTHLLPLUS(user_id) AS dc FROM {tbl}",
+        "h2Sql": "SELECT COUNT(DISTINCT user_id) AS dc FROM {tbl}"
+      },
+      {
+        "description": "DISTINCTCOUNT (Set intermediate, INT final) - same finalize requirement",
+        "sql": "SELECT DISTINCTCOUNT(user_id) AS dc FROM {tbl}",
+        "h2Sql": "SELECT COUNT(DISTINCT user_id) AS dc FROM {tbl}"
+      },
+      {
+        "description": "Primitive-only DIRECT aggregate (intermediate type == final type) - finalize loop must be a correct no-op",
+        "sql": "SELECT SUM(amount) AS s, COUNT(*) AS c FROM {tbl}",
+        "h2Sql": "SELECT SUM(amount) AS s, COUNT(*) AS c FROM {tbl}"
+      }
+    ]
+  },
+  "direct_aggregate_object_intermediate_null_handling": {
+    "comments": "Same DIRECT path with column-based null handling enabled and FILTERs that match zero rows, so the finalized value is NULL (SUM) / 0 (COUNT-family). Exercises the null finalization path of getRows() in SERVER_RETURN_FINAL_RESULT mode.",
+    "extraProps": {
+      "enableColumnBasedNullHandling": true
+    },
+    "tables": {
+      "tbl": {
+        "replicated": true,
+        "schema": [
+          {"name": "amount", "type": "INT"},
+          {"name": "user_id", "type": "STRING"}
+        ],
+        "inputs": [
+          [10, "u1"],
+          [20, "u2"],
+          [30, "u1"]
+        ]
+      }
+    },
+    "queries": [
+      {
+        "description": "Zero-match FILTERs: SUM finalizes to NULL, DISTINCTCOUNTHLLPLUS/COUNT finalize to 0",
+        "sql": "SELECT SUM(amount) FILTER (WHERE amount > 1000) AS s, COUNT(*) FILTER (WHERE amount > 1000) AS c, DISTINCTCOUNTHLLPLUS(user_id) FILTER (WHERE amount > 1000) AS dc FROM {tbl}",
+        "h2Sql": "SELECT SUM(CASE WHEN amount > 1000 THEN amount END) AS s, COUNT(CASE WHEN amount > 1000 THEN 1 END) AS c, COUNT(DISTINCT CASE WHEN amount > 1000 THEN user_id END) AS dc FROM {tbl}"
+      }
+    ]
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to