This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f95ada1ab35 Fix the error handling in SequentialSortedGroupByCombineOperator (#16840)
f95ada1ab35 is described below
commit f95ada1ab35b288f3529966335f798c8fcb4c941
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Sep 18 11:22:40 2025 -0700
Fix the error handling in SequentialSortedGroupByCombineOperator (#16840)
---
.../SequentialSortedGroupByCombineOperator.java | 78 ++++++----------------
.../combine/SortedGroupByCombineOperator.java | 7 +-
.../combine/CombineErrorOperatorsTest.java | 63 ++++++++++++++---
3 files changed, 79 insertions(+), 69 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SequentialSortedGroupByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SequentialSortedGroupByCombineOperator.java
index b0a88acfe3e..629f5e7aab3 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SequentialSortedGroupByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SequentialSortedGroupByCombineOperator.java
@@ -35,8 +35,6 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.query.scheduler.resources.ResourceManager;
import org.apache.pinot.core.query.utils.OrderByComparatorFactory;
import org.apache.pinot.core.util.GroupByUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
@@ -56,19 +54,16 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings({"rawtypes"})
public class SequentialSortedGroupByCombineOperator extends
BaseSingleBlockCombineOperator<GroupByResultsBlock> {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(SequentialSortedGroupByCombineOperator.class);
+ // TODO: Consider changing it to "COMBINE_GROUP_BY_SEQUENTIAL_SORTED" to
distinguish from GroupByCombineOperator
private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
- // We use a CountDownLatch to track if all Futures are finished by the query
timeout, and cancel the unfinished
- // _futures (try to interrupt the execution if it already started).
- private volatile boolean _groupsTrimmed;
- private volatile boolean _numGroupsLimitReached;
- private volatile boolean _numGroupsWarningLimitReached;
-
- private SortedRecords _records = null;
private final SortedRecordsMerger _sortedRecordsMerger;
+ private SortedRecords _records;
+ private boolean _groupsTrimmed;
+ private boolean _numGroupsLimitReached;
+ private boolean _numGroupsWarningLimitReached;
+
public SequentialSortedGroupByCombineOperator(List<Operator> operators,
QueryContext queryContext,
ExecutorService executorService) {
super(null, operators, overrideMaxExecutionThreads(queryContext,
operators.size()), executorService);
@@ -105,22 +100,12 @@ public class SequentialSortedGroupByCombineOperator
extends BaseSingleBlockCombi
int operatorId;
while (_processingException.get() == null && (operatorId =
_nextOperatorId.getAndIncrement()) < _numOperators) {
Operator operator = _operators.get(operatorId);
+ GroupByResultsBlock resultsBlock;
try {
if (operator instanceof AcquireReleaseColumnsSegmentOperator) {
((AcquireReleaseColumnsSegmentOperator) operator).acquire();
}
- GroupByResultsBlock resultsBlock = (GroupByResultsBlock)
operator.nextBlock();
- if (resultsBlock.isGroupsTrimmed()) {
- _groupsTrimmed = true;
- }
- // Set groups limit reached flag.
- if (resultsBlock.isNumGroupsLimitReached()) {
- _numGroupsLimitReached = true;
- }
- if (resultsBlock.isNumGroupsWarningLimitReached()) {
- _numGroupsWarningLimitReached = true;
- }
- _blockingQueue.offer(resultsBlock);
+ resultsBlock = (GroupByResultsBlock) operator.nextBlock();
} catch (RuntimeException e) {
throw wrapOperatorException(operator, e);
} finally {
@@ -128,43 +113,24 @@ public class SequentialSortedGroupByCombineOperator
extends BaseSingleBlockCombi
((AcquireReleaseColumnsSegmentOperator) operator).release();
}
}
+ _blockingQueue.offer(resultsBlock);
}
}
- @Override
- public void onProcessSegmentsException(Throwable t) {
- _processingException.compareAndSet(null, t);
- }
-
- /**
- * {@inheritDoc}
- *
- * <p>Combines sorted intermediate aggregation result blocks from underlying
operators and returns a merged one.
- * <ul>
- * <li>
- * Merges multiple sorted intermediate aggregation result from {@link
this#_blockingQueue} into one
- * and create a result block
- * </li>
- * <li>
- * Set all exceptions encountered during execution into the merged
result block
- * </li>
- * </ul>
- */
+ /// {@inheritDoc}
+ ///
+ /// Merges multiple sorted intermediate results from [#_blockingQueue] into
one and creates a result block.
@Override
public BaseResultsBlock mergeResults()
throws Exception {
-
+ DataSchema dataSchema = null;
int numBlocksMerged = 0;
long endTimeMs = _queryContext.getEndTimeMs();
- DataSchema dataSchema = null;
while (numBlocksMerged < _numOperators) {
// Timeout has reached, shouldn't continue to process.
`_blockingQueue.poll` will continue to return blocks even
// if negative timeout is provided; therefore an extra check is needed
long waitTimeMs = endTimeMs - System.currentTimeMillis();
if (waitTimeMs <= 0) {
- String userError = "Timed out while combining group-by order-by
results after " + waitTimeMs + "ms";
- String logMsg = userError + ", queryContext = " + _queryContext;
- LOGGER.error(logMsg);
return getTimeoutResultsBlock(numBlocksMerged);
}
BaseResultsBlock blockToMerge = _blockingQueue.poll(waitTimeMs,
TimeUnit.MILLISECONDS);
@@ -181,10 +147,17 @@ public class SequentialSortedGroupByCombineOperator
extends BaseSingleBlockCombi
dataSchema = groupByResultBlockToMerge.getDataSchema();
}
+ // Merge records
+ if (_records == null) {
+ _records =
GroupByUtils.getAndPopulateSortedRecords(groupByResultBlockToMerge);
+ } else {
+ _records = _sortedRecordsMerger.mergeGroupByResultsBlock(_records,
groupByResultBlockToMerge);
+ }
+
+ // Set flags
if (groupByResultBlockToMerge.isGroupsTrimmed()) {
_groupsTrimmed = true;
}
- // Set groups limit reached flag.
if (groupByResultBlockToMerge.isNumGroupsLimitReached()) {
_numGroupsLimitReached = true;
}
@@ -192,17 +165,10 @@ public class SequentialSortedGroupByCombineOperator
extends BaseSingleBlockCombi
_numGroupsWarningLimitReached = true;
}
- if (_records == null) {
- _records =
GroupByUtils.getAndPopulateSortedRecords(groupByResultBlockToMerge);
- } else {
- _records = _sortedRecordsMerger.mergeGroupByResultsBlock(_records,
groupByResultBlockToMerge);
- }
numBlocksMerged++;
}
- SortedRecordTable table =
- new SortedRecordTable(_records, dataSchema, _queryContext,
_executorService);
-
+ SortedRecordTable table = new SortedRecordTable(_records, dataSchema,
_queryContext, _executorService);
if (_queryContext.isServerReturnFinalResult()) {
table.finish(true, true);
} else if (_queryContext.isServerReturnFinalResultKeyUnpartitioned()) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java
index 84632ea97e3..f917eedacec 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/SortedGroupByCombineOperator.java
@@ -66,8 +66,8 @@ import org.slf4j.LoggerFactory;
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public class SortedGroupByCombineOperator extends
BaseSingleBlockCombineOperator<GroupByResultsBlock> {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(SortedGroupByCombineOperator.class);
+ // TODO: Consider changing it to "COMBINE_GROUP_BY_SORTED" to distinguish
from GroupByCombineOperator
private static final String EXPLAIN_NAME = "COMBINE_GROUP_BY";
// We use a CountDownLatch to track if all Futures are finished by the query
timeout, and cancel the unfinished
@@ -237,10 +237,7 @@ public class SortedGroupByCombineOperator extends
BaseSingleBlockCombineOperator
}
private GroupByResultsBlock finishSortedRecords(SortedRecords records) {
- SortedRecordTable table =
- new SortedRecordTable(records, _dataSchema, _queryContext,
_executorService);
-
- // finish
+ SortedRecordTable table = new SortedRecordTable(records, _dataSchema,
_queryContext, _executorService);
if (_queryContext.isServerReturnFinalResult()) {
table.finish(true, true);
} else if (_queryContext.isServerReturnFinalResultKeyUnpartitioned()) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
index 4523581cb30..5d8eec88c98 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/operator/combine/CombineErrorOperatorsTest.java
@@ -20,17 +20,18 @@ package org.apache.pinot.core.operator.combine;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.common.Block;
import org.apache.pinot.core.common.Operator;
import org.apache.pinot.core.operator.BaseOperator;
import org.apache.pinot.core.operator.ExecutionStatistics;
import org.apache.pinot.core.operator.blocks.results.BaseResultsBlock;
import org.apache.pinot.core.operator.blocks.results.ExceptionResultsBlock;
+import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock;
import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
@@ -51,9 +52,12 @@ public class CombineErrorOperatorsTest {
private static final int NUM_THREADS = 2;
private static final QueryContext QUERY_CONTEXT =
QueryContextConverterUtils.getQueryContext("SELECT * FROM testTable");
+ private static final QueryContext GROUP_BY_QUERY_CONTEXT =
+ QueryContextConverterUtils.getQueryContext("SELECT myColumn, COUNT(*)
FROM testTable GROUP BY 1 ORDER BY 1");
static {
QUERY_CONTEXT.setEndTimeMs(Long.MAX_VALUE);
+ GROUP_BY_QUERY_CONTEXT.setEndTimeMs(Long.MAX_VALUE);
}
private ExecutorService _executorService;
@@ -66,8 +70,8 @@ public class CombineErrorOperatorsTest {
@DataProvider(name = "getErrorCodes")
public static Object[][] getErrorCodes() {
return Arrays.stream(QueryErrorCode.values())
- .map(queryErrorCode -> new Object[]{queryErrorCode})
- .toArray(Object[][]::new);
+ .map(queryErrorCode -> new Object[]{queryErrorCode})
+ .toArray(Object[][]::new);
}
@Test(dataProvider = "getErrorCodes")
@@ -88,6 +92,24 @@ public class CombineErrorOperatorsTest {
assertEquals(errorMsg.getErrCode(), queryErrorCode);
}
+ @Test(dataProvider = "getErrorCodes")
+ public void
testSequentialSortedGroupByCombineExceptionOperator(QueryErrorCode
queryErrorCode) {
+ List<Operator> operators = new ArrayList<>(NUM_OPERATORS);
+ for (int i = 0; i < NUM_OPERATORS - 1; i++) {
+ operators.add(new RegularGroupByOperator());
+ }
+ operators.add(new ExceptionOperator(queryErrorCode.asException("Test
exception message")));
+ SequentialSortedGroupByCombineOperator combineOperator =
+ new SequentialSortedGroupByCombineOperator(operators,
GROUP_BY_QUERY_CONTEXT, _executorService);
+ BaseResultsBlock resultsBlock = combineOperator.nextBlock();
+ assertTrue(resultsBlock instanceof ExceptionResultsBlock);
+ List<QueryErrorMessage> errorMsgs = resultsBlock.getErrorMessages();
+ assertNotNull(errorMsgs);
+ assertEquals(errorMsgs.size(), 1);
+ QueryErrorMessage errorMsg = errorMsgs.get(0);
+ assertEquals(errorMsg.getErrCode(), queryErrorCode);
+ }
+
@Test
public void testCombineErrorOperator() {
List<Operator> operators = new ArrayList<>(NUM_OPERATORS);
@@ -141,7 +163,7 @@ public class CombineErrorOperatorsTest {
@Override
public List<Operator> getChildOperators() {
- return Collections.emptyList();
+ return List.of();
}
@Override
@@ -165,7 +187,7 @@ public class CombineErrorOperatorsTest {
@Override
public List<Operator> getChildOperators() {
- return Collections.emptyList();
+ return List.of();
}
@Override
@@ -185,13 +207,38 @@ public class CombineErrorOperatorsTest {
@Override
protected Block getNextBlock() {
return new SelectionResultsBlock(
- new DataSchema(new String[]{"myColumn"}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT}),
- new ArrayList<>(), QUERY_CONTEXT);
+ new DataSchema(new String[]{"myColumn"}, new
ColumnDataType[]{ColumnDataType.STRING}), new ArrayList<>(),
+ QUERY_CONTEXT);
+ }
+
+ @Override
+ public List<Operator> getChildOperators() {
+ return List.of();
+ }
+
+ @Override
+ public String toExplainString() {
+ return EXPLAIN_NAME;
+ }
+
+ @Override
+ public ExecutionStatistics getExecutionStatistics() {
+ return new ExecutionStatistics(0, 0, 0, 0);
+ }
+ }
+
+ private static class RegularGroupByOperator extends BaseOperator {
+ private static final String EXPLAIN_NAME = "REGULAR_GROUP_BY";
+
+ @Override
+ protected Block getNextBlock() {
+ return new GroupByResultsBlock(new DataSchema(new String[]{"myColumn",
"count(*)"},
+ new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.LONG}),
new ArrayList<>(), GROUP_BY_QUERY_CONTEXT);
}
@Override
public List<Operator> getChildOperators() {
- return Collections.emptyList();
+ return List.of();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]