Copilot commented on code in PR #17825:
URL: https://github.com/apache/pinot/pull/17825#discussion_r2901476780
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java:
##########
@@ -106,6 +116,32 @@ private GenericRowFileManager doReduce()
     long rollupFileCreationStartTimeMs = System.currentTimeMillis();
     _rollupFileManager = new GenericRowFileManager(partitionOutputDir, fieldSpecs, includeNullFields, 0);
     GenericRowFileWriter rollupFileWriter = _rollupFileManager.getFileWriter();
+
+    // Check if any aggregators support batch aggregation
+    boolean useBatchAggregation = aggregatorContextList.stream()
+        .anyMatch(ctx -> ctx._aggregator.supportsBatchAggregation());
+
+    if (useBatchAggregation) {
+      LOGGER.info("Using batch aggregation for partition: {}", _partitionId);
+      reduceBatch(recordReader, numRows, aggregatorContextList, rollupFileWriter, includeNullFields);
+    } else {
+      reducePairwise(recordReader, numRows, aggregatorContextList, rollupFileWriter, includeNullFields);
+    }
Review Comment:
`useBatchAggregation` is enabled if *any* metric aggregator supports batch
aggregation, but `reduceBatch()` then buffers values for *all* metric columns
(including simple SUM/MIN/etc). This changes memory behavior vs the original
streaming reducer and can increase memory/GC for wide schemas even if only one
sketch column benefits. Consider batching only for aggregators that return
`supportsBatchAggregation() == true` (keep pairwise aggregation for the rest),
or gate the batch path on `allMatch` + a clear rationale.
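A per-aggregator split, as suggested above, could look roughly like the sketch below. The `Aggregator` interface, its `batchAggregate` default method, and the column layout are simplified stand-ins for Pinot's actual aggregator and `AggregatorContext` types, not the real API: only columns whose aggregator opts in are buffered, while the rest stay on the streaming path.

```java
import java.util.ArrayList;
import java.util.List;

public class Main {
  // Minimal stand-in for a Pinot value aggregator; names are illustrative only.
  interface Aggregator {
    boolean supportsBatchAggregation();
    Object aggregate(Object acc, Object value);            // pairwise (streaming) path
    default Object batchAggregate(List<Object> values) {   // batch path over buffered values
      Object acc = values.get(0);
      for (int i = 1; i < values.size(); i++) {
        acc = aggregate(acc, values.get(i));
      }
      return acc;
    }
  }

  public static void main(String[] args) {
    Aggregator sum = new Aggregator() {          // simple aggregator: no buffering needed
      public boolean supportsBatchAggregation() { return false; }
      public Object aggregate(Object a, Object b) { return (Long) a + (Long) b; }
    };
    Aggregator sketchLike = new Aggregator() {   // benefits from batching (stand-in for a sketch union)
      public boolean supportsBatchAggregation() { return true; }
      public Object aggregate(Object a, Object b) { return Math.max((Long) a, (Long) b); }
    };

    List<Aggregator> aggregators = List.of(sum, sketchLike);
    long[][] rowsForKey = {{1, 7}, {2, 5}, {3, 9}};  // columns: sum-col, sketch-col

    Object[] results = new Object[aggregators.size()];
    List<Object> buffer = new ArrayList<>();
    for (int col = 0; col < aggregators.size(); col++) {
      Aggregator agg = aggregators.get(col);
      if (agg.supportsBatchAggregation()) {
        // Buffer only this column's values; other columns never allocate a batch list.
        buffer.clear();
        for (long[] row : rowsForKey) {
          buffer.add(row[col]);
        }
        results[col] = agg.batchAggregate(buffer);
      } else {
        // Streaming fold, as the original reducer does for every column.
        Object acc = rowsForKey[0][col];
        for (int i = 1; i < rowsForKey.length; i++) {
          acc = agg.aggregate(acc, rowsForKey[i][col]);
        }
        results[col] = acc;
      }
    }
    System.out.println(results[0] + " " + results[1]);
  }
}
```

This keeps the memory overhead proportional to the number of batch-capable columns rather than the full metric width.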
##########
pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java:
##########
@@ -140,12 +176,112 @@ private GenericRowFileManager doReduce()
}
}
rollupFileWriter.write(previousRow);
-    _rollupFileManager.closeFileWriter();
-    LOGGER.info("Finish creating rollup file in {}ms", System.currentTimeMillis() - rollupFileCreationStartTimeMs);
+  }
-    _fileManager.cleanUp();
-    LOGGER.info("Finish reducing in {}ms", System.currentTimeMillis() - reduceStartTimeMs);
-    return _rollupFileManager;
+  /**
+   * Batch reduce - collects all rows for the same key before aggregating.
+   * This is more efficient for sketch aggregators that can batch merge.
+   */
+  private void reduceBatch(GenericRowFileRecordReader recordReader, int numRows,
+      List<AggregatorContext> aggregatorContextList, GenericRowFileWriter rollupFileWriter, boolean includeNullFields)
+      throws Exception {
+    // Collect values for each metric column across rows with the same key
+    List<List<Object>> batchValues = new ArrayList<>(aggregatorContextList.size());
+    for (int j = 0; j < aggregatorContextList.size(); j++) {
+      batchValues.add(new ArrayList<>());
+    }
+
+    GenericRow baseRow = new GenericRow();
+    recordReader.read(0, baseRow);
+    int baseRowId = 0;
+
+    // Initialize batch with first row's values
+    for (int j = 0; j < aggregatorContextList.size(); j++) {
+      String column = aggregatorContextList.get(j)._column;
+      if (!includeNullFields || !baseRow.isNullValue(column)) {
+        batchValues.get(j).add(baseRow.getValue(column));
+      }
+    }
+
+    GenericRow currentRow = new GenericRow();
+    for (int i = 1; i < numRows; i++) {
+      currentRow.clear();
+      recordReader.read(i, currentRow);
+
+      if (recordReader.compare(baseRowId, i) == 0) {
+        // Same key - add values to batch
+        for (int j = 0; j < aggregatorContextList.size(); j++) {
+          String column = aggregatorContextList.get(j)._column;
+          if (!includeNullFields || !currentRow.isNullValue(column)) {
+            batchValues.get(j).add(currentRow.getValue(column));
+          }
+        }
+
+        // Memory safety: flush partial result if batch gets too large
+        if (batchValues.get(0).size() >= _maxBatchSize) {
Review Comment:
The batch flush threshold is based on `batchValues.get(0).size()`, but
per-metric lists can diverge in size (e.g., when `includeNullFields` is true
and some metrics are null more often). This can prevent flushing and lead to
unbounded memory growth for other metrics. Track batch size independently
(e.g., a per-key row counter) or compute the max size across all `batchValues`
when deciding to flush.
```suggestion
        int currentBatchSize = 0;
        for (int k = 0; k < batchValues.size(); k++) {
          int size = batchValues.get(k).size();
          if (size > currentBatchSize) {
            currentBatchSize = size;
          }
        }
        if (currentBatchSize >= _maxBatchSize) {
```
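The comment's first alternative, a per-key row counter, can be sketched in isolation. The loop below is a simplified stand-in (the `rows` array, `maxBatchSize`, and null handling are made up for the demo, not the PR's actual code); it shows why counting rows is robust even when every value in one column is null and its list never grows.

```java
import java.util.ArrayList;
import java.util.List;

public class Main {
  public static void main(String[] args) {
    final int maxBatchSize = 3;  // stand-in for _maxBatchSize
    int numMetrics = 2;
    List<List<Object>> batchValues = new ArrayList<>();
    for (int j = 0; j < numMetrics; j++) {
      batchValues.add(new ArrayList<>());
    }

    // Simulated rows for one key; null means the metric is null in that row.
    // Column 0 is null in every row, so batchValues.get(0) never grows.
    Integer[][] rows = {{null, 1}, {null, 2}, {null, 3}, {null, 4}};

    int rowsInBatch = 0;  // counts rows, not per-column values, so nulls cannot stall flushing
    int flushes = 0;
    for (Integer[] row : rows) {
      for (int j = 0; j < numMetrics; j++) {
        if (row[j] != null) {  // mimics the includeNullFields null-skip
          batchValues.get(j).add(row[j]);
        }
      }
      rowsInBatch++;
      if (rowsInBatch >= maxBatchSize) {
        // flush partial result here
        flushes++;
        for (List<Object> values : batchValues) {
          values.clear();
        }
        rowsInBatch = 0;
      }
    }
    // A get(0)-based size check would compare 0 >= maxBatchSize forever and
    // never flush; the row counter flushes once after the third row.
    System.out.println(flushes + " " + batchValues.get(0).size());
  }
}
```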
##########
pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/ReducerTest.java:
##########
@@ -570,4 +575,209 @@ public void testDedupWithSort()
assertEquals(fieldToValueMap.get("d2"), dValues[expectedDIndex][1]);
}
}
+
+  /**
+   * Test rollup with theta sketch metric to verify batch aggregation path.
+   * This test creates multiple rows per dimension key, each with a theta sketch,
+   * and verifies they are properly aggregated using batch aggregation.
+   */
+  @Test
+  public void testRollupWithThetaSketch()
+      throws Exception {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName("testTable")
+        .addSingleValueDimension("d", DataType.INT)
+        .addMetric("sketch", DataType.BYTES)
+        .build();
+    Pair<List<FieldSpec>, Integer> result = SegmentProcessorUtils.getFieldSpecs(schema, MergeType.ROLLUP, null);
+    GenericRowFileManager fileManager =
+        new GenericRowFileManager(FILE_MANAGER_OUTPUT_DIR, result.getLeft(), false, result.getRight());
+
+    GenericRowFileWriter fileWriter = fileManager.getFileWriter();
+    int numRecords = 100;
+    int numDimValues = 5;
+    // Track expected distinct values per dimension
+    Map<Integer, Set<Integer>> expectedDistincts = new TreeMap<>();
+    for (int i = 0; i < numDimValues; i++) {
+      expectedDistincts.put(i, new HashSet<>());
+    }
+
+    GenericRow row = new GenericRow();
+    for (int i = 0; i < numRecords; i++) {
+      row.clear();
+      int d = RANDOM.nextInt(numDimValues);
+      // Create a sketch with some random values
+      UpdateSketch updateSketch = UpdateSketch.builder().setNominalEntries(128).build();
+      int numValues = RANDOM.nextInt(10) + 1;
+      for (int j = 0; j < numValues; j++) {
+        int value = RANDOM.nextInt(1000);
+        updateSketch.update(value);
+        expectedDistincts.get(d).add(value);
+      }
Review Comment:
This test relies on non-seeded randomness (`RANDOM`) plus an approximate
theta sketch estimate with a 10% tolerance. That combination can make the test
flaky as the true cardinality varies and sketch error can occasionally exceed
the chosen tolerance. Prefer deterministic inputs (fixed seed and/or fixed
value sets) and/or keep the distinct count below the sketch’s nominal entries
so the estimate is exact, allowing a strict equality assertion.
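As one illustration of the determinism suggestion, the snippet below uses only the JDK (no DataSketches dependency; the seed, record count, and value range are illustrative): a fixed-seed `java.util.Random` makes the generated distinct sets reproducible across runs, and capping the value range below the sketch's 128 nominal entries would make a theta estimate exact, so the test could use a strict `assertEquals`.

```java
import java.util.HashSet;
import java.util.Random;
import java.util.Set;

public class Main {
  // Generates the per-dimension distinct-value set deterministically.
  static Set<Integer> distinctValues(long seed, int numRecords, int valueRange) {
    Random random = new Random(seed);  // fixed seed: every run sees the same data
    Set<Integer> distincts = new HashSet<>();
    for (int i = 0; i < numRecords; i++) {
      distincts.add(random.nextInt(valueRange));
    }
    return distincts;
  }

  public static void main(String[] args) {
    // Two independent runs with the same seed produce identical inputs,
    // so the expected cardinality is a constant the test can assert exactly.
    Set<Integer> run1 = distinctValues(42L, 100, 120);
    Set<Integer> run2 = distinctValues(42L, 100, 120);
    System.out.println(run1.equals(run2));

    // Keeping distinct counts under the sketch's nominal entries (128 in the
    // test) means the theta sketch operates in exact mode, with no estimation error.
    System.out.println(run1.size() <= 128);
  }
}
```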
##########
pinot-core/src/main/java/org/apache/pinot/core/common/MinionConstants.java:
##########
@@ -133,6 +133,15 @@ public static abstract class MergeTask {
     public static final String MAX_NUM_RECORDS_PER_TASK_KEY = "maxNumRecordsPerTask";
     public static final String MAX_NUM_RECORDS_PER_SEGMENT_KEY = "maxNumRecordsPerSegment";
+    // Reducer config
+    /**
+     * Maximum number of rows to batch before aggregating during rollup reduce phase.
+     * Higher values improve performance for sketch aggregations but use more memory.
+     * Default is 1000.
+     */
+    public static final String REDUCER_MAX_BATCH_SIZE_KEY = "reducerMaxBatchSize";
+    public static final int DEFAULT_REDUCER_MAX_BATCH_SIZE = 1000;
+
Review Comment:
PR description states `reducerMaxBatchSize` default is 500, but the
introduced constant sets the default to 1000. Please align the implementation
(and any docs/tests) with the intended default to avoid surprising behavior
changes.
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskExecutor.java:
##########
@@ -101,6 +101,12 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,
     // Segment config
     segmentProcessorConfigBuilder.setSegmentConfig(MergeTaskUtils.getSegmentConfig(configs));
+    // Reducer config
+    String reducerMaxBatchSizeStr = configs.get(MinionConstants.MergeTask.REDUCER_MAX_BATCH_SIZE_KEY);
+    if (reducerMaxBatchSizeStr != null) {
+      segmentProcessorConfigBuilder.setReducerMaxBatchSize(Integer.parseInt(reducerMaxBatchSizeStr));
Review Comment:
`Integer.parseInt(reducerMaxBatchSizeStr)` will throw a
`NumberFormatException` for invalid task configs, failing the task without a
clear message. Consider validating the value (numeric and > 0) and either
falling back to the default or throwing a more actionable exception/log message
that includes the offending config key/value.
```suggestion
      try {
        int reducerMaxBatchSize = Integer.parseInt(reducerMaxBatchSizeStr);
        if (reducerMaxBatchSize > 0) {
          segmentProcessorConfigBuilder.setReducerMaxBatchSize(reducerMaxBatchSize);
        } else {
          LOGGER.warn("Invalid value for {}: {}. Expected a positive integer; using default reducer max batch size.",
              MinionConstants.MergeTask.REDUCER_MAX_BATCH_SIZE_KEY, reducerMaxBatchSizeStr);
        }
      } catch (NumberFormatException e) {
        LOGGER.warn("Failed to parse reducer max batch size for {}: '{}'. Using default reducer max batch size.",
            MinionConstants.MergeTask.REDUCER_MAX_BATCH_SIZE_KEY, reducerMaxBatchSizeStr, e);
      }
```
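Extracted into a standalone helper, the validate-or-fall-back behavior might look like the sketch below; `parsePositiveIntOrDefault` is a hypothetical name, and logging is omitted to keep it self-contained and testable.

```java
public class Main {
  static final int DEFAULT_REDUCER_MAX_BATCH_SIZE = 1000;  // value of the constant introduced in this PR

  // Hypothetical helper: parse a task-config value, falling back to the default
  // on missing, non-numeric, or non-positive input instead of failing the task.
  static int parsePositiveIntOrDefault(String value, int defaultValue) {
    if (value == null) {
      return defaultValue;
    }
    try {
      int parsed = Integer.parseInt(value.trim());
      return parsed > 0 ? parsed : defaultValue;
    } catch (NumberFormatException e) {
      return defaultValue;
    }
  }

  public static void main(String[] args) {
    System.out.println(parsePositiveIntOrDefault("500", DEFAULT_REDUCER_MAX_BATCH_SIZE));
    System.out.println(parsePositiveIntOrDefault("-1", DEFAULT_REDUCER_MAX_BATCH_SIZE));
    System.out.println(parsePositiveIntOrDefault("abc", DEFAULT_REDUCER_MAX_BATCH_SIZE));
    System.out.println(parsePositiveIntOrDefault(null, DEFAULT_REDUCER_MAX_BATCH_SIZE));
  }
}
```

Centralizing the check also makes it easy to reuse for other numeric merge-task configs with the same semantics.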
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]