This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch revert-16633-remove_task_type
in repository https://gitbox.apache.org/repos/asf/pinot.git

commit 0d6c8babee0c180120765d7050816caca048d160
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Tue Aug 19 15:57:19 2025 -0700

    Revert "Remove TaskType from ThreadExecutionContext (#16633)"
    
    This reverts commit 72bf3bf26fa3d269daf5fd15a6dfbc43db6c8b66.
---
 .../BaseSingleStageBrokerRequestHandler.java       |  4 +++-
 .../MultiStageBrokerRequestHandler.java            |  4 +++-
 .../CPUMemThreadLevelAccountingObjects.java        | 25 +++++++++++++++++-----
 .../PerQueryCPUMemAccountantFactory.java           | 13 +++++------
 .../accounting/ResourceUsageAccountantFactory.java | 21 +++++++++---------
 .../core/query/reduce/GroupByDataTableReducer.java |  2 +-
 .../accounting/PerQueryCPUMemAccountantTest.java   | 21 +++++++++---------
 .../core/accounting/TestResourceAccountant.java    |  9 +++++---
 .../runtime/executor/OpChainSchedulerService.java  |  4 +++-
 .../pinot/query/service/server/QueryServer.java    |  3 ++-
 .../runtime/operator/MultiStageAccountingTest.java |  5 ++---
 .../MultistageResourceUsageAccountingTest.java     | 15 ++++++-------
 .../query/runtime/queries/QueryRunnerTestBase.java |  3 +--
 .../spi/accounting/ThreadExecutionContext.java     | 13 +++++++++++
 .../spi/accounting/ThreadResourceTracker.java      |  2 ++
 .../accounting/ThreadResourceUsageAccountant.java  |  7 ++++--
 .../java/org/apache/pinot/spi/trace/Tracing.java   | 24 +++++++++++++++++----
 .../ThrottleOnCriticalHeapUsageExecutorTest.java   |  6 ++++--
 18 files changed, 120 insertions(+), 61 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index aa5f38f5ee8..199fbdf9d76 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -99,6 +99,7 @@ import org.apache.pinot.core.transport.ServerInstance;
 import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.query.parser.utils.ParserUtils;
 import org.apache.pinot.segment.local.function.GroovyFunctionEvaluator;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.auth.AuthorizationResult;
 import org.apache.pinot.spi.auth.TableRowColAccessResult;
@@ -325,7 +326,8 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
 
     //Start instrumentation context. This must not be moved further below 
interspersed into the code.
     String workloadName = 
QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions());
-    _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(), 
workloadName);
+    _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(), 
ThreadExecutionContext.TaskType.SSE,
+        workloadName);
 
     try {
       return doHandleRequest(requestId, query, sqlNodeAndOptions, request, 
requesterIdentity, requestContext,
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index 601bdb0d9b7..a366fd07144 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -84,6 +84,7 @@ import org.apache.pinot.query.routing.WorkerManager;
 import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
 import org.apache.pinot.query.service.dispatch.QueryDispatcher;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
 import org.apache.pinot.spi.auth.TableAuthorizationResult;
 import org.apache.pinot.spi.auth.broker.RequesterIdentity;
@@ -533,7 +534,8 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
 
     try {
       String workloadName = 
QueryOptionsUtils.getWorkloadName(query.getOptions());
-      _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(), 
workloadName);
+      _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(), 
ThreadExecutionContext.TaskType.MSE,
+          workloadName);
 
       long executionStartTimeNs = System.nanoTime();
       QueryDispatcher.QueryResult queryResults;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
index 8a80e7d0f68..b6a0bc661a9 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java
@@ -108,8 +108,15 @@ public class CPUMemThreadLevelAccountingObjects {
       return taskEntry == null ? -1 : taskEntry.getTaskId();
     }
 
-    public void setThreadTaskStatus(String queryId, int taskId, Thread 
anchorThread, String workloadName) {
-      _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, 
anchorThread, workloadName));
+    @Override
+    public ThreadExecutionContext.TaskType getTaskType() {
+      TaskEntry taskEntry = _currentThreadTaskStatus.get();
+      return taskEntry == null ? ThreadExecutionContext.TaskType.UNKNOWN : 
taskEntry.getTaskType();
+    }
+
+    public void setThreadTaskStatus(String queryId, int taskId, 
ThreadExecutionContext.TaskType taskType,
+        Thread anchorThread, String workloadName) {
+      _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, taskType, 
anchorThread, workloadName));
       _threadResourceSnapshot.reset();
     }
 
@@ -136,6 +143,7 @@ public class CPUMemThreadLevelAccountingObjects {
     private final String _queryId;
     private final int _taskId;
     private final Thread _anchorThread;
+    private final TaskType _taskType;
 
     private final String _workloadName;
 
@@ -143,10 +151,11 @@ public class CPUMemThreadLevelAccountingObjects {
       return _taskId == CommonConstants.Accounting.ANCHOR_TASK_ID;
     }
 
-    public TaskEntry(String queryId, int taskId, Thread anchorThread, String 
workloadName) {
+    public TaskEntry(String queryId, int taskId, TaskType taskType, Thread 
anchorThread, String workloadName) {
       _queryId = queryId;
       _taskId = taskId;
       _anchorThread = anchorThread;
+      _taskType = taskType;
       _workloadName = workloadName;
     }
 
@@ -162,14 +171,20 @@ public class CPUMemThreadLevelAccountingObjects {
       return _anchorThread;
     }
 
+    @Override
+    public TaskType getTaskType() {
+      return _taskType;
+    }
+
+
     public String getWorkloadName() {
       return _workloadName;
     }
 
     @Override
     public String toString() {
-      return "TaskEntry{" + "_queryId='" + _queryId + '\'' + ", _taskId=" + 
_taskId + ", _anchorThread=" + _anchorThread
-          + ", _workloadName='" + _workloadName + '\'' + '}';
+      return "TaskEntry{" + "_queryId='" + _queryId + '\'' + ", _taskId=" + 
_taskId + ", _rootThread=" + _anchorThread
+          + ", _taskType=" + _taskType + ", _workloadName=" + _workloadName + 
'}';
     }
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 1328e17e621..79f5d671966 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -341,22 +341,23 @@ public class PerQueryCPUMemAccountantFactory implements 
ThreadAccountantFactory
     }
 
     @Override
-    public void setupRunner(@Nullable String queryId, String workloadName) {
+    public void setupRunner(@Nullable String queryId, 
ThreadExecutionContext.TaskType taskType,
+        String workloadName) {
       _threadLocalEntry.get()._errorStatus.set(null);
       if (queryId != null) {
         _threadLocalEntry.get()
-            .setThreadTaskStatus(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID, Thread.currentThread(),
+            .setThreadTaskStatus(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, Thread.currentThread(),
                 workloadName);
       }
     }
 
     @Override
-    public void setupWorker(int taskId, @Nullable ThreadExecutionContext 
parentContext) {
+    public void setupWorker(int taskId, ThreadExecutionContext.TaskType 
taskType,
+        @Nullable ThreadExecutionContext parentContext) {
       _threadLocalEntry.get()._errorStatus.set(null);
       if (parentContext != null && parentContext.getQueryId() != null && 
parentContext.getAnchorThread() != null) {
-        _threadLocalEntry.get()
-            .setThreadTaskStatus(parentContext.getQueryId(), taskId, 
parentContext.getAnchorThread(),
-                parentContext.getWorkloadName());
+        
_threadLocalEntry.get().setThreadTaskStatus(parentContext.getQueryId(), taskId, 
parentContext.getTaskType(),
+            parentContext.getAnchorThread(), parentContext.getWorkloadName());
       }
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
index 3252f994d53..b5f341a1188 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
@@ -141,22 +141,23 @@ public class ResourceUsageAccountantFactory implements 
ThreadAccountantFactory {
     }
 
     @Override
-    public void setupRunner(@Nullable String queryId, String workloadName) {
-      CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
_threadLocalEntry.get();
-      threadEntry._errorStatus.set(null);
+    public void setupRunner(@Nullable String queryId, 
ThreadExecutionContext.TaskType taskType, String workloadName) {
+      _threadLocalEntry.get()._errorStatus.set(null);
       if (queryId != null) {
-        threadEntry.setThreadTaskStatus(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID, Thread.currentThread(),
-            workloadName);
+        _threadLocalEntry.get()
+            .setThreadTaskStatus(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, Thread.currentThread(),
+                workloadName);
       }
     }
 
     @Override
-    public void setupWorker(int taskId, @Nullable ThreadExecutionContext 
parentContext) {
-      CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
_threadLocalEntry.get();
-      threadEntry._errorStatus.set(null);
+    public void setupWorker(int taskId, ThreadExecutionContext.TaskType 
taskType,
+        @Nullable ThreadExecutionContext parentContext) {
+      _threadLocalEntry.get()._errorStatus.set(null);
       if (parentContext != null && parentContext.getQueryId() != null && 
parentContext.getAnchorThread() != null) {
-        threadEntry.setThreadTaskStatus(parentContext.getQueryId(), taskId, 
parentContext.getAnchorThread(),
-            parentContext.getWorkloadName());
+        _threadLocalEntry.get()
+            .setThreadTaskStatus(parentContext.getQueryId(), taskId, 
parentContext.getTaskType(),
+                parentContext.getAnchorThread(), 
parentContext.getWorkloadName());
       }
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index 76f8af85b38..d385f29dae1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -268,7 +268,7 @@ public class GroupByDataTableReducer implements 
DataTableReducer {
       futures[i] = reducerContext.getExecutorService().submit(new 
TraceRunnable() {
         @Override
         public void runJob() {
-          _resourceUsageAccountant.setupWorker(taskId, parentContext);
+          _resourceUsageAccountant.setupWorker(taskId, 
ThreadExecutionContext.TaskType.SSE, parentContext);
           try {
             for (DataTable dataTable : reduceGroup) {
               boolean nullHandlingEnabled = 
_queryContext.isNullHandlingEnabled();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java
index 71192ddac33..5cefb3fc374 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java
@@ -22,6 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import org.apache.pinot.spi.accounting.QueryResourceTracker;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.testng.annotations.Test;
 
@@ -70,8 +71,8 @@ public class PerQueryCPUMemAccountantTest {
     // New Task
     CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
workerEntry._threadEntry;
     threadEntry._currentThreadTaskStatus.set(
-        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, 
anchorThread._workerThread,
-            CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, 
ThreadExecutionContext.TaskType.SSE,
+            anchorThread._workerThread, 
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
     threadEntry._currentThreadMemoryAllocationSampleBytes = 1500;
 
     Map<String, ? extends QueryResourceTracker> queryResourceTrackerMap = 
accountant.getQueryResources();
@@ -100,8 +101,8 @@ public class PerQueryCPUMemAccountantTest {
     // New Task
     CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
workerEntry._threadEntry;
     threadEntry._currentThreadTaskStatus.set(
-        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, 
anchorThread._workerThread,
-            CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, 
ThreadExecutionContext.TaskType.SSE,
+            anchorThread._workerThread, 
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
     threadEntry.setToIdle();
 
     Map<String, ? extends QueryResourceTracker> queryResourceTrackerMap = 
accountant.getQueryResources();
@@ -135,8 +136,8 @@ public class PerQueryCPUMemAccountantTest {
     // New Task
     CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
workerEntry._threadEntry;
     threadEntry._currentThreadTaskStatus.set(
-        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, 
anchorThread._workerThread,
-            CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, 
ThreadExecutionContext.TaskType.SSE,
+            anchorThread._workerThread, 
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
     threadEntry._currentThreadMemoryAllocationSampleBytes = 1500;
 
     accountant.reapFinishedTasks();
@@ -203,8 +204,8 @@ public class PerQueryCPUMemAccountantTest {
     // New Task
     CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
workerEntry._threadEntry;
     threadEntry._currentThreadTaskStatus.set(
-        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, 
anchorThread._workerThread,
-            CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, 
ThreadExecutionContext.TaskType.SSE,
+            anchorThread._workerThread, 
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
     threadEntry._currentThreadMemoryAllocationSampleBytes = 1500;
 
     accountant.reapFinishedTasks();
@@ -249,8 +250,8 @@ public class PerQueryCPUMemAccountantTest {
     // New Task
     CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = 
workerEntry._threadEntry;
     threadEntry._currentThreadTaskStatus.set(
-        new CPUMemThreadLevelAccountingObjects.TaskEntry(newQueryId, 5, 
anchorThread._workerThread,
-            CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+        new CPUMemThreadLevelAccountingObjects.TaskEntry(newQueryId, 5, 
ThreadExecutionContext.TaskType.SSE,
+            anchorThread._workerThread, 
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
     threadEntry._currentThreadMemoryAllocationSampleBytes = 3500;
 
     accountant.reapFinishedTasks();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
index 2f2a29bb8b3..ba46611343a 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.accounting;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -42,7 +43,8 @@ class TestResourceAccountant extends 
PerQueryCPUMemAccountantFactory.PerQueryCPU
     CPUMemThreadLevelAccountingObjects.ThreadEntry anchorEntry = new 
CPUMemThreadLevelAccountingObjects.ThreadEntry();
     anchorEntry._currentThreadTaskStatus.set(
         new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 
CommonConstants.Accounting.ANCHOR_TASK_ID,
-            anchorThread._workerThread, 
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+            ThreadExecutionContext.TaskType.SSE, anchorThread._workerThread,
+            CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
     anchorEntry._currentThreadMemoryAllocationSampleBytes = 1000;
     threadEntries.put(anchorThread._workerThread, anchorEntry);
 
@@ -57,8 +59,9 @@ class TestResourceAccountant extends 
PerQueryCPUMemAccountantFactory.PerQueryCPU
 
   private static TaskThread getTaskThread(String queryId, int taskId, 
CountDownLatch threadLatch, Thread anchorThread) {
     CPUMemThreadLevelAccountingObjects.ThreadEntry worker1 = new 
CPUMemThreadLevelAccountingObjects.ThreadEntry();
-    worker1._currentThreadTaskStatus.set(new 
CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, taskId, anchorThread,
-        CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
+    worker1._currentThreadTaskStatus.set(
+        new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, taskId, 
ThreadExecutionContext.TaskType.SSE,
+            anchorThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME));
     Thread workerThread1 = new Thread(() -> {
       try {
         threadLatch.await();
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index d7a0ad23503..18d34f35c67 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -35,6 +35,7 @@ import 
org.apache.pinot.query.runtime.operator.MultiStageOperator;
 import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.operator.OpChainId;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
+import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.trace.Tracing;
 import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
@@ -82,7 +83,8 @@ public class OpChainSchedulerService {
         // try-with-resources to ensure that the operator chain is closed
         // TODO: Change the code so we ownership is expressed in the code in a 
better way
         try (OpChain closeMe = operatorChain) {
-          
Tracing.ThreadAccountantOps.setupWorker(operatorChain.getId().getStageId(), 
operatorChain.getParentContext());
+          
Tracing.ThreadAccountantOps.setupWorker(operatorChain.getId().getStageId(),
+              ThreadExecutionContext.TaskType.MSE, 
operatorChain.getParentContext());
           LOGGER.trace("({}): Executing", operatorChain);
           MseBlock result = operatorChain.getRoot().nextBlock();
           while (result.isData()) {
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index b8fbe6f4145..a3ab6471c93 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -287,7 +287,8 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
     String workloadName = QueryOptionsUtils.getWorkloadName(reqMetadata);
     //TODO: Verify if this matches with what OOM protection expects. This 
method will not block for the query to
     // finish, so it may be breaking some of the OOM protection assumptions.
-    Tracing.ThreadAccountantOps.setupRunner(QueryThreadContext.getCid(), 
workloadName);
+    Tracing.ThreadAccountantOps.setupRunner(QueryThreadContext.getCid(), 
ThreadExecutionContext.TaskType.MSE,
+        workloadName);
     ThreadExecutionContext parentContext = 
Tracing.getThreadAccountant().getThreadExecutionContext();
 
     try {
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
index c99a684e2d2..a1ee88e56a9 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java
@@ -98,10 +98,9 @@ public class MultiStageAccountingTest implements ITest {
     Tracing.ThreadAccountantOps.startThreadAccountant();
 
     // Setup Thread Context
-    Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest",
-        CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
+    Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest", 
ThreadExecutionContext.TaskType.MSE, null);
     ThreadExecutionContext threadExecutionContext = 
Tracing.getThreadAccountant().getThreadExecutionContext();
-    Tracing.ThreadAccountantOps.setupWorker(1, threadExecutionContext);
+    Tracing.ThreadAccountantOps.setupWorker(1, 
ThreadExecutionContext.TaskType.MSE, threadExecutionContext);
   }
 
   @BeforeMethod
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
index b5b81999178..dd17202f644 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java
@@ -57,12 +57,10 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Factory;
 import org.testng.annotations.Test;
 
-import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE;
-import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT;
-import static org.mockito.Mockito.when;
-import static org.mockito.MockitoAnnotations.openMocks;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertTrue;
+import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.*;
+import static org.mockito.Mockito.*;
+import static org.mockito.MockitoAnnotations.*;
+import static org.testng.Assert.*;
 
 
 public class MultistageResourceUsageAccountingTest implements ITest {
@@ -99,10 +97,9 @@ public class MultistageResourceUsageAccountingTest 
implements ITest {
     Tracing.ThreadAccountantOps.startThreadAccountant();
 
     // Setup Thread Context
-    Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest",
-        CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
+    Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest", 
ThreadExecutionContext.TaskType.MSE, null);
     ThreadExecutionContext threadExecutionContext = 
Tracing.getThreadAccountant().getThreadExecutionContext();
-    Tracing.ThreadAccountantOps.setupWorker(1, threadExecutionContext);
+    Tracing.ThreadAccountantOps.setupWorker(1, 
ThreadExecutionContext.TaskType.MSE, threadExecutionContext);
   }
 
   @BeforeMethod
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index 58a5ad886b9..dfb88fe10c8 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -177,8 +177,7 @@ public abstract class QueryRunnerTestBase extends 
QueryTestSet {
     for (Map.Entry<QueryServerInstance, List<Integer>> entry : 
dispatchableStagePlan.getServerInstanceToWorkerIdMap()
         .entrySet()) {
       QueryServerEnclosure serverEnclosure = _servers.get(entry.getKey());
-      Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId),
-          CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
+      Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), 
ThreadExecutionContext.TaskType.MSE, null);
       ThreadExecutionContext parentContext = 
Tracing.getThreadAccountant().getThreadExecutionContext();
       List<WorkerMetadata> workerMetadataList =
           
entry.getValue().stream().map(stageWorkerMetadataList::get).collect(Collectors.toList());
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
index 63c900fac03..9aa18b5b230 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java
@@ -23,6 +23,17 @@ package org.apache.pinot.spi.accounting;
  */
 public interface ThreadExecutionContext {
 
+   /**
+    * SSE: Single Stage Engine
+    * MSE: Multi Stage Engine
+    * UNKNOWN: Default
+    */
+   enum TaskType {
+      SSE,
+      MSE,
+      UNKNOWN
+   }
+
    /**
     * get query id of the execution context
     * @return query id in string
@@ -35,5 +46,7 @@ public interface ThreadExecutionContext {
     */
    Thread getAnchorThread();
 
+   TaskType getTaskType();
+
    String getWorkloadName();
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
index 3d169bf6497..d084bf40d41 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java
@@ -51,4 +51,6 @@ public interface ThreadResourceTracker {
    * @return an int containing the task id.
    */
   int getTaskId();
+
+  ThreadExecutionContext.TaskType getTaskType();
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index 49301ee6994..ea1a7bf3f5d 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -49,16 +49,19 @@ public interface ThreadResourceUsageAccountant {
   /**
    * Set up the thread execution context for an anchor a.k.a runner thread.
    * @param queryId query id string
+   * @param taskType the type of the task - SSE or MSE
    * @param workloadName the name of the workload, can be null
    */
-  void setupRunner(@Nullable String queryId, String workloadName);
+  void setupRunner(@Nullable String queryId, ThreadExecutionContext.TaskType 
taskType, String workloadName);
 
   /**
    * Set up the thread execution context for a worker thread.
    * @param taskId a unique task id
+   * @param taskType the type of the task - SSE or MSE
    * @param parentContext the parent execution context
    */
-  void setupWorker(int taskId, @Nullable ThreadExecutionContext parentContext);
+  void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType,
+      @Nullable ThreadExecutionContext parentContext);
 
   /**
    * get the executon context of current thread
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index 3b7561ac23c..5501d180517 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -213,11 +213,12 @@ public class Tracing {
     }
 
     @Override
-    public void setupRunner(@Nullable String queryId, String workloadName) {
+    public void setupRunner(@Nullable String queryId, 
ThreadExecutionContext.TaskType taskType, String workloadName) {
     }
 
     @Override
-    public void setupWorker(int taskId, @Nullable ThreadExecutionContext 
parentContext) {
+    public void setupWorker(int taskId, ThreadExecutionContext.TaskType 
taskType,
+        @Nullable ThreadExecutionContext parentContext) {
     }
 
     @Override
@@ -258,7 +259,12 @@ public class Tracing {
     }
 
     public static void setupRunner(String queryId, String workloadName) {
-      Tracing.getThreadAccountant().setupRunner(queryId, workloadName);
+      setupRunner(queryId, ThreadExecutionContext.TaskType.SSE, workloadName);
+    }
+
+    public static void setupRunner(String queryId, 
ThreadExecutionContext.TaskType taskType, String workloadName) {
+      // Set up the runner thread with the given query ID and workload name
+      Tracing.getThreadAccountant().setupRunner(queryId, taskType, 
workloadName);
     }
 
     /**
@@ -267,7 +273,17 @@ public class Tracing {
      * @param threadExecutionContext Context holds metadata about the query.
      */
     public static void setupWorker(int taskId, ThreadExecutionContext 
threadExecutionContext) {
-      Tracing.getThreadAccountant().setupWorker(taskId, 
threadExecutionContext);
+      setupWorker(taskId, ThreadExecutionContext.TaskType.SSE, 
threadExecutionContext);
+    }
+
+    /**
+     * Setup metadata of query worker threads.
+     * @param taskId Query task ID of the thread. In SSE, ID is an 
incrementing counter. In MSE, id is the stage id.
+     * @param threadExecutionContext Context holds metadata about the query.
+     */
+    public static void setupWorker(int taskId, ThreadExecutionContext.TaskType 
taskType,
+        @Nullable ThreadExecutionContext threadExecutionContext) {
+      Tracing.getThreadAccountant().setupWorker(taskId, taskType, 
threadExecutionContext);
     }
 
     public static void sample() {
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
index 3020cf4b21b..7266a4a7a9b 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
@@ -56,11 +56,13 @@ public class ThrottleOnCriticalHeapUsageExecutorTest {
       }
 
       @Override
-      public void setupRunner(@Nullable String queryId, String workloadName) {
+      public void setupRunner(@Nullable String queryId, 
ThreadExecutionContext.TaskType taskType,
+          String workloadName) {
       }
 
       @Override
-      public void setupWorker(int taskId, @Nullable ThreadExecutionContext 
parentContext) {
+      public void setupWorker(int taskId, ThreadExecutionContext.TaskType 
taskType,
+          @Nullable ThreadExecutionContext parentContext) {
       }
 
       @Nullable


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to