This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ceb3ef2b2b8 Fix early termination on MSE operator (#16696)
ceb3ef2b2b8 is described below

commit ceb3ef2b2b88920341c9f3c6f3bbb1bf72b4847c
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Thu Aug 28 12:39:08 2025 -0700

    Fix early termination on MSE operator (#16696)
---
 .../tests/GroupByOptionsIntegrationTest.java       |  2 +-
 .../runtime/executor/OpChainSchedulerService.java  |  9 ++--
 .../query/runtime/operator/AggregateOperator.java  |  5 ++-
 .../query/runtime/operator/AsofJoinOperator.java   |  9 ++--
 .../operator/BaseMailboxReceiveOperator.java       |  4 +-
 .../query/runtime/operator/HashJoinOperator.java   | 23 +++++-----
 .../runtime/operator/MailboxSendOperator.java      |  4 +-
 .../query/runtime/operator/MultiStageOperator.java | 51 ++++++++++------------
 .../runtime/operator/NonEquiJoinOperator.java      |  7 ++-
 .../runtime/operator/WindowAggregateOperator.java  |  7 ++-
 .../plan/pipeline/PipelineBreakerOperator.java     | 35 +++++----------
 .../queries/PerQueryCPUMemAccountantTest.java      | 13 ++++--
 12 files changed, 79 insertions(+), 90 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
index 55c8a0ea02d..e2a5308720e 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java
@@ -531,7 +531,7 @@ public class GroupByOptionsIntegrationTest extends BaseClusterIntegrationTestSet
 
     String errorMessage = toResultStr(result);
 
-    Assert.assertTrue(errorMessage.contains("NUM_GROUPS_LIMIT has been reached at "), errorMessage);
+    Assert.assertTrue(errorMessage.contains("NUM_GROUPS_LIMIT has been reached"), errorMessage);
   }
 
   // for debug only
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index 807e0d038a5..f26d02315f1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -116,6 +116,7 @@ public class OpChainSchedulerService {
 
   private void registerInternal(OpChain operatorChain) {
     OpChainId opChainId = operatorChain.getId();
+    MultiStageOperator rootOperator = operatorChain.getRoot();
     Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
       @Override
       public void runJob() {
@@ -127,11 +128,11 @@ public class OpChainSchedulerService {
           Tracing.ThreadAccountantOps.setupWorker(opChainId.getStageId(), ThreadExecutionContext.TaskType.MSE,
               operatorChain.getParentContext());
           LOGGER.trace("({}): Executing", operatorChain);
-          MseBlock result = operatorChain.getRoot().nextBlock();
+          MseBlock result = rootOperator.nextBlock();
           while (result.isData()) {
-            result = operatorChain.getRoot().nextBlock();
+            result = rootOperator.nextBlock();
           }
-          MultiStageQueryStats stats = operatorChain.getRoot().calculateStats();
+          MultiStageQueryStats stats = rootOperator.calculateStats();
           if (result.isError()) {
             errorBlock = (ErrorMseBlock) result;
             LOGGER.error("({}): Completed erroneously {} {}", operatorChain, stats, errorBlock.getErrorMessages());
@@ -154,7 +155,7 @@ public class OpChainSchedulerService {
         }
       }
     });
-    _opChainCache.put(opChainId, operatorChain.getRoot());
+    _opChainCache.put(opChainId, rootOperator);
     _submittedOpChainMap.put(opChainId, scheduledFuture);
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
index f9fb427dd1b..da39a78fea6 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AggregateOperator.java
@@ -53,6 +53,7 @@ import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
 import org.apache.pinot.query.runtime.operator.utils.SortUtils;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
+import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.utils.CommonConstants.Server;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
@@ -235,8 +236,8 @@ public class AggregateOperator extends MultiStageOperator {
 
         if (_groupByExecutor.isNumGroupsLimitReached()) {
           if (_errorOnNumGroupsLimit) {
-            _input.earlyTerminate();
-            throw new RuntimeException("NUM_GROUPS_LIMIT has been reached at " + _operatorId);
+            throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException(
+                "NUM_GROUPS_LIMIT has been reached at: " + _operatorId);
           } else {
             _statMap.merge(StatKey.NUM_GROUPS_LIMIT_REACHED, true);
             _input.earlyTerminate();
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
index 73d229a2304..cb88d64bdf5 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/AsofJoinOperator.java
@@ -32,7 +32,6 @@ import org.apache.pinot.query.planner.partitioning.KeySelectorFactory;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.spi.trace.Tracing;
 
 
 public class AsofJoinOperator extends BaseJoinOperator {
@@ -103,7 +102,7 @@ public class AsofJoinOperator extends BaseJoinOperator {
       if (matchKey == null) {
         // Rows with null match keys cannot be matched with any right rows
         if (needUnmatchedLeftRows()) {
-          Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+          sampleAndCheckInterruptionPeriodically(rows.size());
           rows.add(joinRow(leftRow, null));
         }
         continue;
@@ -112,18 +111,18 @@ public class AsofJoinOperator extends BaseJoinOperator {
       NavigableMap<Comparable<?>, Object[]> rightRows = _rightTable.get(hashKey);
       if (rightRows == null) {
         if (needUnmatchedLeftRows()) {
-          Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+          sampleAndCheckInterruptionPeriodically(rows.size());
           rows.add(joinRow(leftRow, null));
         }
       } else {
         Object[] rightRow = closestMatch(matchKey, rightRows);
         if (rightRow == null) {
           if (needUnmatchedLeftRows()) {
-            Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+            sampleAndCheckInterruptionPeriodically(rows.size());
             rows.add(joinRow(leftRow, null));
           }
         } else {
-          Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+          sampleAndCheckInterruptionPeriodically(rows.size());
           rows.add(joinRow(leftRow, rightRow));
         }
       }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 71bee7bfa45..f6cf8e578c7 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -132,12 +132,12 @@ public abstract class BaseMailboxReceiveOperator extends MultiStageOperator {
   }
 
   @Override
-  protected void sampleAndCheckInterruption() {
+  protected long getDeadlineMs() {
     // mailbox receive operator uses passive deadline instead of the active one because it is not an active operator
     // as it just waits for data from the mailbox.
     // This way if timeout is reached, it will be less probable to hit the timeout here, on the stage waiting for data,
     // than in the operator that is actively processing the data, which will produce a more meaningful error message.
-    sampleAndCheckInterruption(_context.getPassiveDeadlineMs());
+    return _context.getPassiveDeadlineMs();
   }
 
   @Override
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
index dfec8c7e44c..091ffa46e77 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java
@@ -39,7 +39,6 @@ import org.apache.pinot.query.runtime.operator.join.LongLookupTable;
 import org.apache.pinot.query.runtime.operator.join.LookupTable;
 import org.apache.pinot.query.runtime.operator.join.ObjectLookupTable;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.spi.trace.Tracing;
 
 
 /**
@@ -127,7 +126,7 @@ public class HashJoinOperator extends BaseJoinOperator {
         }
         continue;
       }
-      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(_rightTable.size());
+      sampleAndCheckInterruptionPeriodically(_rightTable.size());
       _rightTable.addRow(key, row);
     }
   }
@@ -215,7 +214,7 @@ public class HashJoinOperator extends BaseJoinOperator {
             break;
           }
           // defer copying of the content until row matches
-          Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+          sampleAndCheckInterruptionPeriodically(rows.size());
           rows.add(resultRowView.toArray());
           if (_matchedRightRows != null) {
             _matchedRightRows.put(key, BIT_SET_PLACEHOLDER);
@@ -235,7 +234,7 @@ public class HashJoinOperator extends BaseJoinOperator {
     List<Object[]> rows = new ArrayList<>(leftRows.size());
 
     for (Object[] leftRow : leftRows) {
-      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+      sampleAndCheckInterruptionPeriodically(rows.size());
       Object key = _leftKeySelector.getKey(leftRow);
       // Skip rows with null join keys - they should not participate in equi-joins per SQL standard
       if (handleNullKey(key, leftRow, rows)) {
@@ -255,7 +254,7 @@ public class HashJoinOperator extends BaseJoinOperator {
               maxRowsLimitReached = true;
               break;
             }
-            Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+            sampleAndCheckInterruptionPeriodically(rows.size());
             rows.add(resultRowView.toArray());
             hasMatchForLeftRow = true;
             if (_matchedRightRows != null) {
@@ -280,7 +279,7 @@ public class HashJoinOperator extends BaseJoinOperator {
       if (isMaxRowsLimitReached(rows.size())) {
         return;
       }
-      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+      sampleAndCheckInterruptionPeriodically(rows.size());
       rows.add(joinRow(leftRow, null));
     }
   }
@@ -293,7 +292,7 @@ public class HashJoinOperator extends BaseJoinOperator {
     for (Object[] leftRow : leftRows) {
       Object key = _leftKeySelector.getKey(leftRow);
       if (_rightTable.containsKey(key)) {
-        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+        sampleAndCheckInterruptionPeriodically(rows.size());
         rows.add(leftRow);
       }
     }
@@ -309,7 +308,7 @@ public class HashJoinOperator extends BaseJoinOperator {
     for (Object[] leftRow : leftRows) {
       Object key = _leftKeySelector.getKey(leftRow);
       if (!_rightTable.containsKey(key)) {
-        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+        sampleAndCheckInterruptionPeriodically(rows.size());
         rows.add(leftRow);
       }
     }
@@ -326,7 +325,7 @@ public class HashJoinOperator extends BaseJoinOperator {
       for (Map.Entry<Object, Object> entry : _rightTable.entrySet()) {
         Object[] rightRow = (Object[]) entry.getValue();
         if (!_matchedRightRows.containsKey(entry.getKey())) {
-          Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+          sampleAndCheckInterruptionPeriodically(rows.size());
           rows.add(joinRow(null, rightRow));
         }
       }
@@ -336,14 +335,14 @@ public class HashJoinOperator extends BaseJoinOperator {
         BitSet matchedIndices = _matchedRightRows.get(entry.getKey());
         if (matchedIndices == null) {
           for (Object[] rightRow : rightRows) {
-            Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+            sampleAndCheckInterruptionPeriodically(rows.size());
             rows.add(joinRow(null, rightRow));
           }
         } else {
           int numRightRows = rightRows.size();
           int unmatchedIndex = 0;
           while ((unmatchedIndex = matchedIndices.nextClearBit(unmatchedIndex)) < numRightRows) {
-            Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+            sampleAndCheckInterruptionPeriodically(rows.size());
             rows.add(joinRow(null, rightRows.get(unmatchedIndex++)));
           }
         }
@@ -352,7 +351,7 @@ public class HashJoinOperator extends BaseJoinOperator {
     // Add unmatched null key rows from right side for RIGHT and FULL JOIN
     if (_nullKeyRightRows != null) {
       for (Object[] nullKeyRow : _nullKeyRightRows) {
-        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+        sampleAndCheckInterruptionPeriodically(rows.size());
         rows.add(joinRow(null, nullKeyRow));
       }
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index c1a0fcf6e95..cfdfecedb95 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -235,9 +235,9 @@ public class MailboxSendOperator extends MultiStageOperator {
   }
 
   @Override
-  protected void sampleAndCheckInterruption() {
+  protected long getDeadlineMs() {
     // mailbox send operator uses passive deadline instead of the active one
-    sampleAndCheckInterruption(_context.getPassiveDeadlineMs());
+    return _context.getPassiveDeadlineMs();
   }
 
   private void sendEos(MseBlock.Eos eosBlockWithoutStats)
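
Both mailbox operators now express the passive-deadline choice by overriding the new getDeadlineMs() hook instead of re-implementing the interruption check. A minimal sketch of that extension point follows; the subclass below is hypothetical (and declared abstract so the unrelated MultiStageOperator methods can be omitted), while the method names and the _context field come from the diff itself.

    import org.apache.pinot.query.runtime.operator.MultiStageOperator;
    import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;

    // Hypothetical operator that only waits on external input, like the mailbox operators.
    public abstract class WaitingOperatorSketch extends MultiStageOperator {

      protected WaitingOperatorSketch(OpChainExecutionContext context) {
        super(context);
      }

      @Override
      protected long getDeadlineMs() {
        // Report the passive deadline so that, on timeout, the error is more likely raised by
        // the stage actively processing data rather than the one merely waiting for it.
        return _context.getPassiveDeadlineMs();
      }
    }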
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index abd61d2ea81..fc7bb41106b 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -48,17 +48,15 @@ import org.apache.pinot.spi.trace.Tracing;
 import org.slf4j.Logger;
 
 
-public abstract class MultiStageOperator
-    implements Operator<MseBlock>, AutoCloseable {
-
+public abstract class MultiStageOperator implements Operator<MseBlock>, AutoCloseable {
   protected final OpChainExecutionContext _context;
   protected final String _operatorId;
+
   protected boolean _isEarlyTerminated;
 
   public MultiStageOperator(OpChainExecutionContext context) {
     _context = context;
     _operatorId = Joiner.on("_").join(getClass().getSimpleName(), _context.getStageId(), _context.getServer());
-    _isEarlyTerminated = false;
   }
 
   /**
@@ -74,35 +72,34 @@ public abstract class MultiStageOperator
 
   public abstract void registerExecution(long time, int numRows);
 
-  /// This method should be called periodically by the operator to check whether the execution should be interrupted.
-  ///
-  /// This could happen when the request deadline is reached, or the thread accountant decides to interrupt the query
-  /// due to resource constraints.
-  ///
-  /// Normally, callers should call [#sampleAndCheckInterruption(long deadlineMs)] passing the correct deadline, but
-  /// given most operators use either the active or the passive deadline, this method is provided as a convenience
-  /// method. By default, it uses the active deadline, which is the one that should be used for most operators, but
-  /// if the operator does not actively process data (ie both mailbox operators), it should override this method to
-  /// use the passive deadline instead.
-  /// See for example [MailboxSendOperator][org.apache.pinot.query.runtime.operator.MailboxSendOperator]).
-  protected void sampleAndCheckInterruption() {
-    sampleAndCheckInterruption(_context.getActiveDeadlineMs());
+  /// By default, it uses the active deadline, which is the one that should be used for most operators, but if the
+  /// operator does not actively process data (ie both mailbox operators), it should override this method to use the
+  /// passive deadline instead.
+  protected long getDeadlineMs() {
+    return _context.getActiveDeadlineMs();
   }
 
   /// This method should be called periodically by the operator to check whether the execution should be interrupted.
   ///
   /// This could happen when the request deadline is reached, or the thread accountant decides to interrupt the query
   /// due to resource constraints.
-  protected void sampleAndCheckInterruption(long deadlineMs) {
-    if (System.currentTimeMillis() >= deadlineMs) {
-      earlyTerminate();
-      throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on " + getExplainName());
+  protected void checkInterruption() {
+    if (System.currentTimeMillis() >= getDeadlineMs()) {
+      throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on: " + getExplainName());
     }
-    Tracing.ThreadAccountantOps.sample();
     if (Tracing.ThreadAccountantOps.isInterrupted()) {
-      earlyTerminate();
-      throw QueryErrorCode.SERVER_RESOURCE_LIMIT_EXCEEDED.asException("Resource limit exceeded for operator: "
-          + getExplainName());
+      throw new EarlyTerminationException("Interrupted on: " + getExplainName());
+    }
+  }
+
+  protected void sampleAndCheckInterruption() {
+    checkInterruption();
+    Tracing.ThreadAccountantOps.sample();
+  }
+
+  protected void sampleAndCheckInterruptionPeriodically(int numRecordsProcessed) {
+    if ((numRecordsProcessed & Tracing.ThreadAccountantOps.MAX_ENTRIES_KEYS_MERGED_PER_INTERRUPTION_CHECK_MASK) == 0) {
+      sampleAndCheckInterruption();
     }
   }
 
@@ -113,9 +110,6 @@ public abstract class MultiStageOperator
    */
   @Override
   public MseBlock nextBlock() {
-    if (Tracing.ThreadAccountantOps.isInterrupted()) {
-      throw new EarlyTerminationException("Interrupted while processing next block");
-    }
     if (logger().isDebugEnabled()) {
       logger().debug("Operator {}: Reading next block", _operatorId);
     }
@@ -123,6 +117,7 @@ public abstract class MultiStageOperator
       MseBlock nextBlock;
       Stopwatch executeStopwatch = Stopwatch.createStarted();
       try {
+        checkInterruption();
         nextBlock = getNextBlock();
       } catch (Exception e) {
         logger().warn("Operator {}: Exception while processing next block", _operatorId, e);
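
The interruption helpers are now layered: checkInterruption() enforces the deadline and the accountant's interrupt flag (throwing EarlyTerminationException instead of a resource-limit error), sampleAndCheckInterruption() adds a usage sample, and sampleAndCheckInterruptionPeriodically(int) applies the mask so only every Nth record pays the cost, as the join and window operators above do. A rough usage sketch under the same assumptions as the earlier sketch (hypothetical subclass, declared abstract to skip unrelated overrides):

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.pinot.query.runtime.operator.MultiStageOperator;
    import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;

    public abstract class RowBuildingOperatorSketch extends MultiStageOperator {

      protected RowBuildingOperatorSketch(OpChainExecutionContext context) {
        super(context);
      }

      // Copies input rows while periodically checking the deadline and interrupt flag;
      // the mask inside sampleAndCheckInterruptionPeriodically keeps the check cheap.
      protected List<Object[]> copyRows(List<Object[]> inputRows) {
        List<Object[]> rows = new ArrayList<>(inputRows.size());
        for (Object[] row : inputRows) {
          sampleAndCheckInterruptionPeriodically(rows.size());
          rows.add(row);
        }
        return rows;
      }
    }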
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
index 6a5a15e97ac..d6e6d4c4de1 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/NonEquiJoinOperator.java
@@ -28,7 +28,6 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.query.planner.plannode.JoinNode;
 import org.apache.pinot.query.runtime.blocks.MseBlock;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
-import org.apache.pinot.spi.trace.Tracing;
 
 
 /**
@@ -91,7 +90,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
             maxRowsLimitReached = true;
             break;
           }
-          Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+          sampleAndCheckInterruptionPeriodically(rows.size());
           rows.add(joinRowView.toArray());
           hasMatchForLeftRow = true;
           if (_matchedRightRows != null) {
@@ -106,7 +105,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
         if (isMaxRowsLimitReached(rows.size())) {
           break;
         }
-        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+        sampleAndCheckInterruptionPeriodically(rows.size());
         rows.add(joinRow(leftRow, null));
       }
     }
@@ -124,7 +123,7 @@ public class NonEquiJoinOperator extends BaseJoinOperator {
     List<Object[]> rows = new ArrayList<>(numRightRows - numMatchedRightRows);
     int unmatchedIndex = 0;
     while ((unmatchedIndex = _matchedRightRows.nextClearBit(unmatchedIndex)) < numRightRows) {
-      Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+      sampleAndCheckInterruptionPeriodically(rows.size());
       rows.add(joinRow(null, _rightTable.get(unmatchedIndex++)));
     }
     return rows;
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
index 51dd2812261..0edaa21d3b9 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java
@@ -43,7 +43,6 @@ import org.apache.pinot.query.runtime.operator.window.WindowFunction;
 import org.apache.pinot.query.runtime.operator.window.WindowFunctionFactory;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.spi.exception.QueryErrorCode;
-import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner.WindowOverFlowMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -231,7 +230,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
       for (Object[] row : container) {
         // TODO: Revisit null direction handling for all query types
         Key key = AggregationUtils.extractRowKey(row, _keys);
-        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(_numRows);
+        sampleAndCheckInterruptionPeriodically(_numRows);
         partitionRows.computeIfAbsent(key, k -> new ArrayList<>()).add(row);
       }
       _numRows += containerSize;
@@ -255,7 +254,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
       for (WindowFunction windowFunction : _windowFunctions) {
         List<Object> processRows = windowFunction.processRows(rowList);
         assert processRows.size() == rowList.size();
-        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(windowFunctionResults.size());
+        sampleAndCheckInterruptionPeriodically(windowFunctionResults.size());
         windowFunctionResults.add(processRows);
       }
 
@@ -268,7 +267,7 @@ public class WindowAggregateOperator extends MultiStageOperator {
         }
         // Convert the results from WindowFunction to the desired type
         TypeUtils.convertRow(row, resultStoredTypes);
-        Tracing.ThreadAccountantOps.sampleAndCheckInterruptionPeriodically(rows.size());
+        sampleAndCheckInterruptionPeriodically(rows.size());
         rows.add(row);
       }
     }
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
index 8fe38c776e0..c58bbfd26b4 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerOperator.java
@@ -18,10 +18,9 @@
  */
 package org.apache.pinot.query.runtime.plan.pipeline;
 
+import com.google.common.collect.Maps;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
@@ -43,20 +42,26 @@ public class PipelineBreakerOperator extends MultiStageOperator {
   private static final String EXPLAIN_NAME = "PIPELINE_BREAKER";
 
   private final Map<Integer, MultiStageOperator> _workerMap;
+  private final List<MultiStageOperator> _childOperators;
+  private final Map<Integer, List<MseBlock>> _resultMap;
+  private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
 
-  private Map<Integer, List<MseBlock>> _resultMap;
   private ErrorMseBlock _errorBlock;
-  private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
 
   public PipelineBreakerOperator(OpChainExecutionContext context, Map<Integer, MultiStageOperator> workerMap) {
     super(context);
     _workerMap = workerMap;
-    _resultMap = new HashMap<>();
+    _childOperators = new ArrayList<>(workerMap.values());
+    _resultMap = Maps.newHashMapWithExpectedSize(workerMap.size());
     for (int workerKey : workerMap.keySet()) {
       _resultMap.put(workerKey, new ArrayList<>());
     }
   }
 
+  public Map<Integer, List<MseBlock>> getResultMap() {
+    return _resultMap;
+  }
+
   @Override
   public void registerExecution(long time, int numRows) {
     _statMap.merge(StatKey.EXECUTION_TIME_MS, time);
@@ -65,7 +70,7 @@ public class PipelineBreakerOperator extends MultiStageOperator {
 
   @Override
   public List<MultiStageOperator> getChildOperators() {
-    return Collections.emptyList();
+    return _childOperators;
   }
 
   @Override
@@ -78,10 +83,6 @@ public class PipelineBreakerOperator extends MultiStageOperator {
     return LOGGER;
   }
 
-  public Map<Integer, List<MseBlock>> getResultMap() {
-    return _resultMap;
-  }
-
   @Nullable
   public ErrorMseBlock getErrorBlock() {
     return _errorBlock;
@@ -100,8 +101,7 @@ public class PipelineBreakerOperator extends MultiStageOperator {
     // NOTE: Put an empty list for each worker in case there is no data block returned from that worker
     if (_workerMap.size() == 1) {
       Map.Entry<Integer, MultiStageOperator> entry = _workerMap.entrySet().iterator().next();
-      List<MseBlock> dataBlocks = new ArrayList<>();
-      _resultMap = Collections.singletonMap(entry.getKey(), dataBlocks);
+      List<MseBlock> dataBlocks = _resultMap.get(entry.getKey());
       Operator<MseBlock> operator = entry.getValue();
       MseBlock block = operator.nextBlock();
       while (block.isData()) {
@@ -113,10 +113,6 @@ public class PipelineBreakerOperator extends MultiStageOperator {
         return block;
       }
     } else {
-      _resultMap = new HashMap<>();
-      for (int workerKey : _workerMap.keySet()) {
-        _resultMap.put(workerKey, new ArrayList<>());
-      }
       // Keep polling from every operator in round-robin fashion
       Queue<Map.Entry<Integer, MultiStageOperator>> entries = new ArrayDeque<>(_workerMap.entrySet());
       while (!entries.isEmpty()) {
@@ -154,13 +150,6 @@ public class PipelineBreakerOperator extends MultiStageOperator {
     return new StatMap<>(_statMap);
   }
 
-  @Override
-  public void close() {
-    for (MultiStageOperator operator : _workerMap.values()) {
-      operator.close();
-    }
-  }
-
   public enum StatKey implements StatMap.Key {
     EXECUTION_TIME_MS(StatMap.Type.LONG),
     EMITTED_ROWS(StatMap.Type.LONG);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java
index 1aa4b23af29..5f10d49ec37 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/PerQueryCPUMemAccountantTest.java
@@ -25,15 +25,17 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.core.accounting.PerQueryCPUMemAccountantFactory;
 import org.apache.pinot.query.planner.physical.DispatchableSubPlan;
+import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.accounting.QueryResourceTracker;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageProvider;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.mockito.MockedStatic;
@@ -41,6 +43,8 @@ import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
 
@@ -110,7 +114,7 @@ public class PerQueryCPUMemAccountantTest extends QueryRunnerAccountingTest {
     }
   }
 
-  @Test(expectedExceptions = EarlyTerminationException.class)
+  @Test
   void testInterrupt() {
     HashMap<String, Object> configs = getAccountingConfig();
 
@@ -121,7 +125,10 @@ public class PerQueryCPUMemAccountantTest extends QueryRunnerAccountingTest {
 
     try (MockedStatic<Tracing> tracing = Mockito.mockStatic(Tracing.class, Mockito.CALLS_REAL_METHODS)) {
       tracing.when(Tracing::getThreadAccountant).thenReturn(accountant);
-      queryRunner("SELECT * FROM a LIMIT 2", false).getResultTable();
+      QueryDispatcher.QueryResult queryResult = queryRunner("SELECT * FROM a LIMIT 2", false);
+      QueryProcessingException processingException = queryResult.getProcessingException();
+      assertNotNull(processingException);
+      assertEquals(processingException.getErrorCode(), QueryErrorCode.INTERNAL.getId());
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
