This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch revert-16633-remove_task_type in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 0d6c8babee0c180120765d7050816caca048d160 Author: Xiaotian (Jackie) Jiang <[email protected]> AuthorDate: Tue Aug 19 15:57:19 2025 -0700 Revert "Remove TaskType from ThreadExecutionContext (#16633)" This reverts commit 72bf3bf26fa3d269daf5fd15a6dfbc43db6c8b66. --- .../BaseSingleStageBrokerRequestHandler.java | 4 +++- .../MultiStageBrokerRequestHandler.java | 4 +++- .../CPUMemThreadLevelAccountingObjects.java | 25 +++++++++++++++++----- .../PerQueryCPUMemAccountantFactory.java | 13 +++++------ .../accounting/ResourceUsageAccountantFactory.java | 21 +++++++++--------- .../core/query/reduce/GroupByDataTableReducer.java | 2 +- .../accounting/PerQueryCPUMemAccountantTest.java | 21 +++++++++--------- .../core/accounting/TestResourceAccountant.java | 9 +++++--- .../runtime/executor/OpChainSchedulerService.java | 4 +++- .../pinot/query/service/server/QueryServer.java | 3 ++- .../runtime/operator/MultiStageAccountingTest.java | 5 ++--- .../MultistageResourceUsageAccountingTest.java | 15 ++++++------- .../query/runtime/queries/QueryRunnerTestBase.java | 3 +-- .../spi/accounting/ThreadExecutionContext.java | 13 +++++++++++ .../spi/accounting/ThreadResourceTracker.java | 2 ++ .../accounting/ThreadResourceUsageAccountant.java | 7 ++++-- .../java/org/apache/pinot/spi/trace/Tracing.java | 24 +++++++++++++++++---- .../ThrottleOnCriticalHeapUsageExecutorTest.java | 6 ++++-- 18 files changed, 120 insertions(+), 61 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java index aa5f38f5ee8..199fbdf9d76 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java @@ -99,6 +99,7 @@ import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.core.util.GapfillUtils; import org.apache.pinot.query.parser.utils.ParserUtils; import org.apache.pinot.segment.local.function.GroovyFunctionEvaluator; +import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; import org.apache.pinot.spi.auth.AuthorizationResult; import org.apache.pinot.spi.auth.TableRowColAccessResult; @@ -325,7 +326,8 @@ public abstract class BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ //Start instrumentation context. This must not be moved further below interspersed into the code. String workloadName = QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions()); - _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(), workloadName); + _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(), ThreadExecutionContext.TaskType.SSE, + workloadName); try { return doHandleRequest(requestId, query, sqlNodeAndOptions, request, requesterIdentity, requestContext, diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 601bdb0d9b7..a366fd07144 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -84,6 +84,7 @@ import org.apache.pinot.query.routing.WorkerManager; import org.apache.pinot.query.runtime.MultiStageStatsTreeBuilder; import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; import org.apache.pinot.query.service.dispatch.QueryDispatcher; +import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant; import org.apache.pinot.spi.auth.TableAuthorizationResult; import org.apache.pinot.spi.auth.broker.RequesterIdentity; @@ -533,7 +534,8 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler { try { String workloadName = QueryOptionsUtils.getWorkloadName(query.getOptions()); - _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(), workloadName); + _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(), ThreadExecutionContext.TaskType.MSE, + workloadName); long executionStartTimeNs = System.nanoTime(); QueryDispatcher.QueryResult queryResults; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java index 8a80e7d0f68..b6a0bc661a9 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/CPUMemThreadLevelAccountingObjects.java @@ -108,8 +108,15 @@ public class CPUMemThreadLevelAccountingObjects { return taskEntry == null ? -1 : taskEntry.getTaskId(); } - public void setThreadTaskStatus(String queryId, int taskId, Thread anchorThread, String workloadName) { - _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, anchorThread, workloadName)); + @Override + public ThreadExecutionContext.TaskType getTaskType() { + TaskEntry taskEntry = _currentThreadTaskStatus.get(); + return taskEntry == null ? ThreadExecutionContext.TaskType.UNKNOWN : taskEntry.getTaskType(); + } + + public void setThreadTaskStatus(String queryId, int taskId, ThreadExecutionContext.TaskType taskType, + Thread anchorThread, String workloadName) { + _currentThreadTaskStatus.set(new TaskEntry(queryId, taskId, taskType, anchorThread, workloadName)); _threadResourceSnapshot.reset(); } @@ -136,6 +143,7 @@ public class CPUMemThreadLevelAccountingObjects { private final String _queryId; private final int _taskId; private final Thread _anchorThread; + private final TaskType _taskType; private final String _workloadName; @@ -143,10 +151,11 @@ public class CPUMemThreadLevelAccountingObjects { return _taskId == CommonConstants.Accounting.ANCHOR_TASK_ID; } - public TaskEntry(String queryId, int taskId, Thread anchorThread, String workloadName) { + public TaskEntry(String queryId, int taskId, TaskType taskType, Thread anchorThread, String workloadName) { _queryId = queryId; _taskId = taskId; _anchorThread = anchorThread; + _taskType = taskType; _workloadName = workloadName; } @@ -162,14 +171,20 @@ public class CPUMemThreadLevelAccountingObjects { return _anchorThread; } + @Override + public TaskType getTaskType() { + return _taskType; + } + + public String getWorkloadName() { return _workloadName; } @Override public String toString() { - return "TaskEntry{" + "_queryId='" + _queryId + '\'' + ", _taskId=" + _taskId + ", _anchorThread=" + _anchorThread - + ", _workloadName='" + _workloadName + '\'' + '}'; + return "TaskEntry{" + "_queryId='" + _queryId + '\'' + ", _taskId=" + _taskId + ", _rootThread=" + _anchorThread + + ", _taskType=" + _taskType + ", _workloadName=" + _workloadName + '}'; } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java index 1328e17e621..79f5d671966 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java @@ -341,22 +341,23 @@ public class PerQueryCPUMemAccountantFactory implements ThreadAccountantFactory } @Override - public void setupRunner(@Nullable String queryId, String workloadName) { + public void setupRunner(@Nullable String queryId, ThreadExecutionContext.TaskType taskType, + String workloadName) { _threadLocalEntry.get()._errorStatus.set(null); if (queryId != null) { _threadLocalEntry.get() - .setThreadTaskStatus(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, Thread.currentThread(), + .setThreadTaskStatus(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, Thread.currentThread(), workloadName); } } @Override - public void setupWorker(int taskId, @Nullable ThreadExecutionContext parentContext) { + public void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType, + @Nullable ThreadExecutionContext parentContext) { _threadLocalEntry.get()._errorStatus.set(null); if (parentContext != null && parentContext.getQueryId() != null && parentContext.getAnchorThread() != null) { - _threadLocalEntry.get() - .setThreadTaskStatus(parentContext.getQueryId(), taskId, parentContext.getAnchorThread(), - parentContext.getWorkloadName()); + _threadLocalEntry.get().setThreadTaskStatus(parentContext.getQueryId(), taskId, parentContext.getTaskType(), + parentContext.getAnchorThread(), parentContext.getWorkloadName()); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java index 3252f994d53..b5f341a1188 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java @@ -141,22 +141,23 @@ public class ResourceUsageAccountantFactory implements ThreadAccountantFactory { } @Override - public void setupRunner(@Nullable String queryId, String workloadName) { - CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = _threadLocalEntry.get(); - threadEntry._errorStatus.set(null); + public void setupRunner(@Nullable String queryId, ThreadExecutionContext.TaskType taskType, String workloadName) { + _threadLocalEntry.get()._errorStatus.set(null); if (queryId != null) { - threadEntry.setThreadTaskStatus(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, Thread.currentThread(), - workloadName); + _threadLocalEntry.get() + .setThreadTaskStatus(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, taskType, Thread.currentThread(), + workloadName); } } @Override - public void setupWorker(int taskId, @Nullable ThreadExecutionContext parentContext) { - CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = _threadLocalEntry.get(); - threadEntry._errorStatus.set(null); + public void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType, + @Nullable ThreadExecutionContext parentContext) { + _threadLocalEntry.get()._errorStatus.set(null); if (parentContext != null && parentContext.getQueryId() != null && parentContext.getAnchorThread() != null) { - threadEntry.setThreadTaskStatus(parentContext.getQueryId(), taskId, parentContext.getAnchorThread(), - parentContext.getWorkloadName()); + _threadLocalEntry.get() + .setThreadTaskStatus(parentContext.getQueryId(), taskId, parentContext.getTaskType(), + parentContext.getAnchorThread(), parentContext.getWorkloadName()); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java index 76f8af85b38..d385f29dae1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java @@ -268,7 +268,7 @@ public class GroupByDataTableReducer implements DataTableReducer { futures[i] = reducerContext.getExecutorService().submit(new TraceRunnable() { @Override public void runJob() { - _resourceUsageAccountant.setupWorker(taskId, parentContext); + _resourceUsageAccountant.setupWorker(taskId, ThreadExecutionContext.TaskType.SSE, parentContext); try { for (DataTable dataTable : reduceGroup) { boolean nullHandlingEnabled = _queryContext.isNullHandlingEnabled(); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java index 71192ddac33..5cefb3fc374 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantTest.java @@ -22,6 +22,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.CountDownLatch; import org.apache.pinot.spi.accounting.QueryResourceTracker; +import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.utils.CommonConstants; import org.testng.annotations.Test; @@ -70,8 +71,8 @@ public class PerQueryCPUMemAccountantTest { // New Task CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = workerEntry._threadEntry; threadEntry._currentThreadTaskStatus.set( - new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, anchorThread._workerThread, - CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); + new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, ThreadExecutionContext.TaskType.SSE, + anchorThread._workerThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); threadEntry._currentThreadMemoryAllocationSampleBytes = 1500; Map<String, ? extends QueryResourceTracker> queryResourceTrackerMap = accountant.getQueryResources(); @@ -100,8 +101,8 @@ public class PerQueryCPUMemAccountantTest { // New Task CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = workerEntry._threadEntry; threadEntry._currentThreadTaskStatus.set( - new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, anchorThread._workerThread, - CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); + new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, ThreadExecutionContext.TaskType.SSE, + anchorThread._workerThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); threadEntry.setToIdle(); Map<String, ? extends QueryResourceTracker> queryResourceTrackerMap = accountant.getQueryResources(); @@ -135,8 +136,8 @@ public class PerQueryCPUMemAccountantTest { // New Task CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = workerEntry._threadEntry; threadEntry._currentThreadTaskStatus.set( - new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, anchorThread._workerThread, - CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); + new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, ThreadExecutionContext.TaskType.SSE, + anchorThread._workerThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); threadEntry._currentThreadMemoryAllocationSampleBytes = 1500; accountant.reapFinishedTasks(); @@ -203,8 +204,8 @@ public class PerQueryCPUMemAccountantTest { // New Task CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = workerEntry._threadEntry; threadEntry._currentThreadTaskStatus.set( - new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, anchorThread._workerThread, - CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); + new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, 5, ThreadExecutionContext.TaskType.SSE, + anchorThread._workerThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); threadEntry._currentThreadMemoryAllocationSampleBytes = 1500; accountant.reapFinishedTasks(); @@ -249,8 +250,8 @@ public class PerQueryCPUMemAccountantTest { // New Task CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry = workerEntry._threadEntry; threadEntry._currentThreadTaskStatus.set( - new CPUMemThreadLevelAccountingObjects.TaskEntry(newQueryId, 5, anchorThread._workerThread, - CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); + new CPUMemThreadLevelAccountingObjects.TaskEntry(newQueryId, 5, ThreadExecutionContext.TaskType.SSE, + anchorThread._workerThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); threadEntry._currentThreadMemoryAllocationSampleBytes = 3500; accountant.reapFinishedTasks(); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java index 2f2a29bb8b3..ba46611343a 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java @@ -21,6 +21,7 @@ package org.apache.pinot.core.accounting; import java.util.HashSet; import java.util.Map; import java.util.concurrent.CountDownLatch; +import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.config.instance.InstanceType; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; @@ -42,7 +43,8 @@ class TestResourceAccountant extends PerQueryCPUMemAccountantFactory.PerQueryCPU CPUMemThreadLevelAccountingObjects.ThreadEntry anchorEntry = new CPUMemThreadLevelAccountingObjects.ThreadEntry(); anchorEntry._currentThreadTaskStatus.set( new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID, - anchorThread._workerThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); + ThreadExecutionContext.TaskType.SSE, anchorThread._workerThread, + CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); anchorEntry._currentThreadMemoryAllocationSampleBytes = 1000; threadEntries.put(anchorThread._workerThread, anchorEntry); @@ -57,8 +59,9 @@ class TestResourceAccountant extends PerQueryCPUMemAccountantFactory.PerQueryCPU private static TaskThread getTaskThread(String queryId, int taskId, CountDownLatch threadLatch, Thread anchorThread) { CPUMemThreadLevelAccountingObjects.ThreadEntry worker1 = new CPUMemThreadLevelAccountingObjects.ThreadEntry(); - worker1._currentThreadTaskStatus.set(new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, taskId, anchorThread, - CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); + worker1._currentThreadTaskStatus.set( + new CPUMemThreadLevelAccountingObjects.TaskEntry(queryId, taskId, ThreadExecutionContext.TaskType.SSE, + anchorThread, CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME)); Thread workerThread1 = new Thread(() -> { try { threadLatch.await(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java index d7a0ad23503..18d34f35c67 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java @@ -35,6 +35,7 @@ import org.apache.pinot.query.runtime.operator.MultiStageOperator; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.operator.OpChainId; import org.apache.pinot.query.runtime.plan.MultiStageQueryStats; +import org.apache.pinot.spi.accounting.ThreadExecutionContext; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner; @@ -82,7 +83,8 @@ public class OpChainSchedulerService { // try-with-resources to ensure that the operator chain is closed // TODO: Change the code so we ownership is expressed in the code in a better way try (OpChain closeMe = operatorChain) { - Tracing.ThreadAccountantOps.setupWorker(operatorChain.getId().getStageId(), operatorChain.getParentContext()); + Tracing.ThreadAccountantOps.setupWorker(operatorChain.getId().getStageId(), + ThreadExecutionContext.TaskType.MSE, operatorChain.getParentContext()); LOGGER.trace("({}): Executing", operatorChain); MseBlock result = operatorChain.getRoot().nextBlock(); while (result.isData()) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java index b8fbe6f4145..a3ab6471c93 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java @@ -287,7 +287,8 @@ public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase { String workloadName = QueryOptionsUtils.getWorkloadName(reqMetadata); //TODO: Verify if this matches with what OOM protection expects. This method will not block for the query to // finish, so it may be breaking some of the OOM protection assumptions. - Tracing.ThreadAccountantOps.setupRunner(QueryThreadContext.getCid(), workloadName); + Tracing.ThreadAccountantOps.setupRunner(QueryThreadContext.getCid(), ThreadExecutionContext.TaskType.MSE, + workloadName); ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext(); try { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java index c99a684e2d2..a1ee88e56a9 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultiStageAccountingTest.java @@ -98,10 +98,9 @@ public class MultiStageAccountingTest implements ITest { Tracing.ThreadAccountantOps.startThreadAccountant(); // Setup Thread Context - Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest", - CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME); + Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest", ThreadExecutionContext.TaskType.MSE, null); ThreadExecutionContext threadExecutionContext = Tracing.getThreadAccountant().getThreadExecutionContext(); - Tracing.ThreadAccountantOps.setupWorker(1, threadExecutionContext); + Tracing.ThreadAccountantOps.setupWorker(1, ThreadExecutionContext.TaskType.MSE, threadExecutionContext); } @BeforeMethod diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java index b5b81999178..dd17202f644 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MultistageResourceUsageAccountingTest.java @@ -57,12 +57,10 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; import org.testng.annotations.Test; -import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.DOUBLE; -import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.INT; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.openMocks; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; +import static org.apache.pinot.common.utils.DataSchema.ColumnDataType.*; +import static org.mockito.Mockito.*; +import static org.mockito.MockitoAnnotations.*; +import static org.testng.Assert.*; public class MultistageResourceUsageAccountingTest implements ITest { @@ -99,10 +97,9 @@ public class MultistageResourceUsageAccountingTest implements ITest { Tracing.ThreadAccountantOps.startThreadAccountant(); // Setup Thread Context - Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest", - CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME); + Tracing.ThreadAccountantOps.setupRunner("MultiStageAccountingTest", ThreadExecutionContext.TaskType.MSE, null); ThreadExecutionContext threadExecutionContext = Tracing.getThreadAccountant().getThreadExecutionContext(); - Tracing.ThreadAccountantOps.setupWorker(1, threadExecutionContext); + Tracing.ThreadAccountantOps.setupWorker(1, ThreadExecutionContext.TaskType.MSE, threadExecutionContext); } @BeforeMethod diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java index 58a5ad886b9..dfb88fe10c8 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java @@ -177,8 +177,7 @@ public abstract class QueryRunnerTestBase extends QueryTestSet { for (Map.Entry<QueryServerInstance, List<Integer>> entry : dispatchableStagePlan.getServerInstanceToWorkerIdMap() .entrySet()) { QueryServerEnclosure serverEnclosure = _servers.get(entry.getKey()); - Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), - CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME); + Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), ThreadExecutionContext.TaskType.MSE, null); ThreadExecutionContext parentContext = Tracing.getThreadAccountant().getThreadExecutionContext(); List<WorkerMetadata> workerMetadataList = entry.getValue().stream().map(stageWorkerMetadataList::get).collect(Collectors.toList()); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java index 63c900fac03..9aa18b5b230 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadExecutionContext.java @@ -23,6 +23,17 @@ package org.apache.pinot.spi.accounting; */ public interface ThreadExecutionContext { + /** + * SSE: Single Stage Engine + * MSE: Multi Stage Engine + * UNKNOWN: Default + */ + enum TaskType { + SSE, + MSE, + UNKNOWN + } + /** * get query id of the execution context * @return query id in string @@ -35,5 +46,7 @@ public interface ThreadExecutionContext { */ Thread getAnchorThread(); + TaskType getTaskType(); + String getWorkloadName(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java index 3d169bf6497..d084bf40d41 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceTracker.java @@ -51,4 +51,6 @@ public interface ThreadResourceTracker { * @return an int containing the task id. */ int getTaskId(); + + ThreadExecutionContext.TaskType getTaskType(); } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java index 49301ee6994..ea1a7bf3f5d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java @@ -49,16 +49,19 @@ public interface ThreadResourceUsageAccountant { /** * Set up the thread execution context for an anchor a.k.a runner thread. * @param queryId query id string + * @param taskType the type of the task - SSE or MSE * @param workloadName the name of the workload, can be null */ - void setupRunner(@Nullable String queryId, String workloadName); + void setupRunner(@Nullable String queryId, ThreadExecutionContext.TaskType taskType, String workloadName); /** * Set up the thread execution context for a worker thread. * @param taskId a unique task id + * @param taskType the type of the task - SSE or MSE * @param parentContext the parent execution context */ - void setupWorker(int taskId, @Nullable ThreadExecutionContext parentContext); + void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType, + @Nullable ThreadExecutionContext parentContext); /** * get the executon context of current thread diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java index 3b7561ac23c..5501d180517 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java @@ -213,11 +213,12 @@ public class Tracing { } @Override - public void setupRunner(@Nullable String queryId, String workloadName) { + public void setupRunner(@Nullable String queryId, ThreadExecutionContext.TaskType taskType, String workloadName) { } @Override - public void setupWorker(int taskId, @Nullable ThreadExecutionContext parentContext) { + public void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType, + @Nullable ThreadExecutionContext parentContext) { } @Override @@ -258,7 +259,12 @@ public class Tracing { } public static void setupRunner(String queryId, String workloadName) { - Tracing.getThreadAccountant().setupRunner(queryId, workloadName); + setupRunner(queryId, ThreadExecutionContext.TaskType.SSE, workloadName); + } + + public static void setupRunner(String queryId, ThreadExecutionContext.TaskType taskType, String workloadName) { + // Set up the runner thread with the given query ID and workload name + Tracing.getThreadAccountant().setupRunner(queryId, taskType, workloadName); } /** @@ -267,7 +273,17 @@ public class Tracing { * @param threadExecutionContext Context holds metadata about the query. */ public static void setupWorker(int taskId, ThreadExecutionContext threadExecutionContext) { - Tracing.getThreadAccountant().setupWorker(taskId, threadExecutionContext); + setupWorker(taskId, ThreadExecutionContext.TaskType.SSE, threadExecutionContext); + } + + /** + * Setup metadata of query worker threads. + * @param taskId Query task ID of the thread. In SSE, ID is an incrementing counter. In MSE, id is the stage id. + * @param threadExecutionContext Context holds metadata about the query. + */ + public static void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType, + @Nullable ThreadExecutionContext threadExecutionContext) { + Tracing.getThreadAccountant().setupWorker(taskId, taskType, threadExecutionContext); } public static void sample() { diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java index 3020cf4b21b..7266a4a7a9b 100644 --- a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java +++ b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java @@ -56,11 +56,13 @@ public class ThrottleOnCriticalHeapUsageExecutorTest { } @Override - public void setupRunner(@Nullable String queryId, String workloadName) { + public void setupRunner(@Nullable String queryId, ThreadExecutionContext.TaskType taskType, + String workloadName) { } @Override - public void setupWorker(int taskId, @Nullable ThreadExecutionContext parentContext) { + public void setupWorker(int taskId, ThreadExecutionContext.TaskType taskType, + @Nullable ThreadExecutionContext parentContext) { } @Nullable --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
