This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 15b16e8f10 Improve server query cancellation and timeout checking during execution (#9286)
15b16e8f10 is described below

commit 15b16e8f1029c745397dff869fd0759434761011
Author: Jia Guo <[email protected]>
AuthorDate: Mon Aug 29 06:49:08 2022 -0700

    Improve server query cancellation and timeout checking during execution (#9286)
    
    * Improve query cancellation and timeout checking
    
    * Improve query cancellation and timeout checking
    
    * Improve query cancellation and timeout checking
    
    * Address comments
    
    * Address comments
    
    * Address comments
---
 .../pinot/core/data/manager/BaseTableDataManager.java |  2 +-
 .../core/operator/combine/BaseCombineOperator.java    | 19 ++++++++++++++-----
 .../combine/GroupByOrderByCombineOperator.java        | 16 ++++++++++++++++
 3 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index bcb21234c4..a483c51cdc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -142,7 +142,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     _isStreamSegmentDownloadUntar = 
tableDataManagerParams.isStreamSegmentDownloadUntar();
     if (_isStreamSegmentDownloadUntar) {
       LOGGER.info("Using streamed download-untar for segment download! "
-              + "The rate limit interval for streamed download-untar is {} ms",
+              + "The rate limit interval for streamed download-untar is {} 
bytes/s",
           _streamSegmentDownloadUntarRateLimitBytesPerSec);
     }
     int maxParallelSegmentDownloads = 
tableDataManagerParams.getMaxParallelSegmentDownloads();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
index 9adaec05f0..9238604796 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java
@@ -196,14 +196,15 @@ public abstract class BaseCombineOperator extends 
BaseOperator<IntermediateResul
     int numBlocksMerged = 0;
     long endTimeMs = _queryContext.getEndTimeMs();
     while (numBlocksMerged < _numOperators) {
+      // Timeout has reached, shouldn't continue to process. 
`_blockingQueue.poll` will continue to return blocks even
+      // if negative timeout is provided; therefore an extra check is needed
+      if (endTimeMs - System.currentTimeMillis() < 0) {
+        return generateTimeOutResult(numBlocksMerged);
+      }
       IntermediateResultsBlock blockToMerge =
           _blockingQueue.poll(endTimeMs - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS);
       if (blockToMerge == null) {
-        // Query times out, skip merging the remaining results blocks
-        LOGGER.error("Timed out while polling results block, numBlocksMerged: 
{} (query: {})", numBlocksMerged,
-            _queryContext);
-        return new 
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
-            new TimeoutException("Timed out while polling results block")));
+        return generateTimeOutResult(numBlocksMerged);
       }
       if (blockToMerge.getProcessingExceptions() != null) {
         // Caught exception while processing segment, skip merging the 
remaining results blocks and directly return the
@@ -224,6 +225,14 @@ public abstract class BaseCombineOperator extends 
BaseOperator<IntermediateResul
     return mergedBlock;
   }
 
+  private IntermediateResultsBlock generateTimeOutResult(int numBlocksMerged) {
+    // Query times out, skip merging the remaining results blocks
+    LOGGER.error("Timed out while polling results block, numBlocksMerged: {} 
(query: {})", numBlocksMerged,
+        _queryContext);
+    return new 
IntermediateResultsBlock(QueryException.getException(QueryException.EXECUTION_TIMEOUT_ERROR,
+        new TimeoutException("Timed out while polling results block")));
+  }
+
   /**
    * Can be overridden for early termination.
    */
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index f12e1c9b5e..afea001c1f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -45,6 +45,7 @@ import 
org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +58,8 @@ import org.slf4j.LoggerFactory;
 @SuppressWarnings("rawtypes")
 public class GroupByOrderByCombineOperator extends BaseCombineOperator {
   public static final int MAX_TRIM_THRESHOLD = 1_000_000_000;
+  public static final int MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK = 
10_000;
+
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GroupByOrderByCombineOperator.class);
 
   private static final String EXPLAIN_NAME = "COMBINE_GROUPBY_ORDERBY";
@@ -165,6 +168,8 @@ public class GroupByOrderByCombineOperator extends 
BaseCombineOperator {
         // Merge aggregation group-by result.
         // Iterate over the group-by keys, for each key, update the group-by 
result in the indexedTable
         Collection<IntermediateRecord> intermediateRecords = 
resultsBlock.getIntermediateRecords();
+        // Count the number of merged keys
+        int mergedKeys = 0;
         // For now, only GroupBy OrderBy query has pre-constructed 
intermediate records
         if (intermediateRecords == null) {
           // Merge aggregation group-by result.
@@ -181,12 +186,16 @@ public class GroupByOrderByCombineOperator extends 
BaseCombineOperator {
                 values[_numGroupByExpressions + i] = 
aggregationGroupByResult.getResultForGroupId(i, groupId);
               }
               _indexedTable.upsert(new Key(keys), new Record(values));
+              mergedKeys++;
+              checkMergePhaseInterruption(mergedKeys);
             }
           }
         } else {
           for (IntermediateRecord intermediateResult : intermediateRecords) {
             //TODO: change upsert api so that it accepts intermediateRecord 
directly
             _indexedTable.upsert(intermediateResult._key, 
intermediateResult._record);
+            mergedKeys++;
+            checkMergePhaseInterruption(mergedKeys);
           }
         }
       } finally {
@@ -197,6 +206,13 @@ public class GroupByOrderByCombineOperator extends 
BaseCombineOperator {
     }
   }
 
+  // Check for thread interruption, every time after merging 10_000 keys
+  private void checkMergePhaseInterruption(int mergedKeys) {
+    if (mergedKeys % MAX_GROUP_BY_KEYS_MERGED_PER_INTERRUPTION_CHECK == 0 && 
Thread.interrupted()) {
+      throw new EarlyTerminationException();
+    }
+  }
+
   @Override
   protected void onException(Exception e) {
     
_mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR,
 e));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to