This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5dee07228bb Add merge-only reducer API for intercepting intermediate
results (#18621)
5dee07228bb is described below
commit 5dee07228bbbd5d3fbf3e2cb93adce34ec32995a
Author: Navina Ramesh <[email protected]>
AuthorDate: Tue Jun 2 15:40:41 2026 -0700
Add merge-only reducer API for intercepting intermediate results (#18621)
---
.../apache/pinot/common/datatable/DataTable.java | 9 +-
.../common/datatable/DataTableBuilderUtils.java | 128 +++++
.../blocks/results/AggregationResultsBlock.java | 34 +-
.../blocks/results/GroupByResultsBlock.java | 95 +---
.../function/AggregationFunctionUtils.java | 39 ++
.../query/reduce/AggregationDataTableReducer.java | 101 +++-
.../core/query/reduce/BrokerReduceService.java | 220 +++++---
.../pinot/core/query/reduce/DataTableReducer.java | 29 ++
.../query/reduce/DistinctDataTableReducer.java | 33 +-
.../query/reduce/ExecutionStatsAggregator.java | 131 +++++
.../core/query/reduce/GroupByDataTableReducer.java | 118 +++++
.../query/reduce/ExecutionStatsAggregatorTest.java | 179 +++++++
.../core/query/reduce/MergeDataTablesOnlyTest.java | 577 +++++++++++++++++++++
13 files changed, 1501 insertions(+), 192 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
index bf3f2f86117..ddba67d92df 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/datatable/DataTable.java
@@ -156,11 +156,16 @@ public interface DataTable {
WORKLOAD_NAME(40, "workloadName", MetadataValueType.STRING),
// Needed so that we can track query id in Netty channel response.
QUERY_ID(41, "queryId", MetadataValueType.STRING),
- EARLY_TERMINATION_REASON(42, "earlyTerminationReason",
MetadataValueType.STRING);
+ EARLY_TERMINATION_REASON(42, "earlyTerminationReason",
MetadataValueType.STRING),
+ // Set on a merge-only DataTable when one or more input server DataTables
were dropped during
+ // the merge (e.g., due to a schema conflict), so the merge ran over a
strict subset of the
+ // inputs. How a downstream consumer reacts (skip, retry, accept with
annotation) is the
+ // consumer's policy.
+ INCOMPLETE_MERGE(43, "incompleteMerge", MetadataValueType.STRING);
// We keep this constant to track the max id added so far for backward
compatibility.
// Increase it when adding new keys, but NEVER DECREASE IT!!!
- private static final int MAX_ID = EARLY_TERMINATION_REASON.getId();
+ private static final int MAX_ID = INCOMPLETE_MERGE.getId();
private static final MetadataKey[] ID_TO_ENUM_KEY_MAP = new
MetadataKey[MAX_ID + 1];
private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new
HashMap<>();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
new file mode 100644
index 00000000000..d111ba21428
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableBuilderUtils.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.common.datatable;
+
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import java.io.IOException;
+import java.math.BigDecimal;
+import org.apache.pinot.common.utils.ArrayListUtils;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * Helpers for writing values into a {@link DataTableBuilder}.
+ */
+public class DataTableBuilderUtils {
+ private DataTableBuilderUtils() {
+ }
+
+ /**
+ * Writes a non-null value of the given stored column data type into the
{@link DataTableBuilder} at
+ * the given column. Supports all scalar and array stored types. OBJECT
columns are NOT handled here
+ * (callers serialize them via the owning aggregation function). Used by the
group-by result
+ * serialization on both the server ({@code GroupByResultsBlock}) and the
merge-only reduce path.
+ */
+ public static void setColumn(DataTableBuilder dataTableBuilder,
ColumnDataType storedColumnDataType,
+ int columnIndex, Object value)
+ throws IOException {
+ switch (storedColumnDataType) {
+ case INT:
+ dataTableBuilder.setColumn(columnIndex, (int) value);
+ break;
+ case LONG:
+ dataTableBuilder.setColumn(columnIndex, (long) value);
+ break;
+ case FLOAT:
+ dataTableBuilder.setColumn(columnIndex, (float) value);
+ break;
+ case DOUBLE:
+ dataTableBuilder.setColumn(columnIndex, (double) value);
+ break;
+ case BIG_DECIMAL:
+ dataTableBuilder.setColumn(columnIndex, (BigDecimal) value);
+ break;
+ case STRING:
+ dataTableBuilder.setColumn(columnIndex, value.toString());
+ break;
+ case BYTES:
+ dataTableBuilder.setColumn(columnIndex, (ByteArray) value);
+ break;
+ case INT_ARRAY:
+ if (value instanceof IntArrayList) {
+ dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toIntArray((IntArrayList) value));
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (int[]) value);
+ }
+ break;
+ case LONG_ARRAY:
+ if (value instanceof LongArrayList) {
+ dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toLongArray((LongArrayList) value));
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (long[]) value);
+ }
+ break;
+ case FLOAT_ARRAY:
+ if (value instanceof FloatArrayList) {
+ dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toFloatArray((FloatArrayList) value));
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (float[]) value);
+ }
+ break;
+ case DOUBLE_ARRAY:
+ if (value instanceof DoubleArrayList) {
+ dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toDoubleArray((DoubleArrayList) value));
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (double[]) value);
+ }
+ break;
+ case BIG_DECIMAL_ARRAY:
+ if (value instanceof ObjectArrayList) {
+ //noinspection unchecked
+ dataTableBuilder.setColumn(columnIndex,
+ ArrayListUtils.toBigDecimalArray((ObjectArrayList<BigDecimal>)
value));
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (BigDecimal[]) value);
+ }
+ break;
+ case STRING_ARRAY:
+ if (value instanceof ObjectArrayList) {
+ //noinspection unchecked
+ dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toStringArray((ObjectArrayList<String>) value));
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (String[]) value);
+ }
+ break;
+ case BYTES_ARRAY:
+ if (value instanceof ObjectArrayList) {
+ //noinspection unchecked
+ dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toBytesArray((ObjectArrayList<ByteArray>) value));
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (ByteArray[]) value);
+ }
+ break;
+ default:
+ throw new IllegalStateException("Unsupported stored type: " +
storedColumnDataType);
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index 8f493869e3b..c28e2a8faa2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -146,7 +146,7 @@ public class AggregationResultsBlock extends
BaseResultsBlock {
nullBitmaps[i].add(0);
}
assert result != null;
- setIntermediateResult(dataTableBuilder, columnDataTypes, i,
result);
+ AggregationFunctionUtils.setIntermediateResult(dataTableBuilder,
columnDataTypes[i], i, result);
}
}
}
@@ -174,7 +174,7 @@ public class AggregationResultsBlock extends
BaseResultsBlock {
if (columnDataTypes[i] == ColumnDataType.OBJECT) {
dataTableBuilder.setColumn(i,
_aggregationFunctions[i].serializeIntermediateResult(result));
} else {
- setIntermediateResult(dataTableBuilder, columnDataTypes, i,
result);
+ AggregationFunctionUtils.setIntermediateResult(dataTableBuilder,
columnDataTypes[i], i, result);
}
}
}
@@ -184,36 +184,6 @@ public class AggregationResultsBlock extends
BaseResultsBlock {
return dataTableBuilder.build();
}
- private void setIntermediateResult(DataTableBuilder dataTableBuilder,
ColumnDataType[] columnDataTypes, int index,
- Object result) throws IOException {
- ColumnDataType columnDataType = columnDataTypes[index];
- switch (columnDataType) {
- case INT:
- dataTableBuilder.setColumn(index, (int) result);
- break;
- case LONG:
- dataTableBuilder.setColumn(index, (long) result);
- break;
- case FLOAT:
- dataTableBuilder.setColumn(index, (float) result);
- break;
- case DOUBLE:
- dataTableBuilder.setColumn(index, (double) result);
- break;
- case BIG_DECIMAL:
- dataTableBuilder.setColumn(index, (BigDecimal) result);
- break;
- case STRING:
- dataTableBuilder.setColumn(index, result.toString());
- break;
- case BYTES:
- dataTableBuilder.setColumn(index, (ByteArray) result);
- break;
- default:
- throw new IllegalStateException("Illegal column data type in
intermediate result: " + columnDataType);
- }
- }
-
private void setFinalResult(DataTableBuilder dataTableBuilder,
ColumnDataType[] columnDataTypes, int index,
Object result)
throws IOException {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index ea1e9bab31f..5baac68f099 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -18,13 +18,7 @@
*/
package org.apache.pinot.core.operator.blocks.results;
-import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
-import it.unimi.dsi.fastutil.floats.FloatArrayList;
-import it.unimi.dsi.fastutil.ints.IntArrayList;
-import it.unimi.dsi.fastutil.longs.LongArrayList;
-import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.io.IOException;
-import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
@@ -33,11 +27,11 @@ import java.util.Map;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.request.context.ExpressionContext;
-import org.apache.pinot.common.utils.ArrayListUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
import org.apache.pinot.core.data.table.IntermediateRecord;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.data.table.Table;
@@ -45,7 +39,6 @@ import
org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.spi.query.QueryThreadContext;
-import org.apache.pinot.spi.utils.ByteArray;
import org.roaringbitmap.RoaringBitmap;
@@ -243,7 +236,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
nullBitmaps[i].add(rowId);
}
assert value != null;
- setDataTableColumn(storedColumnDataTypes[i], dataTableBuilder, i,
value);
+ DataTableBuilderUtils.setColumn(dataTableBuilder,
storedColumnDataTypes[i], i, value);
}
}
dataTableBuilder.finishRow();
@@ -265,7 +258,7 @@ public class GroupByResultsBlock extends BaseResultsBlock {
} else if (storedColumnDataTypes[i] == ColumnDataType.OBJECT) {
dataTableBuilder.setColumn(i, aggregationFunctions[i -
numKeyColumns].serializeIntermediateResult(value));
} else {
- setDataTableColumn(storedColumnDataTypes[i], dataTableBuilder, i,
value);
+ DataTableBuilderUtils.setColumn(dataTableBuilder,
storedColumnDataTypes[i], i, value);
}
}
dataTableBuilder.finishRow();
@@ -275,88 +268,6 @@ public class GroupByResultsBlock extends BaseResultsBlock {
return dataTableBuilder.build();
}
- private void setDataTableColumn(ColumnDataType storedColumnDataType,
DataTableBuilder dataTableBuilder,
- int columnIndex, Object value)
- throws IOException {
- switch (storedColumnDataType) {
- case INT:
- dataTableBuilder.setColumn(columnIndex, (int) value);
- break;
- case LONG:
- dataTableBuilder.setColumn(columnIndex, (long) value);
- break;
- case FLOAT:
- dataTableBuilder.setColumn(columnIndex, (float) value);
- break;
- case DOUBLE:
- dataTableBuilder.setColumn(columnIndex, (double) value);
- break;
- case BIG_DECIMAL:
- dataTableBuilder.setColumn(columnIndex, (BigDecimal) value);
- break;
- case STRING:
- dataTableBuilder.setColumn(columnIndex, value.toString());
- break;
- case BYTES:
- dataTableBuilder.setColumn(columnIndex, (ByteArray) value);
- break;
- case INT_ARRAY:
- if (value instanceof IntArrayList) {
- dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toIntArray((IntArrayList) value));
- } else {
- dataTableBuilder.setColumn(columnIndex, (int[]) value);
- }
- break;
- case LONG_ARRAY:
- if (value instanceof LongArrayList) {
- dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toLongArray((LongArrayList) value));
- } else {
- dataTableBuilder.setColumn(columnIndex, (long[]) value);
- }
- break;
- case FLOAT_ARRAY:
- if (value instanceof FloatArrayList) {
- dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toFloatArray((FloatArrayList) value));
- } else {
- dataTableBuilder.setColumn(columnIndex, (float[]) value);
- }
- break;
- case DOUBLE_ARRAY:
- if (value instanceof DoubleArrayList) {
- dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toDoubleArray((DoubleArrayList) value));
- } else {
- dataTableBuilder.setColumn(columnIndex, (double[]) value);
- }
- break;
- case BIG_DECIMAL_ARRAY:
- if (value instanceof ObjectArrayList) {
- //noinspection unchecked
- dataTableBuilder.setColumn(columnIndex,
- ArrayListUtils.toBigDecimalArray((ObjectArrayList<BigDecimal>)
value));
- } else {
- dataTableBuilder.setColumn(columnIndex, (BigDecimal[]) value);
- }
- break;
- case STRING_ARRAY:
- if (value instanceof ObjectArrayList) {
- //noinspection unchecked
- dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toStringArray((ObjectArrayList<String>) value));
- } else {
- dataTableBuilder.setColumn(columnIndex, (String[]) value);
- }
- break;
- case BYTES_ARRAY:
- if (value instanceof ObjectArrayList) {
- //noinspection unchecked
- dataTableBuilder.setColumn(columnIndex,
ArrayListUtils.toBytesArray((ObjectArrayList<ByteArray>) value));
- } else {
- dataTableBuilder.setColumn(columnIndex, (ByteArray[]) value);
- }
- break;
- default:
- throw new IllegalStateException("Unsupported stored type: " +
storedColumnDataType);
- }
- }
@Override
public Map<String, String> getResultsMetadata() {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 010510d485e..a9e6f439320 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -23,6 +23,8 @@ import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import java.io.IOException;
+import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collections;
@@ -41,6 +43,7 @@ import
org.apache.pinot.common.request.context.predicate.Predicate;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.operator.BaseProjectOperator;
import org.apache.pinot.core.operator.blocks.ValueBlock;
import org.apache.pinot.core.operator.filter.BaseFilterOperator;
@@ -54,6 +57,7 @@ import org.apache.pinot.core.startree.StarTreeUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.segment.spi.SegmentContext;
import
org.apache.pinot.segment.spi.index.startree.AggregationFunctionColumnPair;
+import org.apache.pinot.spi.utils.ByteArray;
/**
@@ -168,6 +172,41 @@ public class AggregationFunctionUtils {
}
}
+ /**
+ * Writes a non-OBJECT intermediate result into the {@link DataTableBuilder}
at the given column.
+ * Counterpart of {@link #getIntermediateResult}. OBJECT columns are handled
by the caller via
+ * {@link AggregationFunction#serializeIntermediateResult}, since they need
the aggregation function.
+ */
+ public static void setIntermediateResult(DataTableBuilder dataTableBuilder,
ColumnDataType columnDataType, int colId,
+ Object result)
+ throws IOException {
+ switch (columnDataType) {
+ case INT:
+ dataTableBuilder.setColumn(colId, (int) result);
+ break;
+ case LONG:
+ dataTableBuilder.setColumn(colId, (long) result);
+ break;
+ case FLOAT:
+ dataTableBuilder.setColumn(colId, (float) result);
+ break;
+ case DOUBLE:
+ dataTableBuilder.setColumn(colId, (double) result);
+ break;
+ case BIG_DECIMAL:
+ dataTableBuilder.setColumn(colId, (BigDecimal) result);
+ break;
+ case STRING:
+ dataTableBuilder.setColumn(colId, result.toString());
+ break;
+ case BYTES:
+ dataTableBuilder.setColumn(colId, (ByteArray) result);
+ break;
+ default:
+ throw new IllegalStateException("Illegal column data type in
intermediate result: " + columnDataType);
+ }
+ }
+
/**
* Reads the final result from the {@link DataTable}.
*/
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
index 8f105d6af71..b8e6f249ce6 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.query.reduce;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -28,6 +29,8 @@ import
org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import
org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -83,6 +86,22 @@ public class AggregationDataTableReducer implements
DataTableReducer {
private void reduceWithIntermediateResult(DataSchema dataSchema,
Collection<DataTable> dataTables,
BrokerResponseNative brokerResponseNative) {
int numAggregationFunctions = _aggregationFunctions.length;
+ Object[] intermediateResults = mergeIntermediateResults(dataSchema,
dataTables);
+ Object[] finalResults = new Object[numAggregationFunctions];
+ for (int i = 0; i < numAggregationFunctions; i++) {
+ AggregationFunction aggregationFunction = _aggregationFunctions[i];
+ Comparable result =
aggregationFunction.extractFinalResult(intermediateResults[i]);
+ finalResults[i] = result != null ?
aggregationFunction.getFinalResultColumnType().convert(result) : null;
+ }
+
brokerResponseNative.setResultTable(reduceToResultTable(getPrePostAggregationDataSchema(dataSchema),
finalResults));
+ }
+
+ /**
+ * Merges the per-server intermediate aggregation results into a single
{@code Object[]} of merged
+ * intermediate results (one per aggregation function), WITHOUT finalizing.
+ */
+ private Object[] mergeIntermediateResults(DataSchema dataSchema,
Collection<DataTable> dataTables) {
+ int numAggregationFunctions = _aggregationFunctions.length;
Object[] intermediateResults = new Object[numAggregationFunctions];
for (DataTable dataTable : dataTables) {
QueryThreadContext.checkTerminationAndSampleUsage("AggregationDataTableReducer");
@@ -110,13 +129,83 @@ public class AggregationDataTableReducer implements
DataTableReducer {
}
}
}
- Object[] finalResults = new Object[numAggregationFunctions];
- for (int i = 0; i < numAggregationFunctions; i++) {
- AggregationFunction aggregationFunction = _aggregationFunctions[i];
- Comparable result =
aggregationFunction.extractFinalResult(intermediateResults[i]);
- finalResults[i] = result != null ?
aggregationFunction.getFinalResultColumnType().convert(result) : null;
+ return intermediateResults;
+ }
+
+ @Override
+ public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema,
+ Map<ServerRoutingInstance, DataTable> dataTableMap,
DataTableReducerContext reducerContext,
+ BrokerMetrics brokerMetrics) {
+ // Servers that return final results send finalized values, which cannot be
merged as intermediate results
+ if (_queryContext.isServerReturnFinalResult()) {
+ throw new UnsupportedOperationException(
+ "Datatable merge to intermediate results cannot be supported when
servers return final result");
}
-
brokerResponseNative.setResultTable(reduceToResultTable(getPrePostAggregationDataSchema(dataSchema),
finalResults));
+ dataSchema =
ReducerDataSchemaUtils.canonicalizeDataSchemaForAggregation(_queryContext,
dataSchema);
+ try {
+ if (dataTableMap.isEmpty()) {
+ return DataTableBuilderFactory.getDataTableBuilder(dataSchema).build();
+ }
+ Object[] intermediateResults = mergeIntermediateResults(dataSchema,
dataTableMap.values());
+ return buildIntermediateDataTable(dataSchema, intermediateResults);
+ } catch (IOException e) {
+ throw new RuntimeException("Caught IOException while building merged
intermediate DataTable for aggregation", e);
+ }
+ }
+
+ /**
+ * Serializes the merged intermediate results into a single-row intermediate
{@link DataTable},
+ * mirroring the non-final branch of {@code
AggregationResultsBlock#getDataTable()} so the output is
+ * byte-shape identical to a single server's intermediate response. Never
finalizes.
+ */
+ private DataTable buildIntermediateDataTable(DataSchema dataSchema, Object[]
intermediateResults)
+ throws IOException {
+ ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+ int numColumns = columnDataTypes.length;
+ DataTableBuilder dataTableBuilder =
DataTableBuilderFactory.getDataTableBuilder(dataSchema);
+ if (_queryContext.isNullHandlingEnabled()) {
+ RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
+ for (int i = 0; i < numColumns; i++) {
+ nullBitmaps[i] = new RoaringBitmap();
+ }
+ dataTableBuilder.startRow();
+ for (int i = 0; i < numColumns; i++) {
+ Object result = intermediateResults[i];
+ if (columnDataTypes[i] == ColumnDataType.OBJECT) {
+ if (result == null) {
+ dataTableBuilder.setNull(i);
+ } else {
+ dataTableBuilder.setColumn(i,
_aggregationFunctions[i].serializeIntermediateResult(result));
+ }
+ } else {
+ if (result == null) {
+ result = columnDataTypes[i].getNullPlaceholder();
+ nullBitmaps[i].add(0);
+ }
+ AggregationFunctionUtils.setIntermediateResult(dataTableBuilder,
columnDataTypes[i], i, result);
+ }
+ }
+ dataTableBuilder.finishRow();
+ for (RoaringBitmap nullBitmap : nullBitmaps) {
+ dataTableBuilder.setNullRowIds(nullBitmap);
+ }
+ } else {
+ dataTableBuilder.startRow();
+ for (int i = 0; i < numColumns; i++) {
+ Object result = intermediateResults[i];
+ if (result == null) {
+ dataTableBuilder.setNull(i);
+ } else {
+ if (columnDataTypes[i] == ColumnDataType.OBJECT) {
+ dataTableBuilder.setColumn(i,
_aggregationFunctions[i].serializeIntermediateResult(result));
+ } else {
+ AggregationFunctionUtils.setIntermediateResult(dataTableBuilder,
columnDataTypes[i], i, result);
+ }
+ }
+ }
+ dataTableBuilder.finishRow();
+ }
+ return dataTableBuilder.build();
}
private void processSingleFinalResult(DataSchema dataSchema, DataTable
dataTable,
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 07dd8300ee0..f4fd325b483 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.common.datatable.DataTable;
import org.apache.pinot.common.metrics.BrokerMeter;
@@ -75,46 +76,11 @@ public class BrokerReduceService extends BaseReduceService {
ExecutionStatsAggregator aggregator = new
ExecutionStatsAggregator(enableTrace);
BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
- // Cache a data schema from data tables (try to cache one with data rows
associated with it).
- DataSchema dataSchemaFromEmptyDataTable = null;
- DataSchema dataSchemaFromNonEmptyDataTable = null;
+ // Process server response metadata, drop
empty/null-schema/conflicting-schema data tables, and pick
+ // a data schema (preferring one backed by data rows).
List<ServerRoutingInstance> serversWithConflictingDataSchema = new
ArrayList<>();
-
- // Process server response metadata.
- Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator =
dataTableMap.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
- DataTable dataTable = entry.getValue();
-
- // aggregate metrics
- aggregator.aggregate(entry.getKey(), dataTable);
-
- // After processing the metadata, remove data tables without data rows
inside.
- DataSchema dataSchema = dataTable.getDataSchema();
- if (dataSchema == null) {
- iterator.remove();
- } else {
- // Try to cache a data table with data rows inside, or cache one with
data schema inside.
- if (dataTable.getNumberOfRows() == 0) {
- if (dataSchemaFromEmptyDataTable == null) {
- dataSchemaFromEmptyDataTable = dataSchema;
- }
- iterator.remove();
- } else {
- if (dataSchemaFromNonEmptyDataTable == null) {
- dataSchemaFromNonEmptyDataTable = dataSchema;
- } else {
- // Remove data tables with conflicting data schema.
- // NOTE: Only compare the column data types, since the column
names (string representation of expression)
- // can change across different versions.
- if (!Arrays.equals(dataSchema.getColumnDataTypes(),
dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) {
- serversWithConflictingDataSchema.add(entry.getKey());
- iterator.remove();
- }
- }
- }
- }
- }
+ DataSchema cachedDataSchema =
+ filterDataTablesAndPickSchema(dataTableMap, aggregator,
serversWithConflictingDataSchema);
String tableName = serverBrokerRequest.getQuerySource().getTableName();
String rawTableName = TableNameBuilder.extractRawTableName(tableName);
@@ -142,34 +108,17 @@ public class BrokerReduceService extends
BaseReduceService {
// NOTE: When there is no cached data schema, that means all servers
encountered exception. In such case, return the
// response with metadata only.
- DataSchema cachedDataSchema =
- dataSchemaFromNonEmptyDataTable != null ?
dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable;
if (cachedDataSchema == null) {
return brokerResponseNative;
}
QueryContext serverQueryContext =
QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
DataTableReducer dataTableReducer =
ResultReducerFactory.getResultReducer(serverQueryContext);
-
- Integer minGroupTrimSizeQueryOption = null;
- Integer groupTrimThresholdQueryOption = null;
- Integer minInitialIndexedTableCapacityQueryOption = null;
- if (queryOptions != null) {
- minGroupTrimSizeQueryOption =
QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
- groupTrimThresholdQueryOption =
QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
- minInitialIndexedTableCapacityQueryOption =
QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
- }
- int minGroupTrimSize = minGroupTrimSizeQueryOption != null ?
minGroupTrimSizeQueryOption : _minGroupTrimSize;
- int groupTrimThreshold =
- groupTrimThresholdQueryOption != null ? groupTrimThresholdQueryOption
: _groupByTrimThreshold;
- int minInitialIndexedTableCapacity =
- minInitialIndexedTableCapacityQueryOption != null ?
minInitialIndexedTableCapacityQueryOption
- : _minInitialIndexedTableCapacity;
+ DataTableReducerContext reducerContext =
createReducerContext(queryOptions, reduceTimeOutMs);
try {
dataTableReducer.reduceAndSetResults(rawTableName, cachedDataSchema,
dataTableMap, brokerResponseNative,
- new DataTableReducerContext(_reduceExecutorService,
_maxReduceThreadsPerQuery, reduceTimeOutMs,
- groupTrimThreshold, minGroupTrimSize,
minInitialIndexedTableCapacity), brokerMetrics);
+ reducerContext, brokerMetrics);
} catch (RuntimeException e) {
// First check terminate exception and use it as the results block if
exists. We want to return the termination
// reason when query is explicitly terminated.
@@ -227,6 +176,161 @@ public class BrokerReduceService extends
BaseReduceService {
serverBrokerRequest.getPinotQuery().getQueryOptions().get(QueryOptionKey.MATERIALIZED_VIEW_REWRITE));
}
+  /**
+   * Merge-only counterpart of {@link #reduceOnDataTable}: merges the per-server DataTables into a single
+   * intermediate {@link DataTable} WITHOUT finalizing (no {@code extractFinalResult}). Reuses the same
+   * schema-filtering preamble and reducer/trim resolution as the regular reduce.
+   *
+   * <p>Returns {@code null} when there is nothing to merge: the input map is empty, or no server response
+   * contributed a data schema. Responses that carry a schema but zero rows are removed from the map while
+   * their schema is kept as a fallback, so in that case the reducer is still invoked on the remaining
+   * (possibly empty) map.
+   *
+   * <p>The returned DataTable carries intermediate, non-finalized state (byte-shape identical to a
+   * single server's partial response).
+   *
+   * <p>If one or more input server DataTables are dropped during merge (e.g., due to a schema
+   * conflict with the first non-empty table), the returned DataTable's metadata carries the
+   * {@link DataTable.MetadataKey#INCOMPLETE_MERGE} flag set to {@code "true"} and the
+   * {@link BrokerMeter#RESPONSE_MERGE_EXCEPTIONS} meter is incremented. The callers can
+   * read the flag and decide policy (skip, retry, accept).
+   *
+   * <p>Execution stats from the input DataTables are aggregated via {@link ExecutionStatsAggregator}
+   * and written back onto the merged DataTable: additive longs (e.g. {@code numDocsScanned},
+   * {@code numSegments*}, {@code threadCpuTimeNs}), {@code minConsumingFreshnessTimeMs} (MIN-reduced),
+   * boolean flags ({@code groupsTrimmed}, {@code numGroupsLimitReached}, etc., OR-reduced),
+   * per-server exceptions, and trace info (JSON-encoded if {@code trace=true}). This method does NOT bump
+   * broker meters/timers for the input stats.
+   *
+   * <p>Limitations on stats:
+   * <ul>
+   *   <li>Exception attribution to original servers is lost; the wire format is {@code Map<Integer,
+   *       String>} so collisions on the same error code are resolved last-write-wins.
+   *   <li>Per-server trace info is JSON-encoded into a single {@code TRACE_INFO} entry.
+   * </ul>
+   *
+   * <p>WARNING: this performs a full cross-server merge and re-serializes the result — heavyweight work
+   * that must be run asynchronously, decoupled from request serving. Intended for callers that want to
+   * intercept the merged intermediate result instead of the finalized one.
+   *
+   * <p>{@link org.apache.pinot.spi.query.QueryThreadContext} must already be set up before calling this method.
+   *
+   * @param serverBrokerRequest broker request sent to the servers; supplies the query, its options and the table name
+   * @param dataTableMap per-server responses; NOTE: filtered in place (schema-less, empty and schema-conflicting
+   *                     tables are removed), so the caller's map is modified
+   * @param reduceTimeOutMs timeout handed to the reducer context
+   * @param brokerMetrics broker metrics used by the reducer and for the merge-exception meter
+   * @return the merged intermediate DataTable, or {@code null} when there is nothing to merge
+   */
+  @Nullable
+  public DataTable mergeOnDataTable(BrokerRequest serverBrokerRequest,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, long reduceTimeOutMs, BrokerMetrics brokerMetrics) {
+    if (dataTableMap.isEmpty()) {
+      return null;
+    }
+    Map<String, String> queryOptions = serverBrokerRequest.getPinotQuery().getQueryOptions();
+    boolean enableTrace =
+        queryOptions != null && Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE));
+    // Aggregate stats while filtering so we can write them back onto the merged DataTable's metadata.
+    ExecutionStatsAggregator aggregator = new ExecutionStatsAggregator(enableTrace);
+    List<ServerRoutingInstance> conflictingServers = new ArrayList<>();
+    DataSchema cachedDataSchema = filterDataTablesAndPickSchema(dataTableMap, aggregator, conflictingServers);
+    if (cachedDataSchema == null) {
+      // No response carried a data schema (no data / exceptions only); nothing to merge.
+      return null;
+    }
+
+    String rawTableName = TableNameBuilder.extractRawTableName(serverBrokerRequest.getQuerySource().getTableName());
+    QueryContext serverQueryContext = QueryContextConverterUtils.getQueryContext(serverBrokerRequest.getPinotQuery());
+    DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(serverQueryContext);
+    // Reuse the options map fetched above, the same way reduceOnDataTable does.
+    DataTableReducerContext reducerContext = createReducerContext(queryOptions, reduceTimeOutMs);
+    DataTable merged = dataTableReducer.mergeDataTablesOnly(rawTableName, cachedDataSchema, dataTableMap,
+        reducerContext, brokerMetrics);
+
+    if (merged != null) {
+      // Write accumulated stats (additive longs, booleans, MIN freshness, exceptions, trace) onto the
+      // merged DataTable
+      aggregator.setStatsOnMergedDataTable(merged);
+
+      if (!conflictingServers.isEmpty()) {
+        LOGGER.warn("Merge-only reduce dropped {} server response(s) for table {} due to data schema "
+            + "inconsistency: {}", conflictingServers.size(), rawTableName, conflictingServers);
+        brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1);
+        // Let callers detect that the merged table does not cover every server response.
+        merged.getMetadata().put(DataTable.MetadataKey.INCOMPLETE_MERGE.getName(), "true");
+      }
+    }
+    return merged;
+  }
+
+  /**
+   * Processes per-server response metadata and filters {@code dataTableMap} in place:
+   * <ul>
+   *   <li>drops tables with a null schema
+   *   <li>drops empty tables (remembering the first such schema as a fallback)
+   *   <li>drops tables whose column data types conflict with the first non-empty table
+   *       (their servers are collected into {@code conflictingServers})
+   * </ul>
+   *
+   * <p>When an {@code aggregator} is provided, per-table execution stats are aggregated before a table is
+   * dropped.
+   *
+   * @param dataTableMap per-server responses; entries are removed through the iterator as described above
+   * @param aggregator optional stats aggregator fed with every table, including the ones that get dropped
+   * @param conflictingServers output list receiving the servers dropped because of a schema conflict
+   * @return the remembered data schema (non-empty preferred, else empty-table schema, else {@code null})
+   */
+  @Nullable
+  private static DataSchema filterDataTablesAndPickSchema(Map<ServerRoutingInstance, DataTable> dataTableMap,
+      @Nullable ExecutionStatsAggregator aggregator, List<ServerRoutingInstance> conflictingServers) {
+    DataSchema dataSchemaFromEmptyDataTable = null;
+    DataSchema dataSchemaFromNonEmptyDataTable = null;
+    Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
+      DataTable dataTable = entry.getValue();
+
+      // Aggregate metrics first so that stats of dropped tables are still counted.
+      if (aggregator != null) {
+        aggregator.aggregate(entry.getKey(), dataTable);
+      }
+
+      // After processing the metadata, remove data tables without data rows inside.
+      DataSchema dataSchema = dataTable.getDataSchema();
+      if (dataSchema == null) {
+        iterator.remove();
+        continue;
+      }
+      if (dataTable.getNumberOfRows() == 0) {
+        // Remember the first schema seen on an empty table; used only when no table has rows.
+        if (dataSchemaFromEmptyDataTable == null) {
+          dataSchemaFromEmptyDataTable = dataSchema;
+        }
+        iterator.remove();
+        continue;
+      }
+      if (dataSchemaFromNonEmptyDataTable == null) {
+        // The first table with rows defines the reference schema.
+        dataSchemaFromNonEmptyDataTable = dataSchema;
+        continue;
+      }
+      // Remove data tables with conflicting data schema.
+      // NOTE: Only compare the column data types, since the column names (string representation of expression)
+      //       can change across different versions.
+      if (!Arrays.equals(dataSchema.getColumnDataTypes(), dataSchemaFromNonEmptyDataTable.getColumnDataTypes())) {
+        conflictingServers.add(entry.getKey());
+        iterator.remove();
+      }
+    }
+    return dataSchemaFromNonEmptyDataTable != null ? dataSchemaFromNonEmptyDataTable : dataSchemaFromEmptyDataTable;
+  }
+
+  /**
+   * Builds the {@link DataTableReducerContext} for one reduce/merge call. Each group-by trim parameter starts
+   * from the broker-level default and is replaced by the matching query option when that option is present.
+   *
+   * @param queryOptions query options of the request, may be {@code null}
+   * @param reduceTimeOutMs timeout passed through to the context
+   * @return a context wired with the shared reduce executor and the resolved trim parameters
+   */
+  private DataTableReducerContext createReducerContext(@Nullable Map<String, String> queryOptions,
+      long reduceTimeOutMs) {
+    // Broker defaults; overridden below only when the query supplies a value.
+    int minGroupTrimSize = _minGroupTrimSize;
+    int groupTrimThreshold = _groupByTrimThreshold;
+    int minInitialIndexedTableCapacity = _minInitialIndexedTableCapacity;
+
+    if (queryOptions != null) {
+      Integer trimSizeOverride = QueryOptionsUtils.getMinBrokerGroupTrimSize(queryOptions);
+      Integer trimThresholdOverride = QueryOptionsUtils.getGroupTrimThreshold(queryOptions);
+      Integer initialCapacityOverride = QueryOptionsUtils.getMinInitialIndexedTableCapacity(queryOptions);
+      if (trimSizeOverride != null) {
+        minGroupTrimSize = trimSizeOverride;
+      }
+      if (trimThresholdOverride != null) {
+        groupTrimThreshold = trimThresholdOverride;
+      }
+      if (initialCapacityOverride != null) {
+        minInitialIndexedTableCapacity = initialCapacityOverride;
+      }
+    }
+
+    return new DataTableReducerContext(_reduceExecutorService, _maxReduceThreadsPerQuery, reduceTimeOutMs,
+        groupTrimThreshold, minGroupTrimSize, minInitialIndexedTableCapacity);
+  }
+
/** Stops the reduce executor service by calling {@code shutdownNow()} on it. */
public void shutDown() {
_reduceExecutorService.shutdownNow();
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
index 496df03da85..597b048958f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
@@ -42,4 +42,33 @@ public interface DataTableReducer {
*/
void reduceAndSetResults(String tableName, DataSchema dataSchema,
Map<ServerRoutingInstance, DataTable> dataTableMap,
BrokerResponseNative brokerResponseNative, DataTableReducerContext
reducerContext, BrokerMetrics brokerMetrics);
+
+  /**
+   * Combines the per-server data tables into one <em>intermediate</em> {@link DataTable} and stops there:
+   * nothing is finalized (no {@code extractFinalResult}) and no result formatting is applied. The result
+   * carries intermediate state, byte-shape identical to a single server's partial response, so a consumer
+   * can intercept the merged intermediate result and handle it in a custom way. The merged intermediate
+   * result is expected to be re-injectable into the normal reduce path.
+   *
+   * <p><b>WARNING:</b> a full cross-server merge plus re-serialization is heavyweight work. Run it
+   * asynchronously, decoupled from request serving; invoking it inline while a query is being served adds
+   * that cost to the query and can severely degrade its latency.
+   *
+   * <p>Reducers that cannot produce a re-mergeable intermediate (e.g. explain-plan) keep this default
+   * implementation.
+   *
+   * @param tableName table name
+   * @param dataSchema schema from broker reduce service
+   * @param dataTableMap map of servers to data tables
+   * @param reducerContext DataTableReducer context
+   * @param brokerMetrics broker metrics
+   * @return the merged intermediate DataTable (intermediate, non-finalized state)
+   * @throws UnsupportedOperationException always, in this default implementation
+   */
+  default DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, DataTableReducerContext reducerContext,
+      BrokerMetrics brokerMetrics) {
+    // Name the concrete reducer so the caller can tell which query type is unsupported.
+    String reducerName = getClass().getSimpleName();
+    throw new UnsupportedOperationException(reducerName + " does not support merge-only reduction");
+  }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index dab95c23227..c3a1c7d788c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.core.query.reduce;
+import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.pinot.common.datatable.DataTable;
@@ -27,6 +29,7 @@ import
org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.query.distinct.table.BigDecimalDistinctTable;
import org.apache.pinot.core.query.distinct.table.BytesDistinctTable;
import org.apache.pinot.core.query.distinct.table.DistinctTable;
@@ -61,8 +64,34 @@ public class DistinctDataTableReducer implements
DataTableReducer {
brokerResponseNative.setResultTable(new ResultTable(dataSchema,
List.of()));
return;
}
+ DistinctTable distinctTable = mergeToDistinctTable(dataSchema,
dataTableMap.values());
+ brokerResponseNative.setResultTable(distinctTable.toResultTable());
+ }
+
+  /**
+   * Merge-only variant for DISTINCT: returns the merged distinct set as an intermediate {@link DataTable}.
+   * When there are no input tables or the query limit is 0, an empty table built from the canonicalized
+   * schema is returned instead.
+   */
+  @Override
+  public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, DataTableReducerContext reducerContext,
+      BrokerMetrics brokerMetrics) {
+    DataSchema canonicalSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForDistinct(_queryContext, dataSchema);
+    boolean nothingToMerge = dataTableMap.isEmpty() || _queryContext.getLimit() == 0;
+    try {
+      // Intermediate form: limit/sorting not applied
+      return nothingToMerge
+          ? DataTableBuilderFactory.getDataTableBuilder(canonicalSchema).build()
+          : mergeToDistinctTable(canonicalSchema, dataTableMap.values()).toDataTable();
+    } catch (IOException e) {
+      throw new RuntimeException("Caught IOException while building merged intermediate DataTable for distinct", e);
+    }
+  }
+
+ /**
+ * Merges the per-server DataTables into a single {@link DistinctTable}
(early-stopping once the
+ * distinct set is satisfied). Shared by the normal reduce path and the
merge-only path.
+ */
+ private DistinctTable mergeToDistinctTable(DataSchema dataSchema,
Collection<DataTable> dataTables) {
DistinctTable distinctTable = null;
- for (DataTable dataTable : dataTableMap.values()) {
+ for (DataTable dataTable : dataTables) {
QueryThreadContext.checkTerminationAndSampleUsage("DistinctDataTableReducer");
if (distinctTable == null) {
distinctTable = createDistinctTable(dataSchema, dataTable);
@@ -75,7 +104,7 @@ public class DistinctDataTableReducer implements
DataTableReducer {
}
}
}
- brokerResponseNative.setResultTable(distinctTable.toResultTable());
+ return distinctTable;
}
private DistinctTable createDistinctTable(DataSchema dataSchema, DataTable
dataTable) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
index 3ca72dd04e6..d76596987ae 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregator.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.query.reduce;
+import com.fasterxml.jackson.core.JsonProcessingException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -33,6 +34,7 @@ import
org.apache.pinot.common.response.broker.QueryProcessingException;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.JsonUtils;
public class ExecutionStatsAggregator {
@@ -346,4 +348,133 @@ public class ExecutionStatsAggregator {
consumer.accept(Long.parseLong(strValue));
}
}
+
+  /**
+   * Copies everything this aggregator has accumulated onto {@code dataTable}: execution stats go into its
+   * metadata map and processing exceptions into its exception map. The intent is that a merged-only
+   * DataTable can be re-injected into the regular reduce path and yield the same downstream totals as a
+   * direct reduce of the original inputs.
+   *
+   * <p>In contrast to {@link #setStats(String, BrokerResponseNative, BrokerMetrics)}, no broker meters or
+   * timers are touched here. The merge-only path is expected to run off the request-serving path; meter
+   * increments fire when the result is eventually re-reduced.
+   *
+   * <p>What does not survive the round-trip through DataTable metadata:
+   * <ul>
+   *   <li>The offline/realtime split of CPU and memory stats. The wire format has a single key per stat
+   *       ({@link DataTable.MetadataKey#THREAD_CPU_TIME_NS}, etc.), so the offline and realtime values
+   *       are summed. On a re-reduce the combined value lands in whichever tableType bucket the caller
+   *       assigned to the synthetic server response; the total is preserved, the split is not.
+   *   <li>Per-server exception messages sharing an error code. {@link DataTable#addException(int, String)}
+   *       backs a {@code Map<Integer, String>} keyed by error code, so the last message written wins.
+   *   <li>Per-server trace attribution. The whole trace map is JSON-encoded into one
+   *       {@link DataTable.MetadataKey#TRACE_INFO} entry and is read back downstream as a single blob
+   *       attributed to the synthetic server.
+   *   <li>Multiple DISTINCT early-termination reasons. The wire format holds one enum name
+   *       ({@link DataTable.MetadataKey#EARLY_TERMINATION_REASON}), while the aggregator tracks three
+   *       independent booleans; only the first set flag in declaration order is encoded. The
+   *       user-visible "DISTINCT is partial" signal is preserved because each flag independently sets
+   *       partial-ness on {@link BrokerResponseNative}; the exact reason is best-effort.
+   * </ul>
+   *
+   * @param dataTable merged DataTable whose metadata and exception map are updated
+   * @throws IllegalStateException if tracing is enabled and the trace map cannot be serialized to JSON
+   */
+  public void setStatsOnMergedDataTable(DataTable dataTable) {
+    Map<String, String> metadata = dataTable.getMetadata();
+
+    // Additive counters are always written, like setStats() does. Accumulators start at 0, and downstream
+    // a "0" entry and an absent entry both end up contributing 0.
+    putLong(metadata, DataTable.MetadataKey.NUM_DOCS_SCANNED, _numDocsScanned);
+    putLong(metadata, DataTable.MetadataKey.NUM_ENTRIES_SCANNED_IN_FILTER, _numEntriesScannedInFilter);
+    putLong(metadata, DataTable.MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER, _numEntriesScannedPostFilter);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_QUERIED, _numSegmentsQueried);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PROCESSED, _numSegmentsProcessed);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_MATCHED, _numSegmentsMatched);
+    putLong(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_QUERIED, _numConsumingSegmentsQueried);
+    putLong(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_PROCESSED, _numConsumingSegmentsProcessed);
+    putLong(metadata, DataTable.MetadataKey.NUM_CONSUMING_SEGMENTS_MATCHED, _numConsumingSegmentsMatched);
+    putLong(metadata, DataTable.MetadataKey.TOTAL_DOCS, _numTotalDocs);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_SERVER, _numSegmentsPrunedByServer);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_INVALID, _numSegmentsPrunedInvalid);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_LIMIT, _numSegmentsPrunedByLimit);
+    putLong(metadata, DataTable.MetadataKey.NUM_SEGMENTS_PRUNED_BY_VALUE, _numSegmentsPrunedByValue);
+    putLong(metadata, DataTable.MetadataKey.EXPLAIN_PLAN_NUM_EMPTY_FILTER_SEGMENTS,
+        _explainPlanNumEmptyFilterSegments);
+    putLong(metadata, DataTable.MetadataKey.EXPLAIN_PLAN_NUM_MATCH_ALL_FILTER_SEGMENTS,
+        _explainPlanNumMatchAllFilterSegments);
+
+    // The wire format has one key per CPU/memory stat, so fold offline + realtime back together.
+    long threadCpuTimeNs = _offlineThreadCpuTimeNs + _realtimeThreadCpuTimeNs;
+    long systemActivitiesCpuTimeNs = _offlineSystemActivitiesCpuTimeNs + _realtimeSystemActivitiesCpuTimeNs;
+    long responseSerCpuTimeNs = _offlineResponseSerializationCpuTimeNs + _realtimeResponseSerializationCpuTimeNs;
+    long threadMemAllocatedBytes = _offlineThreadMemAllocatedBytes + _realtimeThreadMemAllocatedBytes;
+    long responseSerMemAllocatedBytes = _offlineResponseSerMemAllocatedBytes + _realtimeResponseSerMemAllocatedBytes;
+    putLong(metadata, DataTable.MetadataKey.THREAD_CPU_TIME_NS, threadCpuTimeNs);
+    putLong(metadata, DataTable.MetadataKey.SYSTEM_ACTIVITIES_CPU_TIME_NS, systemActivitiesCpuTimeNs);
+    putLong(metadata, DataTable.MetadataKey.RESPONSE_SER_CPU_TIME_NS, responseSerCpuTimeNs);
+    putLong(metadata, DataTable.MetadataKey.THREAD_MEM_ALLOCATED_BYTES, threadMemAllocatedBytes);
+    putLong(metadata, DataTable.MetadataKey.RESPONSE_SER_MEM_ALLOCATED_BYTES, responseSerMemAllocatedBytes);
+
+    // Long.MAX_VALUE is the "no input reported freshness" sentinel; writing it would mislead downstream
+    // observability, so the key is left out in that case.
+    if (_minConsumingFreshnessTimeMs != Long.MAX_VALUE) {
+      putLong(metadata, DataTable.MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS, _minConsumingFreshnessTimeMs);
+    }
+
+    // OR-reduced flags: written only when true, since the existing reduce path treats absent as false.
+    if (_groupsTrimmed) {
+      metadata.put(DataTable.MetadataKey.GROUPS_TRIMMED.getName(), "true");
+    }
+    if (_numGroupsLimitReached) {
+      metadata.put(DataTable.MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true");
+    }
+    if (_numGroupsWarningLimitReached) {
+      metadata.put(DataTable.MetadataKey.NUM_GROUPS_WARNING_LIMIT_REACHED.getName(), "true");
+    }
+
+    // Three DISTINCT booleans have to fit into one enum name. The first set flag in declaration order wins,
+    // which keeps the round-trip deterministic; see the method Javadoc for the granularity trade-off.
+    BaseResultsBlock.EarlyTerminationReason distinctReason =
+        _maxRowsInDistinctReached ? BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS
+            : _maxRowsWithoutChangeInDistinctReached
+                ? BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_ROWS_WITHOUT_CHANGE
+                : _maxExecutionTimeInDistinctReached
+                    ? BaseResultsBlock.EarlyTerminationReason.DISTINCT_MAX_EXECUTION_TIME
+                    : null;
+    if (distinctReason != null) {
+      metadata.put(DataTable.MetadataKey.EARLY_TERMINATION_REASON.getName(), distinctReason.name());
+    }
+
+    // Exceptions are keyed by error code on the DataTable, so a later message replaces an earlier one.
+    for (QueryProcessingException processingException : _processingExceptions) {
+      dataTable.addException(processingException.getErrorCode(), processingException.getMessage());
+    }
+
+    // Trace: the per-server map becomes a single JSON string under TRACE_INFO.
+    if (!_enableTrace || _traceInfo.isEmpty()) {
+      return;
+    }
+    try {
+      metadata.put(DataTable.MetadataKey.TRACE_INFO.getName(), JsonUtils.objectToString(_traceInfo));
+    } catch (JsonProcessingException e) {
+      throw new IllegalStateException("Failed to serialize trace info for merged DataTable", e);
+    }
+  }
+
+  /** Stores {@code value}, rendered as a decimal string, under the metadata key's name. */
+  private static void putLong(Map<String, String> metadata, DataTable.MetadataKey key, long value) {
+    String keyName = key.getName();
+    String renderedValue = Long.toString(value);
+    metadata.put(keyName, renderedValue);
+  }
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 942544dab50..839948f88be 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -23,6 +23,7 @@ import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import java.io.IOException;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
@@ -37,6 +38,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
import org.apache.pinot.common.metrics.BrokerGauge;
import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -46,6 +48,9 @@ import
org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.common.datatable.DataTableBuilderUtils;
import org.apache.pinot.core.data.table.IndexedTable;
import org.apache.pinot.core.data.table.Record;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -120,6 +125,49 @@ public class GroupByDataTableReducer implements
DataTableReducer {
}
}
+  /**
+   * Merge-only variant for group-by: merges the per-server tables into an {@link IndexedTable} and
+   * serializes it back into an intermediate {@link DataTable}. The {@code GROUPS_TRIMMED} and
+   * {@code NUM_GROUPS_LIMIT_REACHED} metadata flags are set on the result when applicable.
+   *
+   * @throws UnsupportedOperationException if the query asks servers to return final aggregate results
+   */
+  @Override
+  public DataTable mergeDataTablesOnly(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, DataTableReducerContext reducerContext,
+      BrokerMetrics brokerMetrics) {
+    // With server-side finalization the inputs hold final (not intermediate) values, so an intermediate
+    // DataTable that can be re-merged via the normal reduce path cannot be produced.
+    if (_queryContext.isServerReturnFinalResult()) {
+      throw new UnsupportedOperationException(
+          "Merge-only reduction is not supported when servers return final aggregate results "
+              + "(server.returnFinalResult / isServerReturnFinalResult); input would be final-typed, "
+              + "not intermediate.");
+    }
+    DataSchema canonicalSchema = ReducerDataSchemaUtils.canonicalizeDataSchemaForGroupBy(_queryContext, dataSchema);
+    try {
+      if (dataTableMap.isEmpty()) {
+        // Nothing to merge: hand back a schema-only table.
+        return DataTableBuilderFactory.getDataTableBuilder(canonicalSchema).build();
+      }
+      Collection<DataTable> serverTables = dataTableMap.values();
+      // Same merge as the regular reduce: group keys + intermediate aggregation state.
+      IndexedTable mergedGroups = getIndexedTable(canonicalSchema, serverTables, reducerContext);
+      DataTable intermediate = buildIntermediateDataTable(canonicalSchema, mergedGroups);
+      boolean groupsTrimmed = mergedGroups.isTrimmed() && _queryContext.isUnsafeTrim();
+      if (groupsTrimmed) {
+        intermediate.getMetadata().put(MetadataKey.GROUPS_TRIMMED.getName(), "true");
+      }
+      if (anyNumGroupsLimitReached(serverTables)) {
+        intermediate.getMetadata().put(MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName(), "true");
+      }
+      return intermediate;
+    } catch (IOException e) {
+      throw new RuntimeException("Caught IOException while building merged intermediate DataTable for group-by", e);
+    }
+  }
+
+  /** Returns {@code true} if at least one table's metadata has {@code NUM_GROUPS_LIMIT_REACHED} parsing to true. */
+  private static boolean anyNumGroupsLimitReached(Collection<DataTable> dataTables) {
+    String limitReachedKey = MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName();
+    // anyMatch short-circuits on the first hit, like the early return of a plain loop.
+    return dataTables.stream()
+        .anyMatch(serverTable -> Boolean.parseBoolean(serverTable.getMetadata().get(limitReachedKey)));
+  }
+
/// Reduces group-by results into a [ResultTable] and set it into the
[BrokerResponseNative].
private void reduceResult(BrokerResponseNative brokerResponseNative,
DataSchema dataSchema,
Collection<DataTable> dataTables, DataTableReducerContext
reducerContext, String rawTableName,
@@ -482,4 +530,74 @@ public class GroupByDataTableReducer implements
DataTableReducer {
throw new IllegalStateException("Illegal column data type in group
key: " + columnDataType);
}
}
+
+  /**
+   * Writes every record of the merged {@link IndexedTable} into a new intermediate {@link DataTable}. The
+   * layout mirrors {@code GroupByResultsBlock#getDataTable()} so that the output is byte-shape identical to a
+   * single server's intermediate group-by response: non-OBJECT columns are written by stored type, OBJECT
+   * (aggregate) columns via {@link AggregationFunction#serializeIntermediateResult}. No limit / HAVING /
+   * post-aggregation / formatting is applied.
+   *
+   * @param dataSchema schema of the table to build
+   * @param indexedTable merged group-by records
+   * @return the serialized intermediate DataTable
+   * @throws IOException propagated from the DataTable builder
+   */
+  private DataTable buildIntermediateDataTable(DataSchema dataSchema, IndexedTable indexedTable)
+      throws IOException {
+    DataTableBuilder builder = DataTableBuilderFactory.getDataTableBuilder(dataSchema);
+    ColumnDataType[] storedTypes = dataSchema.getStoredColumnDataTypes();
+    Iterator<Record> records = indexedTable.iterator();
+
+    if (!_queryContext.isNullHandlingEnabled()) {
+      for (int rowId = 0; records.hasNext(); rowId++) {
+        QueryThreadContext.checkTerminationAndSampleUsagePeriodically(rowId, "GroupByDataTableReducer#merge");
+        builder.startRow();
+        Object[] rowValues = records.next().getValues();
+        for (int col = 0; col < _numColumns; col++) {
+          Object cell = rowValues[col];
+          if (cell != null && storedTypes[col] == ColumnDataType.OBJECT) {
+            // Aggregate columns follow the group-by columns, hence the index shift.
+            builder.setColumn(col,
+                _aggregationFunctions[col - _numGroupByExpressions].serializeIntermediateResult(cell));
+          } else if (cell != null) {
+            DataTableBuilderUtils.setColumn(builder, storedTypes[col], col, cell);
+          } else {
+            builder.setNull(col);
+          }
+        }
+        builder.finishRow();
+      }
+      return builder.build();
+    }
+
+    // Null handling enabled: null non-OBJECT cells are stored as the type's placeholder and their row ids
+    // are collected in one bitmap per column.
+    RoaringBitmap[] nullBitmaps = new RoaringBitmap[_numColumns];
+    Object[] nullPlaceholders = new Object[_numColumns];
+    for (int col = 0; col < _numColumns; col++) {
+      nullBitmaps[col] = new RoaringBitmap();
+      nullPlaceholders[col] = storedTypes[col].getNullPlaceholder();
+    }
+    for (int rowId = 0; records.hasNext(); rowId++) {
+      QueryThreadContext.checkTerminationAndSampleUsagePeriodically(rowId, "GroupByDataTableReducer#merge");
+      builder.startRow();
+      Object[] rowValues = records.next().getValues();
+      for (int col = 0; col < _numColumns; col++) {
+        Object cell = rowValues[col];
+        if (storedTypes[col] != ColumnDataType.OBJECT) {
+          if (cell == null) {
+            nullBitmaps[col].add(rowId);
+            cell = nullPlaceholders[col];
+          }
+          DataTableBuilderUtils.setColumn(builder, storedTypes[col], col, cell);
+        } else if (cell != null) {
+          // Aggregate columns follow the group-by columns, hence the index shift.
+          builder.setColumn(col,
+              _aggregationFunctions[col - _numGroupByExpressions].serializeIntermediateResult(cell));
+        } else {
+          builder.setNull(col);
+        }
+      }
+      builder.finishRow();
+    }
+    for (RoaringBitmap nullBitmap : nullBitmaps) {
+      builder.setNullRowIds(nullBitmap);
+    }
+    return builder.build();
+  }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregatorTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregatorTest.java
new file mode 100644
index 00000000000..608287b05be
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/ExecutionStatsAggregatorTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import
org.apache.pinot.core.operator.blocks.results.BaseResultsBlock.EarlyTerminationReason;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Focused tests for {@link
ExecutionStatsAggregator#setStatsOnMergedDataTable(DataTable)} —
+ * specifically the {@link MetadataKey#EARLY_TERMINATION_REASON} round-trip
for DISTINCT queries.
+ *
+ * <p>The aggregator collapses a single-string wire-format reason into three
independent booleans
+ * (one per {@link EarlyTerminationReason}); the round-trip can encode only
one reason back. These
+ * tests pin:
+ * <ul>
+ * <li>Each of the three reasons round-trips its own boolean when it is the
only one set.
+ * <li>When multiple booleans are set (different per-server reasons
OR-reduced), the round-trip
+ * encodes the first set flag in declaration order. The user-visible
"DISTINCT is partial"
+ * signal is preserved because any one flag independently sets
partial-ness on
+ * {@link BrokerResponseNative}.
+ * <li>When no DISTINCT early-termination booleans are set, the key is
absent from the merged
+ * metadata (so the downstream decoder does not see a stray non-DISTINCT
reason and the
+ * merged DataTable is indistinguishable from one whose inputs all
completed normally).
+ * </ul>
+ */
+public class ExecutionStatsAggregatorTest {
+
+ private static final ServerRoutingInstance SERVER =
+ new ServerRoutingInstance("localhost", 8080, TableType.OFFLINE);
+
+ /** Build an empty DataTable carrying a single {@code
EARLY_TERMINATION_REASON} metadata entry. */
+ private static DataTable serverTableWithReason(EarlyTerminationReason
reason) {
+ DataTable dt = DataTableBuilderFactory.getEmptyDataTable();
+ dt.getMetadata().put(MetadataKey.EARLY_TERMINATION_REASON.getName(),
reason.name());
+ return dt;
+ }
+
+ private static DataTable emptyServerTable() {
+ return DataTableBuilderFactory.getEmptyDataTable();
+ }
+
+ /**
+ * Drives one aggregate-then-readback round-trip and returns the three
DISTINCT booleans the
+ * downstream aggregator would flip from the merged DataTable's metadata.
Mirrors the production
+ * sequence: aggregate per-server inputs → serialize onto merged DataTable →
reduce path
+ * re-aggregates the merged DataTable on a fresh aggregator.
+ */
+ private static boolean[] roundTrip(DataTable... inputs) {
+ ExecutionStatsAggregator producer = new ExecutionStatsAggregator(false);
+ for (DataTable dt : inputs) {
+ producer.aggregate(SERVER, dt);
+ }
+ DataTable merged = DataTableBuilderFactory.getEmptyDataTable();
+ producer.setStatsOnMergedDataTable(merged);
+
+ ExecutionStatsAggregator consumer = new ExecutionStatsAggregator(false);
+ consumer.aggregate(SERVER, merged);
+ BrokerResponseNative response = new BrokerResponseNative();
+ consumer.setStats("testTable", response, mock(BrokerMetrics.class));
+ return new boolean[]{
+ response.isMaxRowsInDistinctReached(),
+ response.isMaxRowsWithoutChangeInDistinctReached(),
+ response.isMaxExecutionTimeInDistinctReached()
+ };
+ }
+
+ @Test
+ public void testDistinctMaxRowsReachedRoundTrip() {
+ boolean[] flags =
roundTrip(serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS));
+ assertTrue(flags[0], "_maxRowsInDistinctReached must round-trip");
+ assertFalse(flags[1], "other DISTINCT flags must stay false");
+ assertFalse(flags[2], "other DISTINCT flags must stay false");
+ }
+
+ @Test
+ public void testDistinctMaxRowsWithoutChangeReachedRoundTrip() {
+ boolean[] flags =
roundTrip(serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS_WITHOUT_CHANGE));
+ assertFalse(flags[0]);
+ assertTrue(flags[1], "_maxRowsWithoutChangeInDistinctReached must
round-trip");
+ assertFalse(flags[2]);
+ }
+
+ @Test
+ public void testDistinctMaxExecutionTimeReachedRoundTrip() {
+ boolean[] flags =
roundTrip(serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_EXECUTION_TIME));
+ assertFalse(flags[0]);
+ assertFalse(flags[1]);
+ assertTrue(flags[2], "_maxExecutionTimeInDistinctReached must round-trip");
+ }
+
+ /**
+ * Two inputs hit DIFFERENT DISTINCT early-termination reasons. The
aggregator OR-reduces both
+ * into independent booleans, but the wire format is one string per
DataTable. Round-trip can
+ * encode only one reason back; per the implementation contract, it picks
the first set flag in
+ * declaration order — {@code DISTINCT_MAX_ROWS} when both rows-reached and
execution-time are
+ * set.
+ *
+ * <p>The user-visible "DISTINCT is partial" signal is still preserved: at
least one of the
+ * three flags is true post-round-trip. The exact reason granularity is
best-effort.
+ */
+ @Test
+ public void testMultipleReasonsEncodeFirstInDeclarationOrder() {
+ boolean[] flags = roundTrip(
+
serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_EXECUTION_TIME),
+ serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS));
+ assertTrue(flags[0], "DISTINCT_MAX_ROWS wins over
DISTINCT_MAX_EXECUTION_TIME in priority order");
+ assertFalse(flags[1]);
+ assertFalse(flags[2], "execution-time flag is dropped — single-string wire
format can only carry one reason");
+ }
+
+ /**
+ * No DISTINCT early-termination on any input. The merged DataTable must NOT
carry an
+ * {@code EARLY_TERMINATION_REASON} key — writing one would mislead the
downstream decoder, and
+ * an empty/false value is not part of the wire-format vocabulary (the
consumer's
+ * {@code EarlyTerminationReason.valueOf} would throw on "" or "false" and
the
+ * {@code IllegalArgumentException} would silently mask any other reason we
tried to encode).
+ */
+ @Test
+ public void testNoReasonProducesAbsentMetadataKey() {
+ ExecutionStatsAggregator producer = new ExecutionStatsAggregator(false);
+ producer.aggregate(SERVER, emptyServerTable());
+ DataTable merged = DataTableBuilderFactory.getEmptyDataTable();
+ producer.setStatsOnMergedDataTable(merged);
+
+
assertNull(merged.getMetadata().get(MetadataKey.EARLY_TERMINATION_REASON.getName()),
+ "no DISTINCT early-termination → key must be absent on merged
DataTable");
+
+ // Sanity: re-aggregation of the merged DataTable leaves all three flags
false.
+ boolean[] flags = roundTrip(emptyServerTable());
+ assertFalse(flags[0]);
+ assertFalse(flags[1]);
+ assertFalse(flags[2]);
+ }
+
+ /**
+ * Same DISTINCT reason reported by multiple inputs round-trips to the same
single boolean. The
+ * aggregator's OR-reduce makes this a trivial case, but pinning it catches
a future regression
+ * where setStatsOnMergedDataTable starts skipping the reason on repeats.
+ */
+ @Test
+ public void testSameReasonFromMultipleInputsRoundTripsToOneBoolean() {
+ boolean[] flags = roundTrip(
+ serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS),
+ serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS),
+ serverTableWithReason(EarlyTerminationReason.DISTINCT_MAX_ROWS));
+ assertTrue(flags[0]);
+ assertFalse(flags[1]);
+ assertFalse(flags[2]);
+ }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/MergeDataTablesOnlyTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/MergeDataTablesOnlyTest.java
new file mode 100644
index 00000000000..7aea1fe90fb
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/MergeDataTablesOnlyTest.java
@@ -0,0 +1,577 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.datatable.DataTable.MetadataKey;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants.Broker;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
+import org.roaringbitmap.RoaringBitmap;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Verifies the core merge-only contract: reducing the raw per-server
DataTable map produces the same
+ * final result as reducing a single re-injected merged intermediate
DataTable, i.e.
+ * <pre>{@code reduceOnDataTable(rawMap) == reduceOnDataTable({ syntheticKey -> mergeOnDataTable(rawMap) })}</pre>
+ * for the supported reducers (Aggregation, Group-by, Distinct), including the
OBJECT-column
+ * (DISTINCTCOUNT) round-trip. Also checks that the merged DataTable carries
the intermediate schema and
+ * that unsupported reducers (Selection) throw.
+ */
+public class MergeDataTablesOnlyTest {
+ private static final long TIMEOUT_MS = 60_000L;
+ private BrokerReduceService _reduceService;
+
+ /** Creates the shared {@link BrokerReduceService}, configured with 2 max reduce threads per query. */
+ @BeforeClass
+ public void setUp() {
+   Map<String, Object> brokerConfig = Map.of(Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2);
+   _reduceService = new BrokerReduceService(new PinotConfiguration(brokerConfig));
+ }
+
+ /** Shuts down the shared reduce service once every test in the class has run. */
+ @AfterClass
+ public void tearDown() {
+ _reduceService.shutDown();
+ }
+
+ /** Scalar aggregations (COUNT/SUM/MIN/MAX) over three server rows: merge-then-reduce must match direct reduce. */
+ @Test
+ public void testAggregationScalarRoundTrip() {
+   String[] columnNames = {"count(*)", "sum(m)", "min(m)", "max(m)"};
+   ColumnDataType[] columnTypes =
+       {ColumnDataType.LONG, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE};
+   DataSchema schema = new DataSchema(columnNames, columnTypes);
+
+   DataTable server1 = buildRow(schema, 3L, 10.0, 2.0, 8.0);
+   DataTable server2 = buildRow(schema, 2L, 5.0, 1.0, 9.0);
+   DataTable server3 = buildRow(schema, 4L, 20.0, 3.0, 7.0);
+
+   assertRoundTrip("SELECT COUNT(*), SUM(m), MIN(m), MAX(m) FROM testTable", List.of(server1, server2, server3));
+ }
+
+ /**
+  * OBJECT-column round trip: two servers send serialized DISTINCTCOUNT intermediate sets ({1,2,3} and {3,4,5});
+  * merge-then-reduce must match direct reduce.
+  */
+ @Test
+ public void testDistinctCountObjectRoundTrip()
+     throws IOException {
+   String query = "SELECT DISTINCTCOUNT(col1) FROM testTable";
+   AggregationFunction distinctCount = aggFunctions(query)[0];
+   String[] columnNames = {"distinctcount(col1)"};
+   ColumnDataType[] columnTypes = {ColumnDataType.OBJECT};
+   DataSchema schema = new DataSchema(columnNames, columnTypes);
+
+   DataTable server1 = buildObjectRow(schema, distinctCount, new IntOpenHashSet(new int[]{1, 2, 3}));
+   DataTable server2 = buildObjectRow(schema, distinctCount, new IntOpenHashSet(new int[]{3, 4, 5}));
+
+   assertRoundTrip(query, List.of(server1, server2));
+ }
+
+ /** Group-by round trip: two servers with one overlapping group key (1); merge-then-reduce must match direct reduce. */
+ @Test
+ public void testGroupByRoundTrip() {
+   String[] columnNames = {"col1", "count(*)", "sum(m)"};
+   ColumnDataType[] columnTypes = {ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.DOUBLE};
+   DataSchema schema = new DataSchema(columnNames, columnTypes);
+
+   DataTable server1 = buildGroupBy(schema, new Object[][]{{1, 2L, 10.0}, {2, 1L, 5.0}});
+   DataTable server2 = buildGroupBy(schema, new Object[][]{{1, 3L, 20.0}, {3, 4L, 8.0}});
+
+   assertRoundTrip("SELECT col1, COUNT(*), SUM(m) FROM testTable GROUP BY col1", List.of(server1, server2));
+ }
+
+ /** DISTINCT round trip: two servers sharing the value 2; merge-then-reduce must match direct reduce. */
+ @Test
+ public void testDistinctRoundTrip() {
+   DataSchema schema = new DataSchema(new String[]{"col1"}, new ColumnDataType[]{ColumnDataType.INT});
+
+   DataTable server1 = buildGroupBy(schema, new Object[][]{{1}, {2}});
+   DataTable server2 = buildGroupBy(schema, new Object[][]{{2}, {3}});
+
+   assertRoundTrip("SELECT DISTINCT col1 FROM testTable", List.of(server1, server2));
+ }
+
+ /**
+  * With {@code SERVER_RETURN_FINAL_RESULT} set to "true" the per-server DataTables hold finalized rather than
+  * intermediate aggregate state, so the merge-only contract cannot be honored. The merge is expected to throw
+  * {@link UnsupportedOperationException} instead of silently merging final values as intermediates.
+  */
+ @Test
+ public void testAggregationServerReturnFinalResultRejected() {
+   DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG});
+   Map<ServerRoutingInstance, DataTable> serverResponses = singletonMap(buildRow(schema, 5L));
+
+   BrokerRequest request = CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable");
+   request.getPinotQuery().putToQueryOptions(Broker.Request.QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
+
+   assertThrows(UnsupportedOperationException.class, () -> merge(request, serverResponses));
+ }
+
+ /**
+  * Group-by counterpart of the final-result rejection: with {@code SERVER_RETURN_FINAL_RESULT} set to "true" the
+  * merge is expected to throw {@link UnsupportedOperationException}.
+  */
+ @Test
+ public void testGroupByServerReturnFinalResultRejected() {
+   String[] columnNames = {"col1", "count(*)"};
+   ColumnDataType[] columnTypes = {ColumnDataType.INT, ColumnDataType.LONG};
+   DataSchema schema = new DataSchema(columnNames, columnTypes);
+   Map<ServerRoutingInstance, DataTable> serverResponses =
+       singletonMap(buildGroupBy(schema, new Object[][]{{1, 2L}}));
+
+   BrokerRequest request =
+       CalciteSqlCompiler.compileToBrokerRequest("SELECT col1, COUNT(*) FROM testTable GROUP BY col1");
+   request.getPinotQuery().putToQueryOptions(Broker.Request.QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
+
+   assertThrows(UnsupportedOperationException.class, () -> merge(request, serverResponses));
+ }
+
+ /**
+  * LIMIT 0 on group-by. Per the original author's note, the merge step does NOT apply LIMIT (the intermediate is
+  * unlimited), so a non-empty intermediate DataTable is produced; the re-injected reduce then applies the LIMIT
+  * and yields an empty final result, matching the direct reduce.
+  */
+ @Test
+ public void testGroupByLimitZeroRoundTrip() {
+   String[] columnNames = {"col1", "count(*)"};
+   ColumnDataType[] columnTypes = {ColumnDataType.INT, ColumnDataType.LONG};
+   DataSchema schema = new DataSchema(columnNames, columnTypes);
+
+   DataTable server1 = buildGroupBy(schema, new Object[][]{{1, 2L}});
+   DataTable server2 = buildGroupBy(schema, new Object[][]{{2, 3L}});
+
+   assertRoundTrip("SELECT col1, COUNT(*) FROM testTable GROUP BY col1 LIMIT 0", List.of(server1, server2));
+ }
+
+ /**
+  * The merged DataTable must expose, for every aggregation column, the type reported by
+  * {@code getIntermediateResultColumnType()} of the matching aggregation function (not the finalized type).
+  */
+ @Test
+ public void testMergedDataTableCarriesIntermediateSchema() {
+   String query = "SELECT COUNT(*), SUM(m), DISTINCTCOUNT(col1) FROM testTable";
+   AggregationFunction[] functions = aggFunctions(query);
+
+   // One intermediate row: COUNT->LONG, SUM->DOUBLE, DISTINCTCOUNT->OBJECT
+   String[] columnNames = {"count(*)", "sum(m)", "distinctcount(col1)"};
+   ColumnDataType[] columnTypes = {ColumnDataType.LONG, ColumnDataType.DOUBLE, ColumnDataType.OBJECT};
+   DataSchema schema = new DataSchema(columnNames, columnTypes);
+   DataTableBuilder rowBuilder = DataTableBuilderFactory.getDataTableBuilder(schema);
+   try {
+     IntOpenHashSet distinctValues = new IntOpenHashSet(new int[]{1, 2});
+     rowBuilder.startRow();
+     rowBuilder.setColumn(0, 5L);
+     rowBuilder.setColumn(1, 12.0);
+     rowBuilder.setColumn(2, functions[2].serializeIntermediateResult(distinctValues));
+     rowBuilder.finishRow();
+   } catch (IOException e) {
+     throw new RuntimeException(e);
+   }
+   Map<ServerRoutingInstance, DataTable> serverResponses = singletonMap(rowBuilder.build());
+
+   DataTable merged = merge(query, serverResponses);
+   assertNotNull(merged);
+
+   ColumnDataType[] mergedTypes = merged.getDataSchema().getColumnDataTypes();
+   int columnIndex = 0;
+   for (AggregationFunction function : functions) {
+     assertEquals(mergedTypes[columnIndex], function.getIntermediateResultColumnType(),
+         "merged column " + columnIndex + " must carry the intermediate type, not the finalized type");
+     columnIndex++;
+   }
+ }
+
+ /** Merging an empty server-response map is expected to return {@code null}. */
+ @Test
+ public void testEmptyMapReturnsNull() {
+   Map<ServerRoutingInstance, DataTable> noResponses = new HashMap<>();
+   DataTable merged = merge("SELECT COUNT(*) FROM testTable", noResponses);
+   assertNull(merged);
+ }
+
+ /**
+  * When the only input is a metadata-only DataTable (0 rows, null schema per the original author's note) it is
+  * dropped, leaving nothing to merge, so the merge is expected to return {@code null}.
+  */
+ @Test
+ public void testAllEmptyDataTablesReturnsNull() {
+   DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG});
+   DataTable zeroRowTable = DataTableBuilderFactory.getDataTableBuilder(schema).build();
+   DataTable metadataOnly = zeroRowTable.toMetadataOnlyDataTable();
+
+   DataTable merged = merge("SELECT COUNT(*) FROM testTable", singletonMap(metadataOnly));
+   assertNull(merged);
+ }
+
+ /**
+  * Selection queries are out of scope for merge-only reduction: per the original author's note,
+  * SelectionDataTableReducer inherits the default-throwing mergeDataTablesOnly, so the merge is expected to throw
+  * {@link UnsupportedOperationException}.
+  */
+ @Test
+ public void testSelectionIsUnsupported() {
+   DataSchema schema = new DataSchema(new String[]{"col1"}, new ColumnDataType[]{ColumnDataType.INT});
+   DataTable selectionRows = buildGroupBy(schema, new Object[][]{{1}, {2}});
+   Map<ServerRoutingInstance, DataTable> serverResponses = singletonMap(selectionRows);
+   String selectionQuery = "SELECT col1 FROM testTable";
+
+   assertThrows(UnsupportedOperationException.class, () -> merge(selectionQuery, serverResponses));
+ }
+
+ /**
+  * Null-handling round trip for MIN: one server contributes a value and the other a null (e.g. all rows filtered
+  * out); merge-then-reduce must match direct reduce with {@code ENABLE_NULL_HANDLING} set to "true".
+  */
+ @Test
+ public void testNullHandlingAggregationRoundTrip()
+     throws IOException {
+   DataSchema schema = new DataSchema(new String[]{"min(m)"}, new ColumnDataType[]{ColumnDataType.DOUBLE});
+   DataTable withValue = buildNullableDouble(schema, 5.0);
+   DataTable withNull = buildNullableDouble(schema, null);
+
+   BrokerRequest request = CalciteSqlCompiler.compileToBrokerRequest("SELECT MIN(m) FROM testTable");
+   request.getPinotQuery().putToQueryOptions(Broker.Request.QueryOptionKey.ENABLE_NULL_HANDLING, "true");
+
+   assertRoundTrip(request, List.of(withValue, withNull));
+ }
+
+ /**
+  * A server whose column type (INT) conflicts with the others (LONG) is, per the original author's note, dropped
+  * by filterDataTablesAndPickSchema. The merge still runs on the remaining servers and the returned DataTable
+  * must carry {@code INCOMPLETE_MERGE} = "true" so downstream consumers can tell that only a strict subset of the
+  * inputs was merged.
+  */
+ @Test
+ public void testConflictingSchemaSurfacedAsIncompleteMerge() {
+   String[] columnNames = {"count(*)"};
+   DataSchema longSchema = new DataSchema(columnNames, new ColumnDataType[]{ColumnDataType.LONG});
+   DataSchema intSchema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.INT});
+
+   DataTable first = buildRow(longSchema, 5L);
+   DataTable second = buildRow(longSchema, 7L);
+   // INT vs LONG: this is the table expected to be dropped.
+   DataTable conflicting = buildRow(intSchema, 99);
+
+   DataTable merged = merge("SELECT COUNT(*) FROM testTable", toMap(List.of(first, second, conflicting)));
+   assertNotNull(merged);
+   String incompleteMerge = merged.getMetadata().get(MetadataKey.INCOMPLETE_MERGE.getName());
+   assertEquals(incompleteMerge, "true",
+       "INCOMPLETE_MERGE must be surfaced when a server is dropped due to schema conflict");
+ }
+
+ /** When every input shares one schema, the merged metadata must not contain {@code INCOMPLETE_MERGE}. */
+ @Test
+ public void testAllSameSchemaDoesNotSetIncompleteMerge() {
+   DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG});
+   List<DataTable> serverTables = List.of(buildRow(schema, 5L), buildRow(schema, 7L));
+
+   DataTable merged = merge("SELECT COUNT(*) FROM testTable", toMap(serverTables));
+   assertNotNull(merged);
+   String incompleteMerge = merged.getMetadata().get(MetadataKey.INCOMPLETE_MERGE.getName());
+   assertNull(incompleteMerge, "INCOMPLETE_MERGE must not be set on a clean merge");
+ }
+
+ /**
+  * Two inputs carry {@code NUM_DOCS_SCANNED} and {@code THREAD_CPU_TIME_NS}; the merged DataTable must hold their
+  * sums ("350" and "4000") so that a downstream re-reduce sees the same totals.
+  */
+ @Test
+ public void testAdditiveStatsAggregatedOntoMergedMetadata() {
+   String docsScannedKey = MetadataKey.NUM_DOCS_SCANNED.getName();
+   String cpuTimeKey = MetadataKey.THREAD_CPU_TIME_NS.getName();
+   DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG});
+
+   DataTable server1 = buildRow(schema, 5L);
+   server1.getMetadata().put(docsScannedKey, "100");
+   server1.getMetadata().put(cpuTimeKey, "1000");
+   DataTable server2 = buildRow(schema, 7L);
+   server2.getMetadata().put(docsScannedKey, "250");
+   server2.getMetadata().put(cpuTimeKey, "3000");
+
+   DataTable merged = merge("SELECT COUNT(*) FROM testTable", toMap(List.of(server1, server2)));
+   assertNotNull(merged);
+   Map<String, String> mergedMetadata = merged.getMetadata();
+   assertEquals(mergedMetadata.get(docsScannedKey), "350");
+   assertEquals(mergedMetadata.get(cpuTimeKey), "4000");
+ }
+
+ /**
+  * When no input carries an additive stats key, the merged DataTable is expected to hold "0" for it. Per the
+  * original author's note this mirrors setStats()'s unconditional-write pattern, and downstream parsing of "0"
+  * and the null-check both yield 0, so the user-facing BrokerResponse is the same either way.
+  */
+ @Test
+ public void testAdditiveStatsAbsentFromAllInputsWriteZero() {
+   DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG});
+   List<DataTable> serverTables = List.of(buildRow(schema, 5L), buildRow(schema, 7L));
+
+   DataTable merged = merge("SELECT COUNT(*) FROM testTable", toMap(serverTables));
+   assertNotNull(merged);
+   String docsScanned = merged.getMetadata().get(MetadataKey.NUM_DOCS_SCANNED.getName());
+   assertEquals(docsScanned, "0");
+ }
+
+ /**
+  * {@code MIN_CONSUMING_FRESHNESS_TIME_MS} captures the WORST freshness across servers (used by
+  * FRESHNESS_LAG_MS, per the original author's note), so the merge must MIN-reduce it rather than SUM: inputs
+  * "1000" and "500" must yield "500".
+  */
+ @Test
+ public void testMinConsumingFreshnessTimeMsTakesMin() {
+   String freshnessKey = MetadataKey.MIN_CONSUMING_FRESHNESS_TIME_MS.getName();
+   DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG});
+
+   DataTable server1 = buildRow(schema, 5L);
+   server1.getMetadata().put(freshnessKey, "1000");
+   DataTable server2 = buildRow(schema, 7L);
+   server2.getMetadata().put(freshnessKey, "500");
+
+   DataTable merged = merge("SELECT COUNT(*) FROM testTable", toMap(List.of(server1, server2)));
+   assertNotNull(merged);
+   assertEquals(merged.getMetadata().get(freshnessKey), "500");
+ }
+
+ /**
+  * A 0-row DataTable (built with a schema but no rows) is, per the original author's note, dropped from the merge
+  * inputs by filterDataTablesAndPickSchema, yet its stats must still be aggregated: 100 + 50 docs scanned must
+  * surface as "150". This is described as matching reduceOnDataTable, where the aggregator runs BEFORE the
+  * iterator.remove().
+  */
+ @Test
+ public void testStatsFromDroppedZeroRowServersStillCounted() {
+   String docsScannedKey = MetadataKey.NUM_DOCS_SCANNED.getName();
+   DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG});
+
+   DataTable withData = buildRow(schema, 5L);
+   withData.getMetadata().put(docsScannedKey, "100");
+   // A server with no result rows (e.g. all rows filtered out) still reports stats.
+   DataTable emptyRows = DataTableBuilderFactory.getDataTableBuilder(schema).build();
+   emptyRows.getMetadata().put(docsScannedKey, "50");
+
+   DataTable merged = merge("SELECT COUNT(*) FROM testTable", toMap(List.of(withData, emptyRows)));
+   assertNotNull(merged);
+   assertEquals(merged.getMetadata().get(docsScannedKey), "150");
+ }
+
+ /**
+  * End-to-end stats check: reducing the re-injected merged DataTable must report the same numDocsScanned (350)
+  * as reducing the raw per-server DataTables directly.
+  */
+ @Test
+ public void testAdditiveStatsRoundTripThroughReduce() {
+   String docsScannedKey = MetadataKey.NUM_DOCS_SCANNED.getName();
+   BrokerRequest request = CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable");
+   DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG});
+
+   DataTable server1 = buildRow(schema, 5L);
+   DataTable server2 = buildRow(schema, 7L);
+   server1.getMetadata().put(docsScannedKey, "100");
+   server2.getMetadata().put(docsScannedKey, "250");
+
+   // Each call gets its own freshly built map over the same two tables.
+   BrokerResponseNative direct = reduce(request, toMap(List.of(server1, server2)));
+   DataTable merged = merge(request, toMap(List.of(server1, server2)));
+   assertNotNull(merged);
+   BrokerResponseNative viaMerge = reduce(request, singletonMap(merged));
+
+   assertEquals(viaMerge.getNumDocsScanned(), direct.getNumDocsScanned(),
+       "merged DataTable must carry numDocsScanned so a downstream reduce produces the same total");
+   assertEquals(viaMerge.getNumDocsScanned(), 350L);
+ }
+
+ /**
+  * Per-server exceptions must be written onto the merged DataTable's exception map, keyed by error-code id, so
+  * that a downstream reduce surfaces them on the BrokerResponse (one per distinct error code).
+  */
+ @Test
+ public void testExceptionsCopiedToMergedDataTable() {
+   BrokerRequest request = CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable");
+   DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG});
+
+   DataTable server1 = buildRow(schema, 5L);
+   DataTable server2 = buildRow(schema, 7L);
+   server1.addException(QueryErrorCode.QUERY_EXECUTION, "boom on server 1");
+   server2.addException(QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED, "limit on server 2");
+
+   DataTable merged = merge(request, toMap(List.of(server1, server2)));
+   assertNotNull(merged);
+   int executionErrorId = QueryErrorCode.QUERY_EXECUTION.getId();
+   assertEquals(merged.getExceptions().get(executionErrorId), "boom on server 1");
+   int resourceLimitErrorId = QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.getId();
+   assertEquals(merged.getExceptions().get(resourceLimitErrorId), "limit on server 2");
+
+   // The downstream reduce must report both exceptions.
+   BrokerResponseNative viaMerge = reduce(request, singletonMap(merged));
+   assertEquals(viaMerge.getExceptions().size(), 2);
+ }
+
+ /**
+  * With trace enabled, the merged DataTable's {@code TRACE_INFO} metadata entry must contain the trace strings
+  * of both inputs. Per the original author's note, the aggregator keys traces by server short-name
+  * (hostname-derived) and JSON-encodes the map, so the two servers use distinct hostnames to keep attribution.
+  */
+ @Test
+ public void testTraceInfoCopiedToMergedDataTableWhenEnabled() {
+   BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM testTable");
+   brokerRequest.getPinotQuery().putToQueryOptions(Broker.Request.TRACE, "true");
+   DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG});
+   DataTable t1 = buildRow(schema, 5L);
+   DataTable t2 = buildRow(schema, 7L);
+   t1.getMetadata().put(MetadataKey.TRACE_INFO.getName(), "trace-from-server-1");
+   t2.getMetadata().put(MetadataKey.TRACE_INFO.getName(), "trace-from-server-2");
+   Map<ServerRoutingInstance, DataTable> map = new HashMap<>();
+   map.put(new ServerRoutingInstance("hostA", 1000, TableType.OFFLINE), t1);
+   map.put(new ServerRoutingInstance("hostB", 1001, TableType.OFFLINE), t2);
+
+   // Go through the shared merge helper, like every other test here, instead of repeating its
+   // QueryThreadContext + mergeOnDataTable boilerplate.
+   DataTable merged = merge(brokerRequest, map);
+   assertNotNull(merged);
+   String mergedTrace = merged.getMetadata().get(MetadataKey.TRACE_INFO.getName());
+   assertNotNull(mergedTrace, "TRACE_INFO must be present on merged DataTable when trace is enabled");
+   // Both inputs' trace strings must appear in the encoded value.
+   assertTrue(mergedTrace.contains("trace-from-server-1"));
+   assertTrue(mergedTrace.contains("trace-from-server-2"));
+ }
+
+ /**
+  * Without trace=true the merged DataTable must have no {@code TRACE_INFO} metadata even when an input carried
+  * it (per the original author's note, the aggregator skips trace collection in that case).
+  */
+ @Test
+ public void testTraceInfoSkippedWhenTraceDisabled() {
+   String traceKey = MetadataKey.TRACE_INFO.getName();
+   DataSchema schema = new DataSchema(new String[]{"count(*)"}, new ColumnDataType[]{ColumnDataType.LONG});
+   DataTable tracedInput = buildRow(schema, 5L);
+   tracedInput.getMetadata().put(traceKey, "trace-from-server-1");
+
+   DataTable merged = merge("SELECT COUNT(*) FROM testTable", toMap(List.of(tracedInput)));
+   assertNotNull(merged);
+   assertNull(merged.getMetadata().get(traceKey));
+ }
+
+ /** A {@code NUM_GROUPS_LIMIT_REACHED} = "true" flag on any input must be surfaced on the merged DataTable. */
+ @Test
+ public void testNumGroupsLimitReachedFlagSurfaced() {
+   String limitReachedKey = MetadataKey.NUM_GROUPS_LIMIT_REACHED.getName();
+   String[] columnNames = {"col1", "count(*)"};
+   ColumnDataType[] columnTypes = {ColumnDataType.INT, ColumnDataType.LONG};
+   DataSchema schema = new DataSchema(columnNames, columnTypes);
+
+   DataTable server1 = buildGroupBy(schema, new Object[][]{{1, 2L}});
+   DataTable server2 = buildGroupBy(schema, new Object[][]{{2, 3L}});
+   // Simulates a server that hit numGroupsLimit.
+   server2.getMetadata().put(limitReachedKey, "true");
+
+   String query = "SELECT col1, COUNT(*) FROM testTable GROUP BY col1";
+   DataTable merged = merge(query, toMap(List.of(server1, server2)));
+   assertNotNull(merged);
+   assertEquals(merged.getMetadata().get(limitReachedKey), "true",
+       "NUM_GROUPS_LIMIT_REACHED from an input server must be surfaced on the merged DataTable");
+ }
+
+ // ---- helpers ----
+
+ /**
+  * Compiles {@code query} into a broker request and asserts the round-trip equivalence for the given per-server
+  * intermediate DataTables.
+  */
+ private void assertRoundTrip(String query, List<DataTable> serverTables) {
+   BrokerRequest compiledRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
+   assertRoundTrip(compiledRequest, serverTables);
+ }
+
+ /**
+  * Asserts that reducing the raw per-server tables and reducing the single merged table built from them produce
+  * equivalent result tables. Each step receives its own freshly built map over {@code serverTables}.
+  */
+ private void assertRoundTrip(BrokerRequest brokerRequest, List<DataTable> serverTables) {
+   // Baseline: direct reduce over the raw inputs.
+   BrokerResponseNative directResponse = reduce(brokerRequest, toMap(serverTables));
+
+   // Merge-only step, then reduce the merged table on its own.
+   DataTable mergedTable = merge(brokerRequest, toMap(serverTables));
+   assertNotNull(mergedTable, "merge produced null");
+   BrokerResponseNative mergedResponse = reduce(brokerRequest, singletonMap(mergedTable));
+
+   ResultTable expected = directResponse.getResultTable();
+   ResultTable actual = mergedResponse.getResultTable();
+   assertResultTablesEquivalent(expected, actual);
+ }
+
+ /**
+  * Runs {@code reduceOnDataTable} inside a test {@link QueryThreadContext}, passing the same request as both
+  * request arguments, with {@code TIMEOUT_MS} and a mocked {@link BrokerMetrics}.
+  */
+ private BrokerResponseNative reduce(BrokerRequest brokerRequest, Map<ServerRoutingInstance, DataTable> map) {
+   try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+     BrokerResponseNative response =
+         _reduceService.reduceOnDataTable(brokerRequest, brokerRequest, map, TIMEOUT_MS, mock(BrokerMetrics.class));
+     return response;
+   }
+ }
+
+ /** Compiles {@code query} into a broker request and runs the merge-only step on {@code map}. */
+ private DataTable merge(String query, Map<ServerRoutingInstance, DataTable> map) {
+   BrokerRequest compiledRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
+   return merge(compiledRequest, map);
+ }
+
+ /**
+  * Runs {@code mergeOnDataTable} inside a test {@link QueryThreadContext} with {@code TIMEOUT_MS} and a mocked
+  * {@link BrokerMetrics}, returning whatever the reduce service returns.
+  */
+ private DataTable merge(BrokerRequest brokerRequest, Map<ServerRoutingInstance, DataTable> map) {
+   try (QueryThreadContext ignore = QueryThreadContext.openForSseTest()) {
+     DataTable mergedTable =
+         _reduceService.mergeOnDataTable(brokerRequest, map, TIMEOUT_MS, mock(BrokerMetrics.class));
+     return mergedTable;
+   }
+ }
+
+  /**
+   * Compiles {@code query}, builds its {@link QueryContext} and returns the aggregation functions
+   * of that context, asserting that the array is not null.
+   */
+  private static AggregationFunction[] aggFunctions(String query) {
+    BrokerRequest compiledRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
+    QueryContext context = QueryContextConverterUtils.getQueryContext(compiledRequest.getPinotQuery());
+    AggregationFunction[] functions = context.getAggregationFunctions();
+    assertNotNull(functions);
+    return functions;
+  }
+
+  /**
+   * Builds a DataTable for {@code schema} containing exactly one row made of {@code values}.
+   * Any {@link IOException} is rethrown wrapped in a {@link RuntimeException}.
+   */
+  private static DataTable buildRow(DataSchema schema, Object... values) {
+    try {
+      DataTableBuilder rowBuilder = DataTableBuilderFactory.getDataTableBuilder(schema);
+      appendRow(rowBuilder, schema, values);
+      return rowBuilder.build();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Builds a DataTable for {@code schema} with one row per entry of {@code rows}, appended in array
+   * order. Any {@link IOException} is rethrown wrapped in a {@link RuntimeException}.
+   */
+  private static DataTable buildGroupBy(DataSchema schema, Object[][] rows) {
+    try {
+      DataTableBuilder tableBuilder = DataTableBuilderFactory.getDataTableBuilder(schema);
+      for (int rowIdx = 0; rowIdx < rows.length; rowIdx++) {
+        appendRow(tableBuilder, schema, rows[rowIdx]);
+      }
+      return tableBuilder.build();
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  /**
+   * Builds a single-row DataTable whose column 0 holds {@code intermediate} as serialized by
+   * {@code aggFunction.serializeIntermediateResult}.
+   */
+  private static DataTable buildObjectRow(DataSchema schema, AggregationFunction aggFunction,
+      Object intermediate)
+      throws IOException {
+    DataTableBuilder objectRowBuilder = DataTableBuilderFactory.getDataTableBuilder(schema);
+
+    // Exactly one row with one column: the serialized intermediate result.
+    objectRowBuilder.startRow();
+    objectRowBuilder.setColumn(0, aggFunction.serializeIntermediateResult(intermediate));
+    objectRowBuilder.finishRow();
+
+    return objectRowBuilder.build();
+  }
+
+  /**
+   * Builds a one-row DataTable with a single double column. A {@code null} value is written as the
+   * DOUBLE null placeholder and row 0 is flagged in the null bitmap handed to the builder; a
+   * non-null value is written as-is and the bitmap stays empty.
+   */
+  private static DataTable buildNullableDouble(DataSchema schema, Double value)
+      throws IOException {
+    DataTableBuilder nullableBuilder = DataTableBuilderFactory.getDataTableBuilder(schema);
+    RoaringBitmap nullRowIds = new RoaringBitmap();
+
+    nullableBuilder.startRow();
+    if (value != null) {
+      nullableBuilder.setColumn(0, (double) value);
+    } else {
+      double placeholder = ((Number) ColumnDataType.DOUBLE.getNullPlaceholder()).doubleValue();
+      nullableBuilder.setColumn(0, placeholder);
+      nullRowIds.add(0);
+    }
+    nullableBuilder.finishRow();
+
+    nullableBuilder.setNullRowIds(nullRowIds);
+    return nullableBuilder.build();
+  }
+
+  /**
+   * Appends one row to {@code builder}, writing each entry of {@code values} with the setter that
+   * matches the schema's column type at the same index. Only INT, LONG, DOUBLE and STRING columns
+   * are handled.
+   *
+   * @throws IllegalArgumentException if a column has any other type
+   */
+  private static void appendRow(DataTableBuilder builder, DataSchema schema, Object[] values)
+      throws IOException {
+    builder.startRow();
+    ColumnDataType[] types = schema.getColumnDataTypes();
+    int colId = 0;
+    while (colId < values.length) {
+      ColumnDataType type = types[colId];
+      switch (type) {
+        case INT:
+          builder.setColumn(colId, (int) values[colId]);
+          break;
+        case LONG:
+          builder.setColumn(colId, (long) values[colId]);
+          break;
+        case DOUBLE:
+          builder.setColumn(colId, (double) values[colId]);
+          break;
+        case STRING:
+          builder.setColumn(colId, (String) values[colId]);
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported test column type: " + type);
+      }
+      colId++;
+    }
+    builder.finishRow();
+  }
+
+  /**
+   * Keys each table by a distinct OFFLINE {@link ServerRoutingInstance} on "localhost", assigning
+   * ports 1000, 1001, ... in list order.
+   */
+  private static Map<ServerRoutingInstance, DataTable> toMap(List<DataTable> serverTables) {
+    Map<ServerRoutingInstance, DataTable> tablesByServer = new HashMap<>();
+    int nextPort = 1000;
+    for (DataTable serverTable : serverTables) {
+      tablesByServer.put(new ServerRoutingInstance("localhost", nextPort, TableType.OFFLINE), serverTable);
+      nextPort++;
+    }
+    return tablesByServer;
+  }
+
+  /**
+   * Wraps {@code dataTable} in a mutable one-entry map keyed by an OFFLINE routing instance named
+   * "Server_merge" with port 0.
+   */
+  private static Map<ServerRoutingInstance, DataTable> singletonMap(DataTable dataTable) {
+    ServerRoutingInstance mergedServer = new ServerRoutingInstance("Server_merge", 0, TableType.OFFLINE);
+    Map<ServerRoutingInstance, DataTable> single = new HashMap<>();
+    single.put(mergedServer, dataTable);
+    return single;
+  }
+
+  /**
+   * Asserts the two result tables have the same schema, the same number of rows, and the same rows
+   * irrespective of order (so the comparison is robust for unordered group-by / distinct results).
+   * Rows are compared through their sorted {@code Arrays.deepToString} renderings.
+   */
+  private static void assertResultTablesEquivalent(ResultTable expected, ResultTable actual) {
+    assertNotNull(expected);
+    assertNotNull(actual);
+    assertEquals(actual.getDataSchema(), expected.getDataSchema());
+    assertEquals(actual.getRows().size(), expected.getRows().size());
+
+    // Render and sort each side once; the same lists feed both the comparison and the message.
+    List<String> expectedRows = sortedRows(expected);
+    List<String> actualRows = sortedRows(actual);
+    assertTrue(actualRows.equals(expectedRows),
+        "rows differ:\n expected=" + expectedRows + "\n actual =" + actualRows);
+  }
+
+  /**
+   * Renders every row of {@code resultTable} with {@link Arrays#deepToString(Object[])} and returns
+   * the renderings in natural (lexicographic) order.
+   */
+  private static List<String> sortedRows(ResultTable resultTable) {
+    return resultTable.getRows().stream()
+        .map(Arrays::deepToString)
+        .sorted()
+        .collect(Collectors.toList());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]