This is an automated email from the ASF dual-hosted git repository.

yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5dee07228bb Add merge-only reducer API for intercepting intermediate 
results (#18621)
5dee07228bb is described below

commit 5dee07228bbbd5d3fbf3e2cb93adce34ec32995a
Author: Navina Ramesh <[email protected]>
AuthorDate: Tue Jun 2 15:40:41 2026 -0700

    Add merge-only reducer API for intercepting intermediate results (#18621)
---
 .../apache/pinot/common/datatable/DataTable.java   |   9 +-
 .../common/datatable/DataTableBuilderUtils.java    | 128 +++++
 .../blocks/results/AggregationResultsBlock.java    |  34 +-
 .../blocks/results/GroupByResultsBlock.java        |  95 +---
 .../function/AggregationFunctionUtils.java         |  39 ++
 .../query/reduce/AggregationDataTableReducer.java  | 101 +++-
 .../core/query/reduce/BrokerReduceService.java     | 220 +++++---
 .../pinot/core/query/reduce/DataTableReducer.java  |  29 ++
 .../query/reduce/DistinctDataTableReducer.java     |  33 +-
 .../query/reduce/ExecutionStatsAggregator.java     | 131 +++++
 .../core/query/reduce/GroupByDataTableReducer.java | 118 +++++
 .../query/reduce/ExecutionStatsAggregatorTest.java | 179 +++++++
 .../core/query/reduce/MergeDataTablesOnlyTest.java | 577 +++++++++++++++++++++
 13 files changed, 1501 insertions(+), 192 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index bf3f2f86117..ddba67d92df 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -156,11 +156,16 @@ public interface DataTable {
     WORKLOAD_NAME(40, "workloadName", MetadataValueType.STRING),
     // Needed so that we can track query id in Netty channel response.
     QUERY_ID(41, "queryId", MetadataValueType.STRING),
-    EARLY_TERMINATION_REASON(42, "earlyTerminationReason", 
MetadataValueType.STRING);
+    EARLY_TERMINATION_REASON(42, "earlyTerminationReason", 
MetadataValueType.STRING),
+    // Set on a merged-only DataTable when one or more input server DataTables 
were dropped during
+    // the merge (e.g., due to a schema conflict), so the merge ran over a 
strict subset of the
+    // inputs. How a downstream consumer reacts (skip, retry, accept with 
annotation) is the
+    // consumer's policy.
+    INCOMPLETE_MERGE(43, "incompleteMerge", MetadataValueType.STRING);
 
     // We keep this constant to track the max id added so far for backward 
compatibility.
     // Increase it when adding new keys, but NEVER DECREASE IT!!!
-    private static final int MAX_ID = EARLY_TERMINATION_REASON.getId();
+    private static final int MAX_ID = INCOMPLETE_MERGE.getId();
 
     private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new 
MetadataKey[MAX_ID + 1];
     private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new 
HashMap<>();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
new file mode 100644
index 00000000000..d111ba21428
--- /dev/null
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common.datatable;
+
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import java.io.IOException;
+import java.math.BigDecimal;
+import org.apache.pinot.common.utils.ArrayListUtils;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * Helpers for writing values into a {@link DataTableBuilder}.
+ */
+public class DataTableBuilderUtils {
+  private DataTableBuilderUtils() {
+  }
+
+  /**
+   * Writes a non-null value of the given stored column data type into the 
{@link DataTableBuilder} at
+   * the given column. Supports all scalar and array stored types. OBJECT 
columns are NOT handled here
+   * (callers serialize them via the owning aggregation function). Used by the 
group-by result
+   * serialization on both the server ({@code GroupByResultsBlock}) and the 
merge-only reduce path.
+   */
+  public static void setColumn(DataTableBuilder dataTableBuilder, 
ColumnDataType storedColumnDataType,
+      int columnIndex, Object value)
+      throws IOException {
+    switch (storedColumnDataType) {
+      case INT:
+        dataTableBuilder.setColumn(columnIndex, (int) value);
+        break;
+      case LONG:
+        dataTableBuilder.setColumn(columnIndex, (long) value);
+        break;
+      case FLOAT:
+        dataTableBuilder.setColumn(columnIndex, (float) value);
+        break;
+      case DOUBLE:
+        dataTableBuilder.setColumn(columnIndex, (double) value);
+        break;
+      case BIG_DECIMAL:
+        dataTableBuilder.setColumn(columnIndex, (BigDecimal) value);
+        break;
+      case STRING:
+        dataTableBuilder.setColumn(columnIndex, value.toString());
+        break;
+      case BYTES:
+        dataTableBuilder.setColumn(columnIndex, (ByteArray) value);
+        break;
+      case INT_ARRAY:
+        if (value instanceof IntArrayList) {
+          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toIntArray((IntArrayList) value));
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (int[]) value);
+        }
+        break;
+      case LONG_ARRAY:
+        if (value instanceof LongArrayList) {
+          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toLongArray((LongArrayList) value));
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (long[]) value);
+        }
+        break;
+      case FLOAT_ARRAY:
+        if (value instanceof FloatArrayList) {
+          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toFloatArray((FloatArrayList) value));
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (float[]) value);
+        }
+        break;
+      case DOUBLE_ARRAY:
+        if (value instanceof DoubleArrayList) {
+          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toDoubleArray((DoubleArrayList) value));
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (double[]) value);
+        }
+        break;
+      case BIG_DECIMAL_ARRAY:
+        if (value instanceof ObjectArrayList) {
+          //noinspection unchecked
+          dataTableBuilder.setColumn(columnIndex,
+              ArrayListUtils.toBigDecimalArray((ObjectArrayList<BigDecimal>) 
value));
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (BigDecimal[]) value);
+        }
+        break;
+      case STRING_ARRAY:
+        if (value instanceof ObjectArrayList) {
+          //noinspection unchecked
+          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toStringArray((ObjectArrayList<String>) value));
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (String[]) value);
+        }
+        break;
+      case BYTES_ARRAY:
+        if (value instanceof ObjectArrayList) {
+          //noinspection unchecked
+          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toBytesArray((ObjectArrayList<ByteArray>) value));
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (ByteArray[]) value);
+        }
+        break;
+      default:
+        throw new IllegalStateException("Unsupported stored type: " + 
storedColumnDataType);
+    }
+  }
+}
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index 8f493869e3b..c28e2a8faa2 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -146,7 +146,7 @@ public class AggregationResultsBlock extends 
BaseResultsBlock {
               nullBitmaps[i].add(0);
             }
             assert result != null;
-            setIntermediateResult(dataTableBuilder, columnDataTypes, i, 
result);
+            AggregationFunctionUtils.setIntermediateResult(dataTableBuilder, 
columnDataTypes[i], i, result);
           }
         }
       }
@@ -174,7 +174,7 @@ public class AggregationResultsBlock extends 
BaseResultsBlock {
             if (columnDataTypes[i] == ColumnDataType.OBJECT) {
               dataTableBuilder.setColumn(i, 
_aggregationFunctions[i].serializeIntermediateResult(result));
             } else {
-              setIntermediateResult(dataTableBuilder, columnDataTypes, i, 
result);
+              AggregationFunctionUtils.setIntermediateResult(dataTableBuilder, 
columnDataTypes[i], i, result);
             }
           }
         }
@@ -184,36 +184,6 @@ public class AggregationResultsBlock extends 
BaseResultsBlock {
     return dataTableBuilder.build();
   }
 
-  private void setIntermediateResult(DataTableBuilder dataTableBuilder, 
ColumnDataType[] columnDataTypes, int index,
-      Object result) throws IOException {
-    ColumnDataType columnDataType = columnDataTypes[index];
-    switch (columnDataType) {
-      case INT:
-        dataTableBuilder.setColumn(index, (int) result);
-        break;
-      case LONG:
-        dataTableBuilder.setColumn(index, (long) result);
-        break;
-      case FLOAT:
-        dataTableBuilder.setColumn(index, (float) result);
-        break;
-      case DOUBLE:
-        dataTableBuilder.setColumn(index, (double) result);
-        break;
-      case BIG_DECIMAL:
-        dataTableBuilder.setColumn(index, (BigDecimal) result);
-        break;
-      case STRING:
-        dataTableBuilder.setColumn(index, result.toString());
-        break;
-      case BYTES:
-        dataTableBuilder.setColumn(index, (ByteArray) result);
-        break;
-      default:
-        throw new IllegalStateException("Illegal column data type in 
intermediate result: " + columnDataType);
-    }
-  }
-
   private void setFinalResult(DataTableBuilder dataTableBuilder, 
ColumnDataType[] columnDataTypes, int index,
       Object result)
       throws IOException {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index ea1e9bab31f..5baac68f099 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -18,13 +18,7 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
-import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
-import it.unimi.dsi.fastutil.floats.FloatArrayList;
-import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import java.io.IOException;
-import java.math.BigDecimal;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
@@ -33,11 +27,11 @@ import java.util.Map;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.utils.ArrayListUtils;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.data.table.IntermediateRecord;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.Table;
@@ -45,7 +39,6 @@ import 
org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.spi.query.QueryThreadContext;
-import org.apache.pinot.spi.utils.ByteArray;
 import org.roaringbitmap.RoaringBitmap;
 
 
@@ -243,7 +236,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
               nullBitmaps[i].add(rowId);
             }
             assert value != null;
-            setDataTableColumn(storedColumnDataTypes[i], dataTableBuilder, i, 
value);
+            DataTableBuilderUtils.setColumn(dataTableBuilder, 
storedColumnDataTypes[i], i, value);
           }
         }
         dataTableBuilder.finishRow();
@@ -265,7 +258,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
           } else if (storedColumnDataTypes[i] == ColumnDataType.OBJECT) {
             dataTableBuilder.setColumn(i, aggregationFunctions[i - 
numKeyColumns].serializeIntermediateResult(value));
           } else {
-            setDataTableColumn(storedColumnDataTypes[i], dataTableBuilder, i, 
value);
+            DataTableBuilderUtils.setColumn(dataTableBuilder, 
storedColumnDataTypes[i], i, value);
           }
         }
         dataTableBuilder.finishRow();
@@ -275,88 +268,6 @@ public class GroupByResultsBlock extends BaseResultsBlock {
     return dataTableBuilder.build();
   }
 
-  private void setDataTableColumn(ColumnDataType storedColumnDataType, 
DataTableBuilder dataTableBuilder,
-      int columnIndex, Object value)
-      throws IOException {
-    switch (storedColumnDataType) {
-      case INT:
-        dataTableBuilder.setColumn(columnIndex, (int) value);
-        break;
-      case LONG:
-        dataTableBuilder.setColumn(columnIndex, (long) value);
-        break;
-      case FLOAT:
-        dataTableBuilder.setColumn(columnIndex, (float) value);
-        break;
-      case DOUBLE:
-        dataTableBuilder.setColumn(columnIndex, (double) value);
-        break;
-      case BIG_DECIMAL:
-        dataTableBuilder.setColumn(columnIndex, (BigDecimal) value);
-        break;
-      case STRING:
-        dataTableBuilder.setColumn(columnIndex, value.toString());
-        break;
-      case BYTES:
-        dataTableBuilder.setColumn(columnIndex, (ByteArray) value);
-        break;
-      case INT_ARRAY:
-        if (value instanceof IntArrayList) {
-          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toIntArray((IntArrayList) value));
-        } else {
-          dataTableBuilder.setColumn(columnIndex, (int[]) value);
-        }
-        break;
-      case LONG_ARRAY:
-        if (value instanceof LongArrayList) {
-          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toLongArray((LongArrayList) value));
-        } else {
-          dataTableBuilder.setColumn(columnIndex, (long[]) value);
-        }
-        break;
-      case FLOAT_ARRAY:
-        if (value instanceof FloatArrayList) {
-          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toFloatArray((FloatArrayList) value));
-        } else {
-          dataTableBuilder.setColumn(columnIndex, (float[]) value);
-        }
-        break;
-      case DOUBLE_ARRAY:
-        if (value instanceof DoubleArrayList) {
-          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toDoubleArray((DoubleArrayList) value));
-        } else {
-          dataTableBuilder.setColumn(columnIndex, (double[]) value);
-        }
-        break;
-      case BIG_DECIMAL_ARRAY:
-        if (value instanceof ObjectArrayList) {
-          //noinspection unchecked
-          dataTableBuilder.setColumn(columnIndex,
-              ArrayListUtils.toBigDecimalArray((ObjectArrayList<BigDecimal>) 
value));
-        } else {
-          dataTableBuilder.setColumn(columnIndex, (BigDecimal[]) value);
-        }
-        break;
-      case STRING_ARRAY:
-        if (value instanceof ObjectArrayList) {
-          //noinspection unchecked
-          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toStringArray((ObjectArrayList<String>) value));
-        } else {
-          dataTableBuilder.setColumn(columnIndex, (String[]) value);
-        }
-        break;
-      case BYTES_ARRAY:
-        if (value instanceof ObjectArrayList) {
-          //noinspection unchecked
-          dataTableBuilder.setColumn(columnIndex, 
ArrayListUtils.toBytesArray((ObjectArrayList<ByteArray>) value));
-        } else {
-          dataTableBuilder.setColumn(columnIndex, (ByteArray[]) value);
-        }
-        break;
-      default:
-        throw new IllegalStateException("Unsupported stored type: " + 
storedColumnDataType);
-    }
-  }
 
   @Override
   public Map<String, String> getResultsMetadata() {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 010510d485e..a9e6f439320 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -23,6 +23,8 @@ import it.unimi.dsi.fastutil.floats.FloatArrayList;
 import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import java.io.IOException;
+import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -41,6 +43,7 @@ import 
org.apache.pinot.common.request.context.predicate.Predicate;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.config.QueryOptionsUtils;
 import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.operator.BaseProjectOperator;
 import org.apache.pinot.core.operator.blocks.ValueBlock;
 import org.apache.pinot.core.operator.filter.BaseFilterOperator;
@@ -54,6 +57,7 @@ import org.apache.pinot.core.startree.StarTreeUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.segment.spi.SegmentContext;
 import 
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.spi.utils.ByteArray;
 
 
 /**
@@ -168,6 +172,41 @@ public class AggregationFunctionUtils {
     }
   }
 
+  /**
+   * Writes a non-OBJECT intermediate result into the {@link DataTableBuilder} 
at the given column.
+   * Counterpart of {@link #getIntermediateResult}. OBJECT columns are handled 
by the caller via
+   * {@link AggregationFunction#serializeIntermediateResult}, since they need 
the aggregation function.
+   */
+  public static void setIntermediateResult(DataTableBuilder dataTableBuilder, 
ColumnDataType columnDataType, int colId,
+      Object result)
+      throws IOException {
+    switch (columnDataType) {
+      case INT:
+        dataTableBuilder.setColumn(colId, (int) result);
+        break;
+      case LONG:
+        dataTableBuilder.setColumn(colId, (long) result);
+        break;
+      case FLOAT:
+        dataTableBuilder.setColumn(colId, (float) result);
+        break;
+      case DOUBLE:
+        dataTableBuilder.setColumn(colId, (double) result);
+        break;
+      case BIG_DECIMAL:
+        dataTableBuilder.setColumn(colId, (BigDecimal) result);
+        break;
+      case STRING:
+        dataTableBuilder.setColumn(colId, result.toString());
+        break;
+      case BYTES:
+        dataTableBuilder.setColumn(colId, (ByteArray) result);
+        break;
+      default:
+        throw new IllegalStateException("Illegal column data type in 
intermediate result: " + columnDataType);
+    }
+  }
+
   /**
    * Reads the final result from the {@link DataTable}.
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
index 8f105d6af71..b8e6f249ce6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.reduce;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -28,6 +29,8 @@ import 
org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import 
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
@@ -83,6 +86,22 @@ public class AggregationDataTableReducer implements 
DataTableReducer {
   private void reduceWithIntermediateResult(DataSchema dataSchema, 
Collection<DataTable> dataTables,
       BrokerResponseNative brokerResponseNative) {
     int numAggregationFunctions = _aggregationFunctions.length;
+    Object[] intermediateResults = mergeIntermediateResults(dataSchema, 
dataTables);
+    Object[] finalResults = new Object[numAggregationFunctions];
+    for (int i = 0; i < numAggregationFunctions; i++) {
+      AggregationFunction aggregationFunction = _aggregationFunctions[i];
+      Comparable result = 
aggregationFunction.extractFinalResult(intermediateResults[i]);
+      finalResults[i] = result != null ? 
aggregationFunction.getFinalResultColumnType().convert(result) : null;
+    }
+    
brokerResponseNative.setResultTable(reduceToResultTable(getPrePostAggregationDataSchema(dataSchema),
 finalResults));
+  }
+
+  /**
+   * Merges the per-server intermediate aggregation results into a single 
{@code Object[]} of merged
+   * intermediate results (one per aggregation function), WITHOUT finalizing.
+   */
+  private Object[] mergeIntermediateResults(DataSchema dataSchema, 
Collection<DataTable> dataTables) {
+    int numAggregationFunctions = _aggregationFunctions.length;
     Object[] intermediateResults = new Object[numAggregationFunctions];
     for (DataTable dataTable : dataTables) {
       
QueryThreadContext.checkTerminationAndSampleUsage("AggregationDataTableReducer");
@@ -110,13 +129,83 @@ public class AggregationDataTableReducer implements 
DataTableReducer {
         }
       }
     }
-    Object[] finalResults = new Object[numAggregationFunctions];
-    for (int i = 0; i < numAggregationFunctions; i++) {
-      AggregationFunction aggregationFunction = _aggregationFunctions[i];
-      Comparable result = 
aggregationFunction.extractFinalResult(intermediateResults[i]);
-      finalResults[i] = result != null ? 
aggregationFunction.getFinalResultColumnType().convert(result) : null;
+    return intermediateResults;
+  }
+
+  @Override
+  public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, 
DataTableReducerContext reducerContext,
+      BrokerMetrics brokerMetrics) {
+    // cannot support finalized value types returned by the servers as an 
intermediate result type
+    if (_queryContext.isServerReturnFinalResult()) {
+      throw new UnsupportedOperationException(
+          "Datatable merge to intermediate results cannot be supported when 
servers return final result");
     }
-    
brokerResponseNative.setResultTable(reduceToResultTable(getPrePostAggregationDataSchema(dataSchema),
 finalResults));
+    dataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(_queryContext, 
dataSchema);
+    try {
+      if (dataTableMap.isEmpty()) {
+        return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build();
+      }
+      Object[] intermediateResults = mergeIntermediateResults(dataSchema, 
dataTableMap.values());
+      return buildIntermediateDataTable(dataSchema, intermediateResults);
+    } catch (IOException e) {
+      throw new RuntimeException("Caught IOException while building merged 
intermediate DataTable for aggregation", e);
+    }
+  }
+
+  /**
+   * Serializes the merged intermediate results into a single-row intermediate 
{@link DataTable},
+   * mirroring the non-final branch of {@code 
AggregationResultsBlock#getDataTable()} so the output is
+   * byte-shape identical to a single server's intermediate response. Never 
finalizes.
+   */
+  private DataTable buildIntermediateDataTable(DataSchema dataSchema, Object[] 
intermediateResults)
+      throws IOException {
+    ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
+    DataTableBuilder dataTableBuilder = 
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
+    if (_queryContext.isNullHandlingEnabled()) {
+      RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+      for (int i = 0; i < numColumns; i++) {
+        nullBitmaps[i] = new RoaringBitmap();
+      }
+      dataTableBuilder.startRow();
+      for (int i = 0; i < numColumns; i++) {
+        Object result = intermediateResults[i];
+        if (columnDataTypes[i] == ColumnDataType.OBJECT) {
+          if (result == null) {
+            dataTableBuilder.setNull(i);
+          } else {
+            dataTableBuilder.setColumn(i, 
_aggregationFunctions[i].serializeIntermediateResult(result));
+          }
+        } else {
+          if (result == null) {
+            result = columnDataTypes[i].getNullPlaceholder();
+            nullBitmaps[i].add(0);
+          }
+          AggregationFunctionUtils.setIntermediateResult(dataTableBuilder, 
columnDataTypes[i], i, result);
+        }
+      }
+      dataTableBuilder.finishRow();
+      for (RoaringBitmap nullBitmap : nullBitmaps) {
+        dataTableBuilder.setNullRowIds(nullBitmap);
+      }
+    } else {
+      dataTableBuilder.startRow();
+      for (int i = 0; i < numColumns; i++) {
+        Object result = intermediateResults[i];
+        if (result == null) {
+          dataTableBuilder.setNull(i);
+        } else {
+          if (columnDataTypes[i] == ColumnDataType.OBJECT) {
+            dataTableBuilder.setColumn(i, 
_aggregationFunctions[i].serializeIntermediateResult(result));
+          } else {
+            AggregationFunctionUtils.setIntermediateResult(dataTableBuilder, 
columnDataTypes[i], i, result);
+          }
+        }
+      }
+      dataTableBuilder.finishRow();
+    }
+    return dataTableBuilder.build();
   }
 
   private void processSingleFinalResult(DataSchema dataSchema, DataTable 
dataTable,
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 07dd8300ee0..f4fd325b483 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.metrics.BrokerMeter;
@@ -75,46 +76,11 @@ public class BrokerReduceService extends BaseReduceService {
     ExecutionStatsAggregator aggregator = new 
ExecutionStatsAggregator(enableTrace);
     BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
 
-    // Cache a data schema from data tables (try to cache one with data rows 
associated with it).
-    DataSchema dataSchemaFromEmptyDataTable = null;
-    DataSchema dataSchemaFromNonEmptyDataTable = null;
+    // Process server response metadata, drop 
empty/null-schema/conflicting-schema data tables, and pick
+    // a data schema (preferring one backed by data rows).
     List<ServerRoutingInstance> serversWithConflictingDataSchema = new 
ArrayList<>();
-
-    // Process server response metadata.
-    Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = 
dataTableMap.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
-      DataTable dataTable = entry.getValue();
-
-      // aggregate metrics
-      aggregator.aggregate(entry.getKey(), dataTable);
-
-      // After processing the metadata, remove data tables without data rows 
inside.
-      DataSchema dataSchema = dataTable.getDataSchema();
-      if (dataSchema == null) {
-        iterator.remove();
-      } else {
-        // Try to cache a data table with data rows inside, or cache one with 
data schema inside.
-        if (dataTable.getNumberOfRows() == 0) {
-          if (dataSchemaFromEmptyDataTable == null) {
-            dataSchemaFromEmptyDataTable = dataSchema;
-          }
-          iterator.remove();
-        } else {
-          if (dataSchemaFromNonEmptyDataTable == null) {
-            dataSchemaFromNonEmptyDataTable = dataSchema;
-          } else {
-            // Remove data tables with conflicting data schema.
-            // NOTE: Only compare the column data types, since the column 
names (string representation of expression)
-            //       can change across different versions.
-            if (!Arrays.equals(dataSchema.getColumnDataTypes(), 
dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) {
-              serversWithConflictingDataSchema.add(entry.getKey());
-              iterator.remove();
-            }
-          }
-        }
-      }
-    }
+    DataSchema cachedDataSchema =
+        filterDataTablesAndPickSchema(dataTableMap, aggregator, 
serversWithConflictingDataSchema);
 
     String tableName = serverBrokerRequest.getQuerySource().getTableName();
     String rawTableName = TableNameBuilder.extractRawTableName(tableName);
@@ -142,34 +108,17 @@ public class BrokerReduceService extends 
BaseReduceService {
 
     // NOTE: When there is no cached data schema, that means all servers 
encountered exception. In such case, return the
     //       response with metadata only.
-    DataSchema cachedDataSchema =
-        dataSchemaFromNonEmptyDataTable != null ? 
dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable;
     if (cachedDataSchema == null) {
       return brokerResponseNative;
     }
 
     QueryContext serverQueryContext = 
QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
     DataTableReducer dataTableReducer = 
ResultReducerFactory.getResultReducer(serverQueryContext);
-
-    Integer minGroupTrimSizeQueryOption = null;
-    Integer groupTrimThresholdQueryOption = null;
-    Integer minInitialIndexedTableCapacityQueryOption = null;
-    if (queryOptions != null) {
-      minGroupTrimSizeQueryOption = 
QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
-      groupTrimThresholdQueryOption = 
QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
-      minInitialIndexedTableCapacityQueryOption = 
QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
-    }
-    int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? 
minGroupTrimSizeQueryOption : _minGroupTrimSize;
-    int groupTrimThreshold =
-        groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption 
: _groupByTrimThreshold;
-    int minInitialIndexedTableCapacity =
-        minInitialIndexedTableCapacityQueryOption != null ? 
minInitialIndexedTableCapacityQueryOption
-            : _minInitialIndexedTableCapacity;
+    DataTableReducerContext reducerContext = 
createReducerContext(queryOptions, reduceTimeOutMs);
 
     try {
       dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema, 
dataTableMap, brokerResponseNative,
-          new DataTableReducerContext(_reduceExecutorService, 
_maxReduceThreadsPerQuery, reduceTimeOutMs,
-              groupTrimThreshold, minGroupTrimSize, 
minInitialIndexedTableCapacity), brokerMetrics);
+          reducerContext, brokerMetrics);
     } catch (RuntimeException e) {
       // First check terminate exception and use it as the results block if 
exists. We want to return the termination
       // reason when query is explicitly terminated.
@@ -227,6 +176,161 @@ public class BrokerReduceService extends 
BaseReduceService {
         
serverBrokerRequest.getPinotQuery().getQueryOptions().get(QueryOptionKey.MATERIALIZED_VIEW_REWRITE));
   }
 
+  /**
+   * Merge-only counterpart of {@link #reduceOnDataTable}: merges the 
per-server DataTables into a single
+   * intermediate {@link DataTable} WITHOUT finalizing (no {@code 
extractFinalResult}). Returns
+   * {@code null} when there is nothing to merge (empty map, or all servers 
returned no data / errored).
+   * Reuses the same schema-filtering preamble and reducer/trim resolution as 
the regular reduce.
+   *
+   * <p>The returned DataTable carries intermediate, non-finalized state 
(byte-shape identical to a
+   * single server's partial response).
+   *
+   * <p>If one or more input server DataTables are dropped during merge (e.g., 
due to a schema
+   * conflict with the first non-empty table), the returned DataTable's 
metadata carries the
+   * {@link DataTable.MetadataKey#INCOMPLETE_MERGE} flag set to {@code "true"} 
and the
+   * {@link BrokerMeter#RESPONSE_MERGE_EXCEPTIONS} meter is incremented. The 
callers can
+   * read the flag and decide policy (skip, retry, accept).
+   *
+   * <p>Execution stats from the input DataTables are aggregated via {@link 
ExecutionStatsAggregator}
+   * and written back onto the merged DataTable: additive longs (e.g. {@code 
numDocsScanned},
+   * {@code numSegments*}, {@code threadCpuTimeNs}), {@code 
minConsumingFreshnessTimeMs} (MIN-reduced),
+   * boolean flags ({@code groupsTrimmed}, {@code numGroupsLimitReached}, 
etc., OR-reduced),
+   * per-server exceptions, and trace info (JSON-encoded if {@code 
trace=true}). This method does NOT bump
+   * broker meters/timers for the input stats.
+   *
+   * <p>Limitations on stats:
+   * <ul>
+   *   <li>Exception attribution to original servers is lost; the wire format 
is {@code Map<Integer,
+   *       String>} so collisions on the same error code are resolved 
last-write-wins.
+   *   <li>Per-server trace info is JSON-encoded into a single {@code 
TRACE_INFO} entry
+   * </ul>
+   *
+   * <p>WARNING: this performs a full cross-server merge and re-serializes the 
result — heavyweight work
+   * that must be run asynchronously, decoupled from request serving. Intended 
for callers that want to
+   * intercept the merged intermediate result instead of the finalized one.
+   *
+   * <p>[org.apache.pinot.spi.query.QueryThreadContext] must already be set up 
before calling this method.
+   */
+  @Nullable
+  public DataTable mergeOnDataTable(BrokerRequest serverBrokerRequest,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, long 
reduceTimeOutMs, BrokerMetrics brokerMetrics) {
+    if (dataTableMap.isEmpty()) {
+      return null;
+    }
+    Map<String, String> queryOptions = 
serverBrokerRequest.getPinotQuery().getQueryOptions();
+    boolean enableTrace =
+        queryOptions != null && 
Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
+    // Aggregate stats while filtering so we can write them back onto the 
merged DataTable's metadata.
+    ExecutionStatsAggregator aggregator = new 
ExecutionStatsAggregator(enableTrace);
+    List<ServerRoutingInstance> conflictingServers = new ArrayList<>();
+    DataSchema cachedDataSchema = filterDataTablesAndPickSchema(dataTableMap, 
aggregator, conflictingServers);
+    if (cachedDataSchema == null) {
+      // All servers returned no data or encountered exceptions; nothing to 
merge.
+      return null;
+    }
+
+    String rawTableName = 
TableNameBuilder.extractRawTableName(serverBrokerRequest.getQuerySource().getTableName());
+    QueryContext serverQueryContext = 
QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
+    DataTableReducer dataTableReducer = 
ResultReducerFactory.getResultReducer(serverQueryContext);
+    DataTableReducerContext reducerContext =
+        
createReducerContext(serverBrokerRequest.getPinotQuery().getQueryOptions(), 
reduceTimeOutMs);
+    DataTable merged = dataTableReducer.mergeDataTablesOnly(rawTableName, 
cachedDataSchema, dataTableMap,
+        reducerContext, brokerMetrics);
+
+    if (merged != null) {
+      // Write accumulated stats (additive longs, booleans, MIN freshness, 
exceptions, trace) onto the
+      // merged DataTable
+      aggregator.setStatsOnMergedDataTable(merged);
+
+      if (!conflictingServers.isEmpty()) {
+        LOGGER.warn("Merge-only reduce dropped {} server response(s) for table 
{} due to data schema "
+            + "inconsistency: {}", conflictingServers.size(), rawTableName, 
conflictingServers);
+        brokerMetrics.addMeteredTableValue(rawTableName, 
BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1);
+        
merged.getMetadata().put(DataTable.MetadataKey.INCOMPLETE_MERGE.getName(), 
"true");
+      }
+    }
+    return merged;
+  }
+
+  /**
+   * Processes per-server response metadata and filters {@code dataTableMap} 
in place:
+   *  - drops tables with a null schema
+   *  - drops empty tables (remembering their schema as a fallback)
+   *  - drops tables whose column data types conflict with the first non-empty 
table
+   *  (collected into {@code conflictingServers}).
+   *
+   * When an {@code aggregator} is provided, per-table execution stats are 
aggregated before a table is
+   * dropped.
+   * Returns the remembered data schema (non-empty preferred, else empty-table 
schema, else
+   * {@code null}).
+   */
+  private static DataSchema 
filterDataTablesAndPickSchema(Map<ServerRoutingInstance, DataTable> 
dataTableMap,
+      @Nullable ExecutionStatsAggregator aggregator, 
List<ServerRoutingInstance> conflictingServers) {
+    DataSchema dataSchemaFromEmptyDataTable = null;
+    DataSchema dataSchemaFromNonEmptyDataTable = null;
+    Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = 
dataTableMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
+      DataTable dataTable = entry.getValue();
+
+      // aggregate metrics
+      if (aggregator != null) {
+        aggregator.aggregate(entry.getKey(), dataTable);
+      }
+
+      // After processing the metadata, remove data tables without data rows 
inside.
+      DataSchema dataSchema = dataTable.getDataSchema();
+      if (dataSchema == null) {
+        iterator.remove();
+      } else {
+        // Try to cache a data table with data rows inside, or cache one with 
data schema inside.
+        if (dataTable.getNumberOfRows() == 0) {
+          if (dataSchemaFromEmptyDataTable == null) {
+            dataSchemaFromEmptyDataTable = dataSchema;
+          }
+          iterator.remove();
+        } else {
+          if (dataSchemaFromNonEmptyDataTable == null) {
+            dataSchemaFromNonEmptyDataTable = dataSchema;
+          } else {
+            // Remove data tables with conflicting data schema.
+            // NOTE: Only compare the column data types, since the column 
names (string representation of expression)
+            //       can change across different versions.
+            if (!Arrays.equals(dataSchema.getColumnDataTypes(), 
dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) {
+              conflictingServers.add(entry.getKey());
+              iterator.remove();
+            }
+          }
+        }
+      }
+    }
+    return dataSchemaFromNonEmptyDataTable != null ? 
dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable;
+  }
+
+  /**
+   * Resolves the group-by trim parameters (query option overrides, else 
broker defaults) and builds the
+   * {@link DataTableReducerContext}
+   */
+  private DataTableReducerContext createReducerContext(@Nullable Map<String, 
String> queryOptions,
+      long reduceTimeOutMs) {
+    Integer minGroupTrimSizeQueryOption = null;
+    Integer groupTrimThresholdQueryOption = null;
+    Integer minInitialIndexedTableCapacityQueryOption = null;
+    if (queryOptions != null) {
+      minGroupTrimSizeQueryOption = 
QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
+      groupTrimThresholdQueryOption = 
QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+      minInitialIndexedTableCapacityQueryOption = 
QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
+    }
+    int minGroupTrimSize = minGroupTrimSizeQueryOption != null ? 
minGroupTrimSizeQueryOption : _minGroupTrimSize;
+    int groupTrimThreshold =
+        groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption 
: _groupByTrimThreshold;
+    int minInitialIndexedTableCapacity =
+        minInitialIndexedTableCapacityQueryOption != null ? 
minInitialIndexedTableCapacityQueryOption
+            : _minInitialIndexedTableCapacity;
+    return new DataTableReducerContext(_reduceExecutorService, 
_maxReduceThreadsPerQuery, reduceTimeOutMs,
+        groupTrimThreshold, minGroupTrimSize, minInitialIndexedTableCapacity);
+  }
+
   public void shutDown() {
     _reduceExecutorService.shutdownNow();
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
index 496df03da85..597b048958f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
@@ -42,4 +42,33 @@ public interface DataTableReducer {
    */
   void reduceAndSetResults(String tableName, DataSchema dataSchema, 
Map<ServerRoutingInstance, DataTable> dataTableMap,
       BrokerResponseNative brokerResponseNative, DataTableReducerContext 
reducerContext, BrokerMetrics brokerMetrics);
+
+  /**
+   * Merges per-server data tables into a single <em>intermediate</em> {@link 
DataTable} WITHOUT
+   * finalizing (no {@code extractFinalResult} / result formatting). The 
returned DataTable carries
+   * intermediate state byte-shape identical to a single server's partial 
response, so a consumer can
+   * intercept the merged intermediate results and custom handle it. It is 
expected that the merged
+   * intermediate result can be reinjected in the normal reduce path.
+   *
+   * <p><b>WARNING:</b> this performs a full cross-server merge and 
re-serializes the result —
+   * heavyweight work that must be run asynchronously, decoupled from request 
serving. Invoking it
+   * inline while a query is being served adds that cost to the query and can 
severely degrade its
+   * latency.
+   *
+   * <p>Reducers that cannot produce a re-mergeable intermediate (e.g. 
explain-plan) leave this default
+   * implementation, which throws {@link UnsupportedOperationException}.
+   *
+   * @param tableName table name
+   * @param dataSchema schema from broker reduce service
+   * @param dataTableMap map of servers to data tables
+   * @param reducerContext DataTableReducer context
+   * @param brokerMetrics broker metrics
+   * @return the merged intermediate DataTable (intermediate, non-finalized 
state)
+   */
+  default DataTable mergeDataTablesOnly(String tableName, DataSchema 
dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, 
DataTableReducerContext reducerContext,
+      BrokerMetrics brokerMetrics) {
+    throw new UnsupportedOperationException(
+        getClass().getSimpleName() + " does not support merge-only reduction");
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index dab95c23227..c3a1c7d788c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.core.query.reduce;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import org.apache.pinot.common.datatable.DataTable;
@@ -27,6 +29,7 @@ import 
org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.query.distinct.table.BigDecimalDistinctTable;
 import org.apache.pinot.core.query.distinct.table.BytesDistinctTable;
 import org.apache.pinot.core.query.distinct.table.DistinctTable;
@@ -61,8 +64,34 @@ public class DistinctDataTableReducer implements 
DataTableReducer {
       brokerResponseNative.setResultTable(new ResultTable(dataSchema, 
List.of()));
       return;
     }
+    DistinctTable distinctTable = mergeToDistinctTable(dataSchema, 
dataTableMap.values());
+    brokerResponseNative.setResultTable(distinctTable.toResultTable());
+  }
+
+  @Override
+  public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, 
DataTableReducerContext reducerContext,
+      BrokerMetrics brokerMetrics) {
+    dataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForDistinct(_queryContext, 
dataSchema);
+    try {
+      if (dataTableMap.isEmpty() || _queryContext.getLimit() == 0) {
+        return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build();
+      }
+      DistinctTable distinctTable = mergeToDistinctTable(dataSchema, 
dataTableMap.values());
+      // Intermediate form: limit/sorting not applied
+      return distinctTable.toDataTable();
+    } catch (IOException e) {
+      throw new RuntimeException("Caught IOException while building merged 
intermediate DataTable for distinct", e);
+    }
+  }
+
+  /**
+   * Merges the per-server DataTables into a single {@link DistinctTable} 
(early-stopping once the
+   * distinct set is satisfied). Shared by the normal reduce path and the 
merge-only path.
+   */
+  private DistinctTable mergeToDistinctTable(DataSchema dataSchema, 
Collection<DataTable> dataTables) {
     DistinctTable distinctTable = null;
-    for (DataTable dataTable : dataTableMap.values()) {
+    for (DataTable dataTable : dataTables) {
       
QueryThreadContext.checkTerminationAndSampleUsage("DistinctDataTableReducer");
       if (distinctTable == null) {
         distinctTable = createDistinctTable(dataSchema, dataTable);
@@ -75,7 +104,7 @@ public class DistinctDataTableReducer implements 
DataTableReducer {
         }
       }
     }
-    brokerResponseNative.setResultTable(distinctTable.toResultTable());
+    return distinctTable;
   }
 
   private DistinctTable createDistinctTable(DataSchema dataSchema, DataTable 
dataTable) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index 3ca72dd04e6..d76596987ae 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.reduce;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -33,6 +34,7 @@ import 
org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.JsonUtils;
 
 
 public class ExecutionStatsAggregator {
@@ -346,4 +348,133 @@ public class ExecutionStatsAggregator {
       consumer.accept(Long.parseLong(strValue));
     }
   }
+
+  /**
+   * Writes the accumulated execution stats onto the given DataTable's 
metadata (and exception map),
+   * so a merged-only DataTable can be re-injected into the regular reduce 
path with the same
+   * downstream totals as a direct reduce of the original inputs would have 
produced.
+   *
+   * <p>Unlike {@link #setStats(String, BrokerResponseNative, BrokerMetrics)}, 
this method does NOT
+   * bump broker meters or timers. The merge-only path is expected to run off 
the request-serving
+   * path; meter increments fire when the result is eventually re-reduced.
+   *
+   * <p>Limitations of the round-trip via DataTable metadata:
+   * <ul>
+   *   <li>CPU and memory stats round-trip as a single combined value per key
+   *       ({@link DataTable.MetadataKey#THREAD_CPU_TIME_NS}, etc.) because 
the wire format has no
+   *       per-tableType keys. In the standard reduce path the aggregator 
attributes each server's
+   *       value to offline vs realtime based on {@code 
routingInstance.getTableType()} and surfaces
+   *       them as separate fields on {@link BrokerResponseNative}; on a 
re-reduce of the merged
+   *       DataTable the whole combined value lands in one bucket — whichever 
tableType the caller
+   *       assigned to the synthetic server response. So the per-tableType 
split visible on
+   *       BrokerResponse is lost across the round-trip, even though the total 
is preserved.
+   *   <li>Per-server exceptions are written via {@link 
DataTable#addException(int, String)} which
+   *       backs a {@code Map<Integer, String>} keyed by error code; if two 
inputs reported the
+   *       same error code the merged DataTable carries last-write-wins for 
the message.
+   *   <li>Per-server trace info is JSON-encoded into a single
+   *       {@link DataTable.MetadataKey#TRACE_INFO} entry; the downstream 
aggregator reads it back
+   *       as one trace blob attributed to the synthetic server.
+   *   <li>DISTINCT early-termination reasons round-trip as a single enum name
+   *       ({@link DataTable.MetadataKey#EARLY_TERMINATION_REASON}) because 
the wire format is one
+   *       string per DataTable. The aggregator OR-reduces multiple per-server 
reasons into three
+   *       independent booleans; on re-injection we encode only the first set 
flag in declaration
+   *       order. The user-visible "DISTINCT is partial" signal is preserved 
(each of the three
+   *       flags independently sets partial-ness on {@link 
BrokerResponseNative}); the exact reason
+   *       granularity is best-effort when multiple flags are true.
+   * </ul>
+   */
+  public void setStatsOnMergedDataTable(DataTable dataTable) {
+    Map<String, String> metadata = dataTable.getMetadata();
+
+    // Additive long stats: mirror setStats()'s pattern of unconditional 
writes. Accumulators are
+    // initialized to 0, so a 0 here is indistinguishable to downstream from 
"absent" — the
+    // downstream aggregator's Long.parseLong("0") + null-check both produce 0.
+    putLong(metadata, DataTable.MetadataKey.NUM_DOCS_SCANNED, _numDocsScanned);
+    putLong(metadata, DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER, 
_numEntriesScannedInFilter);
+    putLong(metadata, DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER, 
_numEntriesScannedPostFilter);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_QUERIED, 
_numSegmentsQueried);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED, 
_numSegmentsProcessed);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_MATCHED, 
_numSegmentsMatched);
+    putLong(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED, 
_numConsumingSegmentsQueried);
+    putLong(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED, 
_numConsumingSegmentsProcessed);
+    putLong(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED, 
_numConsumingSegmentsMatched);
+    putLong(metadata, DataTable.MetadataKey.TOTAL_DOCS, _numTotalDocs);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER, 
_numSegmentsPrunedByServer);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID, 
_numSegmentsPrunedInvalid);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT, 
_numSegmentsPrunedByLimit);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE, 
_numSegmentsPrunedByValue);
+    putLong(metadata, 
DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS,
+        _explainPlanNumEmptyFilterSegments);
+    putLong(metadata, 
DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS,
+        _explainPlanNumMatchAllFilterSegments);
+    // Collapse offline+realtime decomposition back to the combined 
wire-format keys.
+    putLong(metadata, DataTable.MetadataKey.THREAD_CPU_TIME_NS,
+        _offlineThreadCpuTimeNs + _realtimeThreadCpuTimeNs);
+    putLong(metadata, DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS,
+        _offlineSystemActivitiesCpuTimeNs + 
_realtimeSystemActivitiesCpuTimeNs);
+    putLong(metadata, DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS,
+        _offlineResponseSerializationCpuTimeNs + 
_realtimeResponseSerializationCpuTimeNs);
+    putLong(metadata, DataTable.MetadataKey.THREAD_MEM_ALLOCATED_BYTES,
+        _offlineThreadMemAllocatedBytes + _realtimeThreadMemAllocatedBytes);
+    putLong(metadata, DataTable.MetadataKey.RESPONSE_SER_MEM_ALLOCATED_BYTES,
+        _offlineResponseSerMemAllocatedBytes + 
_realtimeResponseSerMemAllocatedBytes);
+
+    // MIN_CONSUMING_FRESHNESS_TIME_MS: sentinel-guarded. Long.MAX_VALUE means 
"no input had a real
+    // freshness reading"; writing the sentinel would mislead downstream 
observability.
+    if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
+      
metadata.put(DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(),
+          Long.toString(_minConsumingFreshnessTimeMs));
+    }
+
+    // Boolean flags: OR-reduced; only write the key when true (a "false" 
entry is noise and the
+    // existing reduce path treats absent as false).
+    if (_groupsTrimmed) {
+      metadata.put(DataTable.MetadataKey.GROUPS_TRIMMED.getName(), "true");
+    }
+    if (_numGroupsLimitReached) {
+      metadata.put(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), 
"true");
+    }
+    if (_numGroupsWarningLimitReached) {
+      
metadata.put(DataTable.MetadataKey.NUM_GROUPS_WARNING_LIMIT_REACHED.getName(), 
"true");
+    }
+
+    // EARLY_TERMINATION_REASON: 1-string wire format ↔ 3-boolean accumulator. 
aggregate() OR-reduces
+    // multiple per-server reasons into the three DISTINCT booleans (each 
per-server DataTable carries
+    // at most one enum name); on re-injection we can encode only one reason 
back. Pick the first set
+    // flag in declaration order so the round-trip is deterministic. 
Granularity loss when multiple
+    // flags are true is inherent to the single-string wire format — the 
user-visible "DISTINCT is
+    // partial" signal is preserved because any one of the three flags 
independently sets
+    // partial-ness on BrokerResponseNative.
+    BaseResultsBlock.EarlyTerminationReason distinctReason = null;
+    if (_maxRowsInDistinctReached) {
+      distinctReason = 
BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS;
+    } else if (_maxRowsWithoutChangeInDistinctReached) {
+      distinctReason = 
BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS_WITHOUT_CHANGE;
+    } else if (_maxExecutionTimeInDistinctReached) {
+      distinctReason = 
BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_EXECUTION_TIME;
+    }
+    if (distinctReason != null) {
+      metadata.put(DataTable.MetadataKey.EARLY_TERMINATION_REASON.getName(), 
distinctReason.name());
+    }
+
+    // Exceptions: copy each accumulated exception onto the DataTable. 
Last-write-wins on error-code
+    // collision (wire format is Map<Integer, String>).
+    for (QueryProcessingException qpe : _processingExceptions) {
+      dataTable.addException(qpe.getErrorCode(), qpe.getMessage());
+    }
+
+    // Trace: JSON-encode the per-server map into a single TRACE_INFO metadata 
entry. On downstream
+    // readback the aggregator reads it as one string under the synthetic 
server's name.
+    if (_enableTrace && !_traceInfo.isEmpty()) {
+      try {
+        metadata.put(DataTable.MetadataKey.TRACE_INFO.getName(), 
JsonUtils.objectToString(_traceInfo));
+      } catch (JsonProcessingException e) {
+        throw new IllegalStateException("Failed to serialize trace info for 
merged DataTable", e);
+      }
+    }
+  }
+
+  private static void putLong(Map<String, String> metadata, 
DataTable.MetadataKey key, long value) {
+    metadata.put(key.getName(), Long.toString(value));
+  }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 942544dab50..839948f88be 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -23,6 +23,7 @@ import it.unimi.dsi.fastutil.floats.FloatArrayList;
 import it.unimi.dsi.fastutil.ints.IntArrayList;
 import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import java.io.IOException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -37,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.metrics.BrokerGauge;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -46,6 +48,9 @@ import 
org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
 import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -120,6 +125,49 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
     }
   }
 
+  @Override
+  public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, 
DataTableReducerContext reducerContext,
+      BrokerMetrics brokerMetrics) {
+    // When servers are configured to return final aggregate state, the input 
DataTables hold final
+    // (not intermediate) values, so the merge-only contract — "produce an 
intermediate DataTable that
+    // can be re-merged via the normal reduce path" — cannot be honored.
+    if (_queryContext.isServerReturnFinalResult()) {
+      throw new UnsupportedOperationException(
+          "Merge-only reduction is not supported when servers return final 
aggregate results "
+              + "(server.returnFinalResult / isServerReturnFinalResult); input 
would be final-typed, "
+              + "not intermediate.");
+    }
+    dataSchema = 
ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(_queryContext, 
dataSchema);
+    try {
+      if (dataTableMap.isEmpty()) {
+        return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build();
+      }
+      Collection<DataTable> dataTables = dataTableMap.values();
+      // Reuse the regular reduce's merge: builds the IndexedTable of group 
keys + intermediate agg state.
+      IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, 
reducerContext);
+      DataTable mergedDataTable = buildIntermediateDataTable(dataSchema, 
indexedTable);
+      if (indexedTable.isTrimmed() && _queryContext.isUnsafeTrim()) {
+        
mergedDataTable.getMetadata().put(MetadataKey.GROUPS_TRIMMED.getName(), "true");
+      }
+      if (anyNumGroupsLimitReached(dataTables)) {
+        
mergedDataTable.getMetadata().put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(),
 "true");
+      }
+      return mergedDataTable;
+    } catch (IOException e) {
+      throw new RuntimeException("Caught IOException while building merged 
intermediate DataTable for group-by", e);
+    }
+  }
+
+  private static boolean anyNumGroupsLimitReached(Collection<DataTable> 
dataTables) {
+    for (DataTable dataTable : dataTables) {
+      if 
(Boolean.parseBoolean(dataTable.getMetadata().get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName())))
 {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /// Reduces group-by results into a [ResultTable] and set it into the 
[BrokerResponseNative].
   private void reduceResult(BrokerResponseNative brokerResponseNative, 
DataSchema dataSchema,
       Collection<DataTable> dataTables, DataTableReducerContext 
reducerContext, String rawTableName,
@@ -482,4 +530,74 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
         throw new IllegalStateException("Illegal column data type in group 
key: " + columnDataType);
     }
   }
+
+  /**
+   * Serializes the merged {@link IndexedTable} back into an intermediate 
{@link DataTable}, mirroring
+   * {@code GroupByResultsBlock#getDataTable()} so the output is byte-shape 
identical to a single
+   * server's intermediate group-by response. Group-key columns are written by 
stored type; OBJECT
+   * aggregate columns via {@link 
AggregationFunction#serializeIntermediateResult}. No limit / HAVING /
+   * post-aggregation / formatting is applied.
+   */
+  private DataTable buildIntermediateDataTable(DataSchema dataSchema, 
IndexedTable indexedTable)
+      throws IOException {
+    DataTableBuilder dataTableBuilder = 
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
+    ColumnDataType[] storedColumnDataTypes = 
dataSchema.getStoredColumnDataTypes();
+    Iterator<Record> iterator = indexedTable.iterator();
+    if (_queryContext.isNullHandlingEnabled()) {
+      RoaringBitmap[] nullBitmaps = new RoaringBitmap[_numColumns];
+      Object[] nullPlaceholders = new Object[_numColumns];
+      for (int colId = 0; colId < _numColumns; colId++) {
+        nullBitmaps[colId] = new RoaringBitmap();
+        nullPlaceholders[colId] = 
storedColumnDataTypes[colId].getNullPlaceholder();
+      }
+      int rowId = 0;
+      while (iterator.hasNext()) {
+        QueryThreadContext.checkTerminationAndSampleUsagePeriodically(rowId, 
"GroupByDataTableReducer#merge");
+        dataTableBuilder.startRow();
+        Object[] values = iterator.next().getValues();
+        for (int i = 0; i < _numColumns; i++) {
+          Object value = values[i];
+          if (storedColumnDataTypes[i] == ColumnDataType.OBJECT) {
+            if (value == null) {
+              dataTableBuilder.setNull(i);
+            } else {
+              dataTableBuilder.setColumn(i,
+                  _aggregationFunctions[i - 
_numGroupByExpressions].serializeIntermediateResult(value));
+            }
+          } else {
+            if (value == null) {
+              value = nullPlaceholders[i];
+              nullBitmaps[i].add(rowId);
+            }
+            DataTableBuilderUtils.setColumn(dataTableBuilder, 
storedColumnDataTypes[i], i, value);
+          }
+        }
+        dataTableBuilder.finishRow();
+        rowId++;
+      }
+      for (RoaringBitmap nullBitmap : nullBitmaps) {
+        dataTableBuilder.setNullRowIds(nullBitmap);
+      }
+    } else {
+      int rowId = 0;
+      while (iterator.hasNext()) {
+        QueryThreadContext.checkTerminationAndSampleUsagePeriodically(rowId++, 
"GroupByDataTableReducer#merge");
+        dataTableBuilder.startRow();
+        Object[] values = iterator.next().getValues();
+        for (int i = 0; i < _numColumns; i++) {
+          Object value = values[i];
+          if (value == null) {
+            dataTableBuilder.setNull(i);
+          } else if (storedColumnDataTypes[i] == ColumnDataType.OBJECT) {
+            dataTableBuilder.setColumn(i,
+                _aggregationFunctions[i - 
_numGroupByExpressions].serializeIntermediateResult(value));
+          } else {
+            DataTableBuilderUtils.setColumn(dataTableBuilder, 
storedColumnDataTypes[i], i, value);
+          }
+        }
+        dataTableBuilder.finishRow();
+      }
+    }
+    return dataTableBuilder.build();
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregatorTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregatorTest.java
new file mode 100644
index 00000000000..608287b05be
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregatorTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import 
org.apache.pinot.core.operator.blocks.results.BaseResultsBlock.EarlyTerminationReason;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Focused tests for {@link 
ExecutionStatsAggregator#setStatsOnMergedDataTable(DataTable)} —
+ * specifically the {@link MetadataKey#EARLY_TERMINATION_REASON} round-trip 
for DISTINCT queries.
+ *
+ * <p>The aggregator collapses a single-string wire-format reason into three 
independent booleans
+ * (one per {@link EarlyTerminationReason}); the round-trip can encode only 
one reason back. These
+ * tests pin:
+ * <ul>
+ *   <li>Each of the three reasons round-trips its own boolean when it is the 
only one set.
+ *   <li>When multiple booleans are set (different per-server reasons 
OR-reduced), the round-trip
+ *       encodes the first set flag in declaration order. The user-visible 
"DISTINCT is partial"
+ *       signal is preserved because any one flag independently sets 
partial-ness on
+ *       {@link BrokerResponseNative}.
+ *   <li>When no DISTINCT early-termination booleans are set, the key is 
absent from the merged
+ *       metadata (so the downstream decoder does not see a stray non-DISTINCT 
reason and the
+ *       merged DataTable is indistinguishable from one whose inputs all 
completed normally).
+ * </ul>
+ */
+public class ExecutionStatsAggregatorTest {
+
+  private static final ServerRoutingInstance SERVER =
+      new ServerRoutingInstance("localhost", 8080, TableType.OFFLINE);
+
+  /** Build an empty DataTable carrying a single {@code 
EARLY_TERMINATION_REASON} metadata entry. */
+  private static DataTable serverTableWithReason(EarlyTerminationReason 
reason) {
+    DataTable dt = DataTableBuilderFactory.getEmptyDataTable();
+    dt.getMetadata().put(MetadataKey.EARLY_TERMINATION_REASON.getName(), 
reason.name());
+    return dt;
+  }
+
+  private static DataTable emptyServerTable() {
+    return DataTableBuilderFactory.getEmptyDataTable();
+  }
+
+  /**
+   * Drives one aggregate-then-readback round-trip and returns the three 
DISTINCT booleans the
+   * downstream aggregator would flip from the merged DataTable's metadata. 
Mirrors the production
+   * sequence: aggregate per-server inputs → serialize onto merged DataTable → 
reduce path
+   * re-aggregates the merged DataTable on a fresh aggregator.
+   */
+  private static boolean[] roundTrip(DataTable... inputs) {
+    ExecutionStatsAggregator producer = new ExecutionStatsAggregator(false);
+    for (DataTable dt : inputs) {
+      producer.aggregate(SERVER, dt);
+    }
+    DataTable merged = DataTableBuilderFactory.getEmptyDataTable();
+    producer.setStatsOnMergedDataTable(merged);
+
+    ExecutionStatsAggregator consumer = new ExecutionStatsAggregator(false);
+    consumer.aggregate(SERVER, merged);
+    BrokerResponseNative response = new BrokerResponseNative();
+    consumer.setStats("testTable", response, mock(BrokerMetrics.class));
+    return new boolean[]{
+        response.isMaxRowsInDistinctReached(),
+        response.isMaxRowsWithoutChangeInDistinctReached(),
+        response.isMaxExecutionTimeInDistinctReached()
+    };
+  }
+
+  @Test
+  public void testDistinctMaxRowsReachedRoundTrip() {
+    boolean[] flags = 
roundTrip(serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS));
+    assertTrue(flags[0], "_maxRowsInDistinctReached must round-trip");
+    assertFalse(flags[1], "other DISTINCT flags must stay false");
+    assertFalse(flags[2], "other DISTINCT flags must stay false");
+  }
+
+  @Test
+  public void testDistinctMaxRowsWithoutChangeReachedRoundTrip() {
+    boolean[] flags = 
roundTrip(serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS_WITHOUT_CHANGE));
+    assertFalse(flags[0]);
+    assertTrue(flags[1], "_maxRowsWithoutChangeInDistinctReached must 
round-trip");
+    assertFalse(flags[2]);
+  }
+
+  @Test
+  public void testDistinctMaxExecutionTimeReachedRoundTrip() {
+    boolean[] flags = 
roundTrip(serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_EXECUTION_TIME));
+    assertFalse(flags[0]);
+    assertFalse(flags[1]);
+    assertTrue(flags[2], "_maxExecutionTimeInDistinctReached must round-trip");
+  }
+
+  /**
+   * Two inputs hit DIFFERENT DISTINCT early-termination reasons. The 
aggregator OR-reduces both
+   * into independent booleans, but the wire format is one string per 
DataTable. Round-trip can
+   * encode only one reason back; per the implementation contract, it picks 
the first set flag in
+   * declaration order — {@code DISTINCT_MAX_ROWS} when both rows-reached and 
execution-time are
+   * set.
+   *
+   * <p>The user-visible "DISTINCT is partial" signal is still preserved: at 
least one of the
+   * three flags is true post-round-trip. The exact reason granularity is 
best-effort.
+   */
+  @Test
+  public void testMultipleReasonsEncodeFirstInDeclarationOrder() {
+    boolean[] flags = roundTrip(
+        
serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_EXECUTION_TIME),
+        serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS));
+    assertTrue(flags[0], "DISTINCT_MAX_ROWS wins over 
DISTINCT_MAX_EXECUTION_TIME in priority order");
+    assertFalse(flags[1]);
+    assertFalse(flags[2], "execution-time flag is dropped — single-string wire 
format can only carry one reason");
+  }
+
+  /**
+   * No DISTINCT early-termination on any input. The merged DataTable must NOT 
carry an
+   * {@code EARLY_TERMINATION_REASON} key — writing one would mislead the 
downstream decoder, and
+   * an empty/false value is not part of the wire-format vocabulary (the 
consumer's
+   * {@code EarlyTerminationReason.valueOf} would throw on "" or "false" and 
the
+   * {@code IllegalArgumentException} would silently mask any other reason we 
tried to encode).
+   */
+  @Test
+  public void testNoReasonProducesAbsentMetadataKey() {
+    ExecutionStatsAggregator producer = new ExecutionStatsAggregator(false);
+    producer.aggregate(SERVER, emptyServerTable());
+    DataTable merged = DataTableBuilderFactory.getEmptyDataTable();
+    producer.setStatsOnMergedDataTable(merged);
+
+    
assertNull(merged.getMetadata().get(MetadataKey.EARLY_TERMINATION_REASON.getName()),
+        "no DISTINCT early-termination → key must be absent on merged 
DataTable");
+
+    // Sanity: re-aggregation of the merged DataTable leaves all three flags 
false.
+    boolean[] flags = roundTrip(emptyServerTable());
+    assertFalse(flags[0]);
+    assertFalse(flags[1]);
+    assertFalse(flags[2]);
+  }
+
+  /**
+   * Same DISTINCT reason reported by multiple inputs round-trips to the same 
single boolean. The
+   * aggregator's OR-reduce makes this a trivial case, but pinning it catches 
a future regression
+   * where setStatsOnMergedDataTable starts skipping the reason on repeats.
+   */
+  @Test
+  public void testSameReasonFromMultipleInputsRoundTripsToOneBoolean() {
+    boolean[] flags = roundTrip(
+        serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS),
+        serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS),
+        serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS));
+    assertTrue(flags[0]);
+    assertFalse(flags[1]);
+    assertFalse(flags[2]);
+  }
+}
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/MergeDataTablesOnlyTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/MergeDataTablesOnlyTest.java
new file mode 100644
index 00000000000..7aea1fe90fb
--- /dev/null
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/MergeDataTablesOnlyTest.java
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants.Broker;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.roaringbitmap.RoaringBitmap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Verifies the core merge-only contract: reducing the raw per-server 
DataTable map produces the same
+ * final result as reducing a single re-injected merged intermediate 
DataTable, i.e.
+ * <pre>reduceOnDataTable(rawMap) == reduceOnDataTable({ syntheticKey -> 
mergeOnDataTable(rawMap) })</pre>
+ * for the supported reducers (Aggregation, Group-by, Distinct), including the 
OBJECT-column
+ * (DISTINCTCOUNT) round-trip. Also checks that the merged DataTable carries 
the intermediate schema and
+ * that unsupported reducers (Selection) throw.
+ */
+public class MergeDataTablesOnlyTest {
+  private static final long TIMEOUT_MS = 60_000L;
+  private BrokerReduceService _reduceService;
+
+  @BeforeClass
+  public void setUp() {
+    _reduceService =
+        new BrokerReduceService(new 
PinotConfiguration(Map.of(Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2)));
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _reduceService.shutDown();
+  }
+
+  @Test
+  public void testAggregationScalarRoundTrip() {
+    String query = "SELECT COUNT(*), SUM(m), MIN(m), MAX(m) FROM testTable";
+    DataSchema schema = new DataSchema(new String[]{"count(*)", "sum(m)", 
"min(m)", "max(m)"},
+        new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.DOUBLE, 
ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
+    List<DataTable> serverTables = List.of(
+        buildRow(schema, 3L, 10.0, 2.0, 8.0),
+        buildRow(schema, 2L, 5.0, 1.0, 9.0),
+        buildRow(schema, 4L, 20.0, 3.0, 7.0));
+    assertRoundTrip(query, serverTables);
+  }
+
+  @Test
+  public void testDistinctCountObjectRoundTrip()
+      throws IOException {
+    String query = "SELECT DISTINCTCOUNT(col1) FROM testTable";
+    AggregationFunction aggFunction = aggFunctions(query)[0];
+    DataSchema schema =
+        new DataSchema(new String[]{"distinctcount(col1)"}, new 
ColumnDataType[]{ColumnDataType.OBJECT});
+    List<DataTable> serverTables = List.of(
+        buildObjectRow(schema, aggFunction, new IntOpenHashSet(new int[]{1, 2, 
3})),
+        buildObjectRow(schema, aggFunction, new IntOpenHashSet(new int[]{3, 4, 
5})));
+    assertRoundTrip(query, serverTables);
+  }
+
+  @Test
+  public void testGroupByRoundTrip() {
+    String query = "SELECT col1, COUNT(*), SUM(m) FROM testTable GROUP BY 
col1";
+    DataSchema schema = new DataSchema(new String[]{"col1", "count(*)", 
"sum(m)"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, 
ColumnDataType.DOUBLE});
+    List<DataTable> serverTables = List.of(
+        buildGroupBy(schema, new Object[][]{{1, 2L, 10.0}, {2, 1L, 5.0}}),
+        buildGroupBy(schema, new Object[][]{{1, 3L, 20.0}, {3, 4L, 8.0}}));
+    assertRoundTrip(query, serverTables);
+  }
+
+  @Test
+  public void testDistinctRoundTrip() {
+    String query = "SELECT DISTINCT col1 FROM testTable";
+    DataSchema schema = new DataSchema(new String[]{"col1"}, new 
ColumnDataType[]{ColumnDataType.INT});
+    List<DataTable> serverTables = List.of(
+        buildGroupBy(schema, new Object[][]{{1}, {2}}),
+        buildGroupBy(schema, new Object[][]{{2}, {3}}));
+    assertRoundTrip(query, serverTables);
+  }
+
+  @Test
+  public void testAggregationServerReturnFinalResultRejected() {
+    // Under server.returnFinalResult, the per-server DataTables hold 
finalized (not intermediate)
+    // aggregate state; the merge-only contract cannot be honored, so the 
reducer must throw rather
+    // than silently merge final-typed values as if they were intermediates.
+    BrokerRequest brokerRequest = 
CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable");
+    
brokerRequest.getPinotQuery().putToQueryOptions(Broker.Request.QueryOptionKey.SERVER_RETURN_FINAL_RESULT,
 "true");
+    DataSchema schema = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    Map<ServerRoutingInstance, DataTable> map = singletonMap(buildRow(schema, 
5L));
+    assertThrows(UnsupportedOperationException.class, () -> 
merge(brokerRequest, map));
+  }
+
+  @Test
+  public void testGroupByServerReturnFinalResultRejected() {
+    BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(
+        "SELECT col1, COUNT(*) FROM testTable GROUP BY col1");
+    
brokerRequest.getPinotQuery().putToQueryOptions(Broker.Request.QueryOptionKey.SERVER_RETURN_FINAL_RESULT,
 "true");
+    DataSchema schema = new DataSchema(new String[]{"col1", "count(*)"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG});
+    Map<ServerRoutingInstance, DataTable> map = 
singletonMap(buildGroupBy(schema, new Object[][]{{1, 2L}}));
+    assertThrows(UnsupportedOperationException.class, () -> 
merge(brokerRequest, map));
+  }
+
+  @Test
+  public void testGroupByLimitZeroRoundTrip() {
+    // LIMIT 0 on group-by: mergeDataTablesOnly does NOT apply LIMIT 
(intermediate is unlimited),
+    // so a non-empty intermediate DataTable is produced. The re-injected 
reduce path applies the
+    // LIMIT and produces an empty final result, matching the direct reduce.
+    String query = "SELECT col1, COUNT(*) FROM testTable GROUP BY col1 LIMIT 
0";
+    DataSchema schema = new DataSchema(new String[]{"col1", "count(*)"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG});
+    List<DataTable> serverTables = List.of(
+        buildGroupBy(schema, new Object[][]{{1, 2L}}),
+        buildGroupBy(schema, new Object[][]{{2, 3L}}));
+    assertRoundTrip(query, serverTables);
+  }
+
+  @Test
+  public void testMergedDataTableCarriesIntermediateSchema() {
+    String query = "SELECT COUNT(*), SUM(m), DISTINCTCOUNT(col1) FROM 
testTable";
+    AggregationFunction[] aggFunctions = aggFunctions(query);
+    // Build intermediate rows: COUNT->LONG, SUM->DOUBLE, DISTINCTCOUNT->OBJECT
+    DataSchema schema = new DataSchema(new String[]{"count(*)", "sum(m)", 
"distinctcount(col1)"},
+        new ColumnDataType[]{ColumnDataType.LONG, ColumnDataType.DOUBLE, 
ColumnDataType.OBJECT});
+    DataTableBuilder builder = 
DataTableBuilderFactory.getDataTableBuilder(schema);
+    try {
+      builder.startRow();
+      builder.setColumn(0, 5L);
+      builder.setColumn(1, 12.0);
+      builder.setColumn(2, aggFunctions[2].serializeIntermediateResult(new 
IntOpenHashSet(new int[]{1, 2})));
+      builder.finishRow();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    Map<ServerRoutingInstance, DataTable> map = singletonMap(builder.build());
+
+    DataTable merged = merge(query, map);
+    assertNotNull(merged);
+    ColumnDataType[] mergedTypes = merged.getDataSchema().getColumnDataTypes();
+    for (int i = 0; i < aggFunctions.length; i++) {
+      assertEquals(mergedTypes[i], 
aggFunctions[i].getIntermediateResultColumnType(),
+          "merged column " + i + " must carry the intermediate type, not the 
finalized type");
+    }
+  }
+
+  @Test
+  public void testEmptyMapReturnsNull() {
+    assertNull(merge("SELECT COUNT(*) FROM testTable", new HashMap<>()));
+  }
+
+  @Test
+  public void testAllEmptyDataTablesReturnsNull() {
+    // A metadata-only (0-row, null schema) data table is dropped, leaving 
nothing to merge.
+    DataSchema schema = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    DataTable emptyMetadataOnly = 
DataTableBuilderFactory.getDataTableBuilder(schema).build().toMetadataOnlyDataTable();
+    assertNull(merge("SELECT COUNT(*) FROM testTable", 
singletonMap(emptyMetadataOnly)));
+  }
+
+  @Test
+  public void testSelectionIsUnsupported() {
+    String query = "SELECT col1 FROM testTable";
+    DataSchema schema = new DataSchema(new String[]{"col1"}, new 
ColumnDataType[]{ColumnDataType.INT});
+    Map<ServerRoutingInstance, DataTable> map = 
singletonMap(buildGroupBy(schema, new Object[][]{{1}, {2}}));
+    // SelectionDataTableReducer inherits the default-throwing 
mergeDataTablesOnly (selection is out of
+    // scope for merge-only reduction).
+    assertThrows(UnsupportedOperationException.class, () -> merge(query, map));
+  }
+
+  @Test
+  public void testNullHandlingAggregationRoundTrip()
+      throws IOException {
+    BrokerRequest brokerRequest = 
CalciteSqlCompiler.compileToBrokerRequest("SELECT MIN(m) FROM testTable");
+    
brokerRequest.getPinotQuery().putToQueryOptions(Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING,
 "true");
+    DataSchema schema = new DataSchema(new String[]{"min(m)"}, new 
ColumnDataType[]{ColumnDataType.DOUBLE});
+    // One server has a value, the other contributes a null (e.g. all rows 
filtered out).
+    List<DataTable> serverTables = List.of(buildNullableDouble(schema, 5.0), 
buildNullableDouble(schema, null));
+    assertRoundTrip(brokerRequest, serverTables);
+  }
+
+  @Test
+  public void testConflictingSchemaSurfacedAsIncompleteMerge() {
+    // When one server's column types conflict with the first non-empty 
table's, filterDataTablesAndPickSchema
+    // drops that server. The merge proceeds on the remaining servers and the 
returned DataTable carries
+    // the INCOMPLETE_MERGE flag so a downstream consumer can detect that the 
merge ran over a strict
+    // subset of inputs.
+    String query = "SELECT COUNT(*) FROM testTable";
+    DataSchema schemaLong = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    DataSchema schemaInt = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.INT});
+    DataTable t1 = buildRow(schemaLong, 5L);
+    DataTable t2 = buildRow(schemaLong, 7L);
+    // Conflicting column data type (INT vs LONG) — 
filterDataTablesAndPickSchema will drop t3.
+    DataTable t3 = buildRow(schemaInt, 99);
+
+    DataTable merged = merge(query, toMap(List.of(t1, t2, t3)));
+    assertNotNull(merged);
+    
assertEquals(merged.getMetadata().get(MetadataKey.INCOMPLETE_MERGE.getName()), 
"true",
+        "INCOMPLETE_MERGE must be surfaced when a server is dropped due to 
schema conflict");
+  }
+
+  @Test
+  public void testAllSameSchemaDoesNotSetIncompleteMerge() {
+    String query = "SELECT COUNT(*) FROM testTable";
+    DataSchema schema = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    DataTable merged = merge(query, toMap(List.of(buildRow(schema, 5L), 
buildRow(schema, 7L))));
+    assertNotNull(merged);
+    
assertNull(merged.getMetadata().get(MetadataKey.INCOMPLETE_MERGE.getName()),
+        "INCOMPLETE_MERGE must not be set on a clean merge");
+  }
+
+  @Test
+  public void testAdditiveStatsAggregatedOntoMergedMetadata() {
+    // Two inputs carry numDocsScanned + threadCpuTimeNs in their metadata; 
the merged DataTable
+    // should sum them so a downstream re-reduce sees the same totals.
+    String query = "SELECT COUNT(*) FROM testTable";
+    DataSchema schema = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    DataTable t1 = buildRow(schema, 5L);
+    DataTable t2 = buildRow(schema, 7L);
+    t1.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "100");
+    t1.getMetadata().put(MetadataKey.THREAD_CPU_TIME_NS.getName(), "1000");
+    t2.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "250");
+    t2.getMetadata().put(MetadataKey.THREAD_CPU_TIME_NS.getName(), "3000");
+
+    DataTable merged = merge(query, toMap(List.of(t1, t2)));
+    assertNotNull(merged);
+    
assertEquals(merged.getMetadata().get(MetadataKey.NUM_DOCS_SCANNED.getName()), 
"350");
+    
assertEquals(merged.getMetadata().get(MetadataKey.THREAD_CPU_TIME_NS.getName()),
 "4000");
+  }
+
+  @Test
+  public void testAdditiveStatsAbsentFromAllInputsWriteZero() {
+    // Mirrors setStats()'s unconditional-write pattern: when none of the 
inputs carry an additive
+    // stats key, the merged DataTable carries "0" for it. Downstream's 
Long.parseLong("0") and
+    // null-check both yield 0, so the user-facing BrokerResponse is identical 
either way.
+    String query = "SELECT COUNT(*) FROM testTable";
+    DataSchema schema = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    DataTable merged = merge(query, toMap(List.of(buildRow(schema, 5L), 
buildRow(schema, 7L))));
+    assertNotNull(merged);
+    
assertEquals(merged.getMetadata().get(MetadataKey.NUM_DOCS_SCANNED.getName()), 
"0");
+  }
+
+  @Test
+  public void testMinConsumingFreshnessTimeMsTakesMin() {
+    // MIN_CONSUMING_FRESHNESS_TIME_MS semantically captures the WORST 
freshness across servers
+    // (used by FRESHNESS_LAG_MS); the merge must MIN-reduce, not SUM.
+    String query = "SELECT COUNT(*) FROM testTable";
+    DataSchema schema = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    DataTable t1 = buildRow(schema, 5L);
+    DataTable t2 = buildRow(schema, 7L);
+    
t1.getMetadata().put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), 
"1000");
+    
t2.getMetadata().put(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName(), 
"500");
+
+    DataTable merged = merge(query, toMap(List.of(t1, t2)));
+    assertNotNull(merged);
+    
assertEquals(merged.getMetadata().get(MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName()),
 "500");
+  }
+
+  @Test
+  public void testStatsFromDroppedZeroRowServersStillCounted() {
+    // A 0-row metadata-only DataTable is dropped from the merge inputs by 
filterDataTablesAndPickSchema,
+    // but its stats must still be aggregated (matches reduceOnDataTable's 
behavior — the aggregator
+    // runs BEFORE the iterator.remove()).
+    String query = "SELECT COUNT(*) FROM testTable";
+    DataSchema schema = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    DataTable withData = buildRow(schema, 5L);
+    withData.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "100");
+    // A 0-row server (e.g. all rows filtered out) still reports stats.
+    DataTable emptyRows = 
DataTableBuilderFactory.getDataTableBuilder(schema).build();
+    emptyRows.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "50");
+
+    DataTable merged = merge(query, toMap(List.of(withData, emptyRows)));
+    assertNotNull(merged);
+    
assertEquals(merged.getMetadata().get(MetadataKey.NUM_DOCS_SCANNED.getName()), 
"150");
+  }
+
+  @Test
+  public void testAdditiveStatsRoundTripThroughReduce() {
+    // End-to-end: aggregating stats on the merged DataTable means a 
downstream reduce on the
+    // re-injected merged DataTable sees the same numDocsScanned the direct 
reduce would.
+    BrokerRequest brokerRequest = 
CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable");
+    DataSchema schema = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    DataTable t1 = buildRow(schema, 5L);
+    DataTable t2 = buildRow(schema, 7L);
+    t1.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "100");
+    t2.getMetadata().put(MetadataKey.NUM_DOCS_SCANNED.getName(), "250");
+
+    BrokerResponseNative direct = reduce(brokerRequest, toMap(List.of(t1, 
t2)));
+    DataTable merged = merge(brokerRequest, toMap(List.of(t1, t2)));
+    assertNotNull(merged);
+    BrokerResponseNative viaMerge = reduce(brokerRequest, 
singletonMap(merged));
+
+    assertEquals(viaMerge.getNumDocsScanned(), direct.getNumDocsScanned(),
+        "merged DataTable must carry numDocsScanned so a downstream reduce 
produces the same total");
+    assertEquals(viaMerge.getNumDocsScanned(), 350L);
+  }
+
+  @Test
+  public void testExceptionsCopiedToMergedDataTable() {
+    // Per-server exceptions are accumulated by the aggregator and written 
back onto the merged
+    // DataTable's exception map, so a downstream reduce surfaces them on the 
BrokerResponse.
+    BrokerRequest brokerRequest = 
CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable");
+    DataSchema schema = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    DataTable t1 = buildRow(schema, 5L);
+    DataTable t2 = buildRow(schema, 7L);
+    t1.addException(QueryErrorCode.QUERY_EXECUTION, "boom on server 1");
+    t2.addException(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED, "limit on 
server 2");
+
+    DataTable merged = merge(brokerRequest, toMap(List.of(t1, t2)));
+    assertNotNull(merged);
+    
assertEquals(merged.getExceptions().get(QueryErrorCode.QUERY_EXECUTION.getId()),
 "boom on server 1");
+    
assertEquals(merged.getExceptions().get(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.getId()),
+        "limit on server 2");
+
+    BrokerResponseNative viaMerge = reduce(brokerRequest, 
singletonMap(merged));
+    // Downstream reduce surfaces both as response exceptions (one per 
distinct error code).
+    assertEquals(viaMerge.getExceptions().size(), 2);
+  }
+
+  @Test
+  public void testTraceInfoCopiedToMergedDataTableWhenEnabled() {
+    // When trace is enabled, per-server trace info is JSON-encoded into the 
merged DataTable's
+    // TRACE_INFO metadata entry. The aggregator keys traces by server 
short-name (hostname-derived),
+    // so we use distinct hostnames to preserve attribution.
+    BrokerRequest brokerRequest = 
CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable");
+    brokerRequest.getPinotQuery().putToQueryOptions(Broker.Request.TRACE, 
"true");
+    DataSchema schema = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    DataTable t1 = buildRow(schema, 5L);
+    DataTable t2 = buildRow(schema, 7L);
+    t1.getMetadata().put(MetadataKey.TRACE_INFO.getName(), 
"trace-from-server-1");
+    t2.getMetadata().put(MetadataKey.TRACE_INFO.getName(), 
"trace-from-server-2");
+    Map<ServerRoutingInstance, DataTable> map = new HashMap<>();
+    map.put(new ServerRoutingInstance("hostA", 1000, TableType.OFFLINE), t1);
+    map.put(new ServerRoutingInstance("hostB", 1001, TableType.OFFLINE), t2);
+
+    DataTable merged;
+    try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+      merged = _reduceService.mergeOnDataTable(brokerRequest, map, TIMEOUT_MS, 
mock(BrokerMetrics.class));
+    }
+    assertNotNull(merged);
+    String mergedTrace = 
merged.getMetadata().get(MetadataKey.TRACE_INFO.getName());
+    assertNotNull(mergedTrace, "TRACE_INFO must be present on merged DataTable 
when trace is enabled");
+    // JSON-encoded map; both inputs' trace strings must be present.
+    assertTrue(mergedTrace.contains("trace-from-server-1"));
+    assertTrue(mergedTrace.contains("trace-from-server-2"));
+  }
+
+  @Test
+  public void testTraceInfoSkippedWhenTraceDisabled() {
+    // Without trace=true, the aggregator skips trace collection, so the 
merged DataTable has no
+    // TRACE_INFO metadata even if inputs carried it.
+    String query = "SELECT COUNT(*) FROM testTable";
+    DataSchema schema = new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG});
+    DataTable t1 = buildRow(schema, 5L);
+    t1.getMetadata().put(MetadataKey.TRACE_INFO.getName(), 
"trace-from-server-1");
+
+    DataTable merged = merge(query, toMap(List.of(t1)));
+    assertNotNull(merged);
+    assertNull(merged.getMetadata().get(MetadataKey.TRACE_INFO.getName()));
+  }
+
+  @Test
+  public void testNumGroupsLimitReachedFlagSurfaced() {
+    String query = "SELECT col1, COUNT(*) FROM testTable GROUP BY col1";
+    DataSchema schema = new DataSchema(new String[]{"col1", "count(*)"},
+        new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG});
+    DataTable t1 = buildGroupBy(schema, new Object[][]{{1, 2L}});
+    DataTable t2 = buildGroupBy(schema, new Object[][]{{2, 3L}});
+    // Simulate a server that hit numGroupsLimit.
+    t2.getMetadata().put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), 
"true");
+
+    DataTable merged = merge(query, toMap(List.of(t1, t2)));
+    assertNotNull(merged);
+    
assertEquals(merged.getMetadata().get(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName()),
 "true",
+        "NUM_GROUPS_LIMIT_REACHED from an input server must be surfaced on the 
merged DataTable");
+  }
+
+  // ---- helpers ----
+
+  /**
+   * Asserts the round-trip equivalence for the given query and per-server 
intermediate DataTables.
+   */
+  private void assertRoundTrip(String query, List<DataTable> serverTables) {
+    assertRoundTrip(CalciteSqlCompiler.compileToBrokerRequest(query), 
serverTables);
+  }
+
+  private void assertRoundTrip(BrokerRequest brokerRequest, List<DataTable> 
serverTables) {
+    BrokerResponseNative baseline = reduce(brokerRequest, toMap(serverTables));
+
+    DataTable merged = merge(brokerRequest, toMap(serverTables));
+    assertNotNull(merged, "merge produced null");
+
+    BrokerResponseNative viaMerge = reduce(brokerRequest, 
singletonMap(merged));
+
+    assertResultTablesEquivalent(baseline.getResultTable(), 
viaMerge.getResultTable());
+  }
+
+  private BrokerResponseNative reduce(BrokerRequest brokerRequest, 
Map<ServerRoutingInstance, DataTable> map) {
+    try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+      return _reduceService.reduceOnDataTable(brokerRequest, brokerRequest, 
map, TIMEOUT_MS, mock(BrokerMetrics.class));
+    }
+  }
+
+  private DataTable merge(String query, Map<ServerRoutingInstance, DataTable> 
map) {
+    return merge(CalciteSqlCompiler.compileToBrokerRequest(query), map);
+  }
+
+  private DataTable merge(BrokerRequest brokerRequest, 
Map<ServerRoutingInstance, DataTable> map) {
+    try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+      return _reduceService.mergeOnDataTable(brokerRequest, map, TIMEOUT_MS, 
mock(BrokerMetrics.class));
+    }
+  }
+
+  private static AggregationFunction[] aggFunctions(String query) {
+    QueryContext queryContext =
+        
QueryContextConverterUtils.getQueryContext(CalciteSqlCompiler.compileToBrokerRequest(query).getPinotQuery());
+    AggregationFunction[] aggregationFunctions = 
queryContext.getAggregationFunctions();
+    assertNotNull(aggregationFunctions);
+    return aggregationFunctions;
+  }
+
+  private static DataTable buildRow(DataSchema schema, Object... values) {
+    DataTableBuilder builder = 
DataTableBuilderFactory.getDataTableBuilder(schema);
+    try {
+      appendRow(builder, schema, values);
+      return builder.build();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static DataTable buildGroupBy(DataSchema schema, Object[][] rows) {
+    DataTableBuilder builder = 
DataTableBuilderFactory.getDataTableBuilder(schema);
+    try {
+      for (Object[] row : rows) {
+        appendRow(builder, schema, row);
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static DataTable buildObjectRow(DataSchema schema, 
AggregationFunction aggFunction, Object intermediate)
+      throws IOException {
+    DataTableBuilder builder = 
DataTableBuilderFactory.getDataTableBuilder(schema);
+    builder.startRow();
+    builder.setColumn(0, 
aggFunction.serializeIntermediateResult(intermediate));
+    builder.finishRow();
+    return builder.build();
+  }
+
+  /** Builds a single-row, single-DOUBLE-column intermediate DataTable with 
null-handling encoding. */
+  private static DataTable buildNullableDouble(DataSchema schema, Double value)
+      throws IOException {
+    DataTableBuilder builder = 
DataTableBuilderFactory.getDataTableBuilder(schema);
+    RoaringBitmap nullBitmap = new RoaringBitmap();
+    builder.startRow();
+    if (value == null) {
+      builder.setColumn(0, ((Number) 
ColumnDataType.DOUBLE.getNullPlaceholder()).doubleValue());
+      nullBitmap.add(0);
+    } else {
+      builder.setColumn(0, (double) value);
+    }
+    builder.finishRow();
+    builder.setNullRowIds(nullBitmap);
+    return builder.build();
+  }
+
+  private static void appendRow(DataTableBuilder builder, DataSchema schema, 
Object[] values)
+      throws IOException {
+    builder.startRow();
+    ColumnDataType[] columnDataTypes = schema.getColumnDataTypes();
+    for (int i = 0; i < values.length; i++) {
+      switch (columnDataTypes[i]) {
+        case INT:
+          builder.setColumn(i, (int) values[i]);
+          break;
+        case LONG:
+          builder.setColumn(i, (long) values[i]);
+          break;
+        case DOUBLE:
+          builder.setColumn(i, (double) values[i]);
+          break;
+        case STRING:
+          builder.setColumn(i, (String) values[i]);
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported test column type: " 
+ columnDataTypes[i]);
+      }
+    }
+    builder.finishRow();
+  }
+
+  private static Map<ServerRoutingInstance, DataTable> toMap(List<DataTable> 
serverTables) {
+    Map<ServerRoutingInstance, DataTable> map = new HashMap<>();
+    for (int i = 0; i < serverTables.size(); i++) {
+      map.put(new ServerRoutingInstance("localhost", 1000 + i, 
TableType.OFFLINE), serverTables.get(i));
+    }
+    return map;
+  }
+
+  private static Map<ServerRoutingInstance, DataTable> singletonMap(DataTable 
dataTable) {
+    Map<ServerRoutingInstance, DataTable> map = new HashMap<>();
+    map.put(new ServerRoutingInstance("Server_merge", 0, TableType.OFFLINE), 
dataTable);
+    return map;
+  }
+
+  /**
+   * Asserts the two result tables have the same schema and the same set of 
rows (order-independent, so
+   * the comparison is robust for unordered group-by / distinct results).
+   */
+  private static void assertResultTablesEquivalent(ResultTable expected, 
ResultTable actual) {
+    assertNotNull(expected);
+    assertNotNull(actual);
+    assertEquals(actual.getDataSchema(), expected.getDataSchema());
+    assertEquals(actual.getRows().size(), expected.getRows().size());
+    assertTrue(sortedRows(actual).equals(sortedRows(expected)),
+        "rows differ:\n expected=" + sortedRows(expected) + "\n actual  =" + 
sortedRows(actual));
+  }
+
+  private static List<String> sortedRows(ResultTable resultTable) {
+    List<String> rows = new ArrayList<>();
+    for (Object[] row : resultTable.getRows()) {
+      rows.add(Arrays.deepToString(row));
+    }
+    return rows.stream().sorted().collect(Collectors.toList());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to