This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 15b16e8f10 Improve server query cancellation and timeout checking during execution (#9286)
15b16e8f10 is described below
commit 15b16e8f1029c745397dff869fd0759434761011
Author: Jia Guo <[email protected]>
AuthorDate: Mon Aug 29 06:49:08 2022 -0700
Improve server query cancellation and timeout checking during execution (#9286)
* Improve query cancellation and timeout checking
* Improve query cancellation and timeout checking
* Improve query cancellation and timeout checking
* Address comments
* Address comments
* Address comments
---
.../pinot/core/data/manager/BaseTableDataManager.java | 2 +-
.../core/operator/combine/BaseCombineOperator.java | 19 ++++++++++++++-----
.../combine/GroupByOrderByCombineOperator.java | 16 ++++++++++++++++
3 files changed, 31 insertions(+), 6 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index bcb21234c4..a483c51cdc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -142,7 +142,7 @@ public abstract class BaseTableDataManager implements TableDataManager {
     _isStreamSegmentDownloadUntar = tableDataManagerParams.isStreamSegmentDownloadUntar();
     if (_isStreamSegmentDownloadUntar) {
       LOGGER.info("Using streamed download-untar for segment download! "
-          + "The rate limit interval for streamed download-untar is {} ms",
+          + "The rate limit interval for streamed download-untar is {} bytes/s",
           _streamSegmentDownloadUntarRateLimitBytesPerSec);
     }
     int maxParallelSegmentDownloads = tableDataManagerParams.getMaxParallelSegmentDownloads();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 9adaec05f0..9238604796 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -196,14 +196,15 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
     int numBlocksMerged = 0;
     long endTimeMs = _queryContext.getEndTimeMs();
     while (numBlocksMerged < _numOperators) {
+      // Timeout has reached, shouldn't continue to process. `_blockingQueue.poll` will continue to return blocks even
+      // if negative timeout is provided; therefore an extra check is needed
+      if (endTimeMs - System.currentTimeMillis() < 0) {
+        return generateTimeOutResult(numBlocksMerged);
+      }
       IntermediateResultsBlock blockToMerge =
           _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
       if (blockToMerge == null) {
-        // Query times out, skip merging the remaining results blocks
-        LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
-            _queryContext);
-        return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
-            new TimeoutException("Timed out while polling results block")));
+        return generateTimeOutResult(numBlocksMerged);
       }
       if (blockToMerge.getProcessingExceptions() != null) {
         // Caught exception while processing segment, skip merging the remaining results blocks and directly return the
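
The extra check added above is needed because BlockingQueue.poll(timeout, unit) does not treat a non-positive timeout as "already timed out": if an element is already queued it is returned immediately, so a loop relying on poll() alone can keep merging past the query deadline for as long as blocks keep arriving. A minimal standalone sketch of that JDK behavior (assuming a LinkedBlockingQueue, as in the standard implementations; the concrete queue type behind _blockingQueue is not shown in this diff):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class NegativeTimeoutPollDemo {
      public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.offer("block-1");

        // An element is already available, so poll() returns it even though the
        // timeout budget is negative, i.e. the deadline has already passed.
        System.out.println(queue.poll(-100, TimeUnit.MILLISECONDS)); // block-1

        // Only when the queue is empty does a non-positive timeout report a
        // timeout by returning null.
        System.out.println(queue.poll(-100, TimeUnit.MILLISECONDS)); // null
      }
    }

Hence the explicit endTimeMs check on every iteration, which routes an exhausted budget into the timeout result instead of silently merging on.
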
@@ -224,6 +225,14 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul
     return mergedBlock;
   }
 
+  private IntermediateResultsBlock generateTimeOutResult(int numBlocksMerged) {
+    // Query times out, skip merging the remaining results blocks
+    LOGGER.error("Timed out while polling results block, numBlocksMerged: {} (query: {})", numBlocksMerged,
+        _queryContext);
+    return new IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+        new TimeoutException("Timed out while polling results block")));
+  }
+
   /**
    * Can be overridden for early termination.
    */
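
Taken together, the two hunks above give the merge loop a single timeout path: the remaining budget endTimeMs - System.currentTimeMillis() is checked before each poll and also passed as the poll timeout, and both the pre-poll guard and a null poll result go through the shared generateTimeOutResult helper. A condensed standalone sketch of that control flow (not the Pinot class itself; DeadlineAwareMergeSketch and its String payloads are illustrative stand-ins for the operator and its results blocks):

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class DeadlineAwareMergeSketch {
      private final BlockingQueue<String> _blockingQueue = new LinkedBlockingQueue<>();

      String mergeWithDeadline(int numOperators, long endTimeMs) throws InterruptedException {
        int numBlocksMerged = 0;
        StringBuilder merged = new StringBuilder();
        while (numBlocksMerged < numOperators) {
          // Pre-poll guard: a negative budget means the deadline has passed, and
          // poll() with a negative timeout would still return queued blocks.
          if (endTimeMs - System.currentTimeMillis() < 0) {
            return timeoutResult(numBlocksMerged);
          }
          String block = _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
          if (block == null) {
            // Nothing arrived within the remaining budget.
            return timeoutResult(numBlocksMerged);
          }
          merged.append(block).append(' ');
          numBlocksMerged++;
        }
        return merged.toString().trim();
      }

      // Shared timeout path, mirroring generateTimeOutResult in the patch.
      private String timeoutResult(int numBlocksMerged) {
        return "TIMEOUT after merging " + numBlocksMerged + " blocks";
      }

      public static void main(String[] args) throws InterruptedException {
        DeadlineAwareMergeSketch sketch = new DeadlineAwareMergeSketch();
        sketch._blockingQueue.offer("block-1");
        // Two blocks are expected but only one ever arrives, so the call returns
        // the timeout result once the 200 ms budget is spent.
        System.out.println(sketch.mergeWithDeadline(2, System.currentTimeMillis() + 200));
      }
    }

Keeping both exits on the same helper means an exhausted budget and an empty queue produce the same timeout result and log line.
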
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index f12e1c9b5e..afea001c1f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -45,6 +45,7 @@ import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +58,8 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("rawtypes")
 public class GroupByOrderByCombineOperator extends BaseCombineOperator {
   public static final int MAX_TRIM_THRESHOLD = 1_000_000_000;
+  public static final int MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK = 10_000;
+
   private static final Logger LOGGER = LoggerFactory.getLogger(GroupByOrderByCombineOperator.class);
   private static final String EXPLAIN_NAME = "COMBINE_GROUPBY_ORDERBY";
 
@@ -165,6 +168,8 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
       // Merge aggregation group-by result.
       // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable
       Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords();
+      // Count the number of merged keys
+      int mergedKeys = 0;
       // For now, only GroupBy OrderBy query has pre-constructed intermediate records
       if (intermediateRecords == null) {
         // Merge aggregation group-by result.
@@ -181,12 +186,16 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
               values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId);
             }
             _indexedTable.upsert(new Key(keys), new Record(values));
+            mergedKeys++;
+            checkMergePhaseInterruption(mergedKeys);
           }
         }
       } else {
         for (IntermediateRecord intermediateResult : intermediateRecords) {
           //TODO: change upsert api so that it accepts intermediateRecord directly
           _indexedTable.upsert(intermediateResult._key, intermediateResult._record);
+          mergedKeys++;
+          checkMergePhaseInterruption(mergedKeys);
         }
       }
     } finally {
@@ -197,6 +206,13 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator {
     }
   }
 
+  // Check for thread interruption, every time after merging 10_000 keys
+  private void checkMergePhaseInterruption(int mergedKeys) {
+    if (mergedKeys % MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK == 0 && Thread.interrupted()) {
+      throw new EarlyTerminationException();
+    }
+  }
+
   @Override
   protected void onException(Exception e) {
     _mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
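
The check added above bounds how long the merge phase keeps running after a query has been cancelled or has timed out: Thread.interrupted() is consulted only once per MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK (10_000) upserted keys, so the hot upsert loop stays cheap while an interrupt still surfaces as an EarlyTerminationException within at most that many keys. A standalone sketch of the pattern (hypothetical class, with a plain RuntimeException standing in for Pinot's EarlyTerminationException to keep it dependency-free):

    public class PeriodicInterruptionCheckDemo {
      private static final int KEYS_PER_INTERRUPTION_CHECK = 10_000;

      static long mergeKeys(long totalKeys) {
        long mergedKeys = 0;
        for (long i = 0; i < totalKeys; i++) {
          // ... upsert one group-by key into the indexed table ...
          mergedKeys++;
          // Pay for the interruption check only once every 10_000 keys so the hot
          // loop is not slowed down by a per-key check.
          if (mergedKeys % KEYS_PER_INTERRUPTION_CHECK == 0 && Thread.interrupted()) {
            throw new RuntimeException("merge interrupted after " + mergedKeys + " keys");
          }
        }
        return mergedKeys;
      }

      public static void main(String[] args) throws InterruptedException {
        Thread worker = new Thread(() -> {
          try {
            mergeKeys(Long.MAX_VALUE);
          } catch (RuntimeException e) {
            System.out.println(e.getMessage());
          }
        });
        worker.start();
        Thread.sleep(100);
        worker.interrupt(); // cancellation: the loop notices it within 10_000 keys
        worker.join();
      }
    }

Note that Thread.interrupted() clears the interrupt flag, which is fine here because the exception itself terminates the merge phase.
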
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]