This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 633c90d43be Fix LeafOperator thread accountant (#16651)
633c90d43be is described below

commit 633c90d43be4c52db784ec7ed0574b9dd3e5fa41
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Aug 22 11:10:40 2025 -0700

    Fix LeafOperator thread accountant (#16651)
---
 .../pinot/query/runtime/operator/LeafOperator.java | 135 ++++++++++-----------
 1 file changed, 61 insertions(+), 74 deletions(-)

diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 157bcf689e6..2d0939c85e8 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -93,6 +93,7 @@ public class LeafOperator extends MultiStageOperator {
   private final String _tableName;
   private final StatMap<StatKey> _statMap = new StatMap<>(StatKey.class);
   private final AtomicReference<ErrorMseBlock> _errorBlock = new 
AtomicReference<>();
+  private final ResultsBlockStreamer _resultsBlockStreamer = 
this::addResultsBlock;
 
   // Use a limit-sized BlockingQueue to store the results blocks and apply 
back pressure to the single-stage threads
   @VisibleForTesting
@@ -422,42 +423,19 @@ public class LeafOperator extends MultiStageOperator {
   }
 
   @VisibleForTesting
-  void execute(ThreadExecutionContext parentContext) {
-    ResultsBlockConsumer resultsBlockConsumer = new ResultsBlockConsumer();
-    ServerQueryLogger queryLogger = ServerQueryLogger.getInstance();
+  void execute(@Nullable ThreadExecutionContext parentContext) {
     if (_requests.size() == 1) {
       ServerQueryRequest request = _requests.get(0);
-      Tracing.ThreadAccountantOps.setupWorker(1, parentContext);
-
-      InstanceResponseBlock instanceResponseBlock =
-          _queryExecutor.execute(request, _executorService, 
resultsBlockConsumer);
-      if (queryLogger != null) {
-        queryLogger.logQuery(request, instanceResponseBlock, 
"MultistageEngine");
-      }
-      // Collect the execution stats
-      mergeExecutionStats(instanceResponseBlock.getResponseMetadata());
-      // TODO: Revisit if we should treat all exceptions as query failure. 
Currently MERGE_RESPONSE_ERROR and
-      //       SERVER_SEGMENT_MISSING_ERROR are counted as query failure.
-      Map<Integer, String> exceptions = instanceResponseBlock.getExceptions();
-      if (!exceptions.isEmpty()) {
-        
setErrorBlock(ErrorMseBlock.fromMap(QueryErrorCode.fromKeyMap(exceptions)));
-      } else {
-        // NOTE: Instance response block might contain data (not metadata 
only) when all the segments are pruned.
-        //       Add the results block if it contains data.
-        BaseResultsBlock resultsBlock = 
instanceResponseBlock.getResultsBlock();
-        if (resultsBlock != null && resultsBlock.getNumRows() > 0) {
-          try {
-            addResultsBlock(resultsBlock);
-          } catch (InterruptedException e) {
-            setErrorBlock(CANCELLED_BLOCK);
-          } catch (TimeoutException e) {
-            setErrorBlock(TIMEOUT_BLOCK);
-          } catch (Exception e) {
-            if (!(e instanceof EarlyTerminationException)) {
-              LOGGER.warn("Failed to add results block", e);
-            }
-          }
+      if (parentContext != null) {
+        // NOTE: Treat this as SSE runner (anchor) thread.
+        Tracing.ThreadAccountantOps.setupRunner(parentContext.getQueryId(), 
parentContext.getWorkloadName());
+        try {
+          executeOneRequest(request, null);
+        } finally {
+          Tracing.ThreadAccountantOps.clear();
         }
+      } else {
+        executeOneRequest(request, null);
       }
     } else {
       // Hit 2 physical tables, one REALTIME and one OFFLINE
@@ -469,43 +447,24 @@ public class LeafOperator extends MultiStageOperator {
       CountDownLatch latch = new CountDownLatch(2);
       for (int i = 0; i < 2; i++) {
         ServerQueryRequest request = _requests.get(i);
-        int taskId = i;
         futures[i] = _executorService.submit(() -> {
-          Tracing.ThreadAccountantOps.setupWorker(taskId, parentContext);
-
-          try {
-            InstanceResponseBlock instanceResponseBlock =
-                _queryExecutor.execute(request, _executorService, 
resultsBlockConsumer);
-            if (queryLogger != null) {
-              queryLogger.logQuery(request, instanceResponseBlock, 
"MultistageEngine");
+          if (parentContext != null) {
+            // NOTE: Treat this as SSE runner (anchor) thread.
+            
Tracing.ThreadAccountantOps.setupRunner(parentContext.getQueryId(), 
parentContext.getWorkloadName());
+            try {
+              // Drain the latch when receiving exception block and not wait 
for the other thread to finish
+              executeOneRequest(request, latch::countDown);
+            } finally {
+              Tracing.ThreadAccountantOps.clear();
+              latch.countDown();
             }
-            // Collect the execution stats
-            mergeExecutionStats(instanceResponseBlock.getResponseMetadata());
-            Map<Integer, String> exceptions = 
instanceResponseBlock.getExceptions();
-            if (!exceptions.isEmpty()) {
-              
setErrorBlock(ErrorMseBlock.fromMap(QueryErrorCode.fromKeyMap(exceptions)));
+          } else {
+            try {
               // Drain the latch when receiving exception block and not wait 
for the other thread to finish
+              executeOneRequest(request, latch::countDown);
+            } finally {
               latch.countDown();
-            } else {
-              // NOTE: Instance response block might contain data (not 
metadata only) when all the segments are
-              //       pruned. Add the results block if it contains data.
-              BaseResultsBlock resultsBlock = 
instanceResponseBlock.getResultsBlock();
-              if (resultsBlock != null && resultsBlock.getNumRows() > 0) {
-                try {
-                  addResultsBlock(resultsBlock);
-                } catch (InterruptedException e) {
-                  setErrorBlock(CANCELLED_BLOCK);
-                } catch (TimeoutException e) {
-                  setErrorBlock(TIMEOUT_BLOCK);
-                } catch (Exception e) {
-                  if (!(e instanceof EarlyTerminationException)) {
-                    LOGGER.warn("Failed to add results block", e);
-                  }
-                }
-              }
             }
-          } finally {
-            latch.countDown();
           }
         });
       }
@@ -523,6 +482,43 @@ public class LeafOperator extends MultiStageOperator {
     }
   }
 
+  private void executeOneRequest(ServerQueryRequest request, @Nullable 
Runnable onException) {
+    InstanceResponseBlock instanceResponseBlock =
+        _queryExecutor.execute(request, _executorService, 
_resultsBlockStreamer);
+    ServerQueryLogger queryLogger = ServerQueryLogger.getInstance();
+    if (queryLogger != null) {
+      queryLogger.logQuery(request, instanceResponseBlock, "MultistageEngine");
+    }
+    // Collect the execution stats
+    mergeExecutionStats(instanceResponseBlock.getResponseMetadata());
+    // TODO: Revisit if we should treat all exceptions as query failure. 
Currently MERGE_RESPONSE_ERROR and
+    //       SERVER_SEGMENT_MISSING_ERROR are counted as query failure.
+    Map<Integer, String> exceptions = instanceResponseBlock.getExceptions();
+    if (!exceptions.isEmpty()) {
+      
setErrorBlock(ErrorMseBlock.fromMap(QueryErrorCode.fromKeyMap(exceptions)));
+      if (onException != null) {
+        onException.run();
+      }
+    } else {
+      // NOTE: Instance response block might contain data (not metadata only) 
when all the segments are pruned.
+      //       Add the results block if it contains data.
+      BaseResultsBlock resultsBlock = instanceResponseBlock.getResultsBlock();
+      if (resultsBlock != null && resultsBlock.getNumRows() > 0) {
+        try {
+          addResultsBlock(resultsBlock);
+        } catch (InterruptedException e) {
+          setErrorBlock(CANCELLED_BLOCK);
+        } catch (TimeoutException e) {
+          setErrorBlock(TIMEOUT_BLOCK);
+        } catch (Exception e) {
+          if (!(e instanceof EarlyTerminationException)) {
+            LOGGER.warn("Failed to add results block", e);
+          }
+        }
+      }
+    }
+  }
+
   @VisibleForTesting
   void addResultsBlock(BaseResultsBlock resultsBlock)
       throws InterruptedException, TimeoutException {
@@ -670,15 +666,6 @@ public class LeafOperator extends MultiStageOperator {
     }
   }
 
-  private class ResultsBlockConsumer implements ResultsBlockStreamer {
-
-    @Override
-    public void send(BaseResultsBlock block)
-        throws InterruptedException, TimeoutException {
-      addResultsBlock(block);
-    }
-  }
-
   public enum StatKey implements StatMap.Key {
     TABLE(StatMap.Type.STRING, null),
     EXECUTION_TIME_MS(StatMap.Type.LONG, null) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to