This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5bc318eacff Misc bugfix/cleanup for thread accountant (#16532)
5bc318eacff is described below
commit 5bc318eacff5d5226bb594db00fc8bafce428da9
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Aug 13 15:24:28 2025 -0600
Misc bugfix/cleanup for thread accountant (#16532)
---
.../BaseSingleStageBrokerRequestHandler.java | 4 +--
.../MultiStageBrokerRequestHandler.java | 4 +--
.../common/function/scalar/InternalFunctions.java | 4 +--
.../common/utils/config/QueryOptionsUtils.java | 3 +-
.../PerQueryCPUMemAccountantFactory.java | 5 +--
.../accounting/ResourceUsageAccountantFactory.java | 4 +--
.../core/accounting/TestResourceAccountant.java | 14 ++++----
.../runtime/plan/OpChainExecutionContext.java | 19 +++--------
.../pinot/query/service/server/QueryServer.java | 3 +-
.../executor/OpChainSchedulerServiceTest.java | 8 ++---
.../runtime/operator/MailboxSendOperatorTest.java | 4 +--
.../query/runtime/operator/OperatorTestUtil.java | 12 +++----
.../accounting/ThreadResourceUsageAccountant.java | 3 +-
.../apache/pinot/spi/query/QueryThreadContext.java | 37 ++--------------------
.../java/org/apache/pinot/spi/trace/Tracing.java | 5 ++-
.../ThrottleOnCriticalHeapUsageExecutorTest.java | 2 +-
16 files changed, 43 insertions(+), 88 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index d405e998b68..c829ae8427b 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -326,8 +326,8 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
//Start instrumentation context. This must not be moved further below
interspersed into the code.
String workloadName =
QueryOptionsUtils.getWorkloadName(sqlNodeAndOptions.getOptions());
- _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(),
CommonConstants.Accounting.ANCHOR_TASK_ID,
- ThreadExecutionContext.TaskType.SSE, workloadName);
+ _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(),
ThreadExecutionContext.TaskType.SSE,
+ workloadName);
try {
return doHandleRequest(requestId, query, sqlNodeAndOptions, request,
requesterIdentity, requestContext,
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index f62d48b9554..f841e46aafc 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -530,8 +530,8 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
try {
String workloadName =
QueryOptionsUtils.getWorkloadName(query.getOptions());
- _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(),
CommonConstants.Accounting.ANCHOR_TASK_ID,
- ThreadExecutionContext.TaskType.MSE, workloadName);
+ _resourceUsageAccountant.setupRunner(QueryThreadContext.getCid(),
ThreadExecutionContext.TaskType.MSE,
+ workloadName);
long executionStartTimeNs = System.nanoTime();
QueryDispatcher.QueryResult queryResults;
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/InternalFunctions.java
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/InternalFunctions.java
index 4158f32b627..7c2649792ff 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/InternalFunctions.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/InternalFunctions.java
@@ -63,7 +63,7 @@ public class InternalFunctions {
return QueryThreadContext.getStartTimeMs();
}
- /// Returns the [deadline][QueryThreadContext#getDeadlineMs] of the query.
+ /// Returns the [deadline][QueryThreadContext#getActiveDeadlineMs()] of the
query.
///
/// The input value is not directly used. Instead it is here to control
whether the function is called during query
/// optimization or execution. In order to do the latter, a non-constant
value (like a column) should be passed as
@@ -72,7 +72,7 @@ public class InternalFunctions {
/// This is mostly useful for test and internal usage
@ScalarFunction
public static long endTime(String input) {
- return QueryThreadContext.getDeadlineMs();
+ return QueryThreadContext.getActiveDeadlineMs();
}
/// Returns the [broker id][QueryThreadContext#getBrokerId] of the query.
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 362236adcd3..0b47884fdb8 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -547,7 +547,6 @@ public class QueryOptionsUtils {
}
public static String getWorkloadName(Map<String, String> queryOptions) {
- return queryOptions.get(QueryOptionKey.WORKLOAD_NAME) != null ?
queryOptions.get(QueryOptionKey.WORKLOAD_NAME)
- : CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME;
+ return queryOptions.getOrDefault(QueryOptionKey.WORKLOAD_NAME,
CommonConstants.Accounting.DEFAULT_WORKLOAD_NAME);
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
index 293edff6ff6..79f5d671966 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/PerQueryCPUMemAccountantFactory.java
@@ -84,7 +84,8 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
*/
private static final String ACCOUNTANT_TASK_NAME =
"CPUMemThreadAccountant";
private static final int ACCOUNTANT_PRIORITY = 4;
- private final ExecutorService _executorService =
Executors.newFixedThreadPool(1, r -> {
+
+ private final ExecutorService _executorService =
Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setPriority(ACCOUNTANT_PRIORITY);
thread.setDaemon(true);
@@ -340,7 +341,7 @@ public class PerQueryCPUMemAccountantFactory implements
ThreadAccountantFactory
}
@Override
- public void setupRunner(@Nullable String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
+ public void setupRunner(@Nullable String queryId,
ThreadExecutionContext.TaskType taskType,
String workloadName) {
_threadLocalEntry.get()._errorStatus.set(null);
if (queryId != null) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
index 93c68ca7fe3..b5f341a1188 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/accounting/ResourceUsageAccountantFactory.java
@@ -54,7 +54,7 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
private static final String ACCOUNTANT_TASK_NAME =
"ResourceUsageAccountant";
private static final int ACCOUNTANT_PRIORITY = 4;
- private final ExecutorService _executorService =
Executors.newFixedThreadPool(1, r -> {
+ private final ExecutorService _executorService =
Executors.newSingleThreadExecutor(r -> {
Thread thread = new Thread(r);
thread.setPriority(ACCOUNTANT_PRIORITY);
thread.setDaemon(true);
@@ -141,7 +141,7 @@ public class ResourceUsageAccountantFactory implements
ThreadAccountantFactory {
}
@Override
- public void setupRunner(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType, String workloadName) {
+ public void setupRunner(@Nullable String queryId,
ThreadExecutionContext.TaskType taskType, String workloadName) {
_threadLocalEntry.get()._errorStatus.set(null);
if (queryId != null) {
_threadLocalEntry.get()
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
index ba49824c451..ba46611343a 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/accounting/TestResourceAccountant.java
@@ -20,9 +20,7 @@ package org.apache.pinot.core.accounting;
import java.util.HashSet;
import java.util.Map;
-import java.util.Objects;
import java.util.concurrent.CountDownLatch;
-import java.util.stream.Collectors;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -76,11 +74,13 @@ class TestResourceAccountant extends
PerQueryCPUMemAccountantFactory.PerQueryCPU
}
public TaskThread getTaskThread(String queryId, int taskId) {
- Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry>
workerEntry =
- _threadEntriesMap.entrySet().stream().filter(
- e -> e.getValue()._currentThreadTaskStatus.get().getTaskId() == 3
&& Objects.equals(
- e.getValue()._currentThreadTaskStatus.get().getQueryId(),
queryId)).collect(Collectors.toList()).get(0);
- return new TaskThread(workerEntry.getValue(), workerEntry.getKey());
+ for (Map.Entry<Thread, CPUMemThreadLevelAccountingObjects.ThreadEntry>
entry : _threadEntriesMap.entrySet()) {
+ CPUMemThreadLevelAccountingObjects.ThreadEntry threadEntry =
entry.getValue();
+ if (queryId.equals(threadEntry.getQueryId()) && taskId ==
threadEntry.getTaskId()) {
+ return new TaskThread(threadEntry, entry.getKey());
+ }
+ }
+ throw new IllegalStateException("Failed to find thread for queryId: " +
queryId + ", taskId: " + taskId);
}
public static class TaskThread {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index 0c95edbfdd6..a0c9e6da2fd 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -39,7 +39,6 @@ import org.apache.pinot.spi.utils.CommonConstants;
* This information is then used by the OpChain to create the Operators for a
query.
*/
public class OpChainExecutionContext {
-
private final MailboxService _mailboxService;
private final long _requestId;
private final long _activeDeadlineMs;
@@ -58,20 +57,10 @@ public class OpChainExecutionContext {
private ServerPlanRequestContext _leafStageContext;
private final boolean _sendStats;
- @Deprecated
- public OpChainExecutionContext(MailboxService mailboxService, long
requestId, long deadlineMs,
- Map<String, String> opChainMetadata, StageMetadata stageMetadata,
WorkerMetadata workerMetadata,
- @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable
ThreadExecutionContext parentContext,
- boolean sendStats) {
- this(mailboxService, requestId, deadlineMs, deadlineMs, opChainMetadata,
stageMetadata, workerMetadata,
- pipelineBreakerResult, parentContext, sendStats);
- }
-
- public OpChainExecutionContext(MailboxService mailboxService, long requestId,
- long activeDeadlineMs, long passiveDeadlineMs,
- Map<String, String> opChainMetadata, StageMetadata stageMetadata,
WorkerMetadata workerMetadata,
- @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable
ThreadExecutionContext parentContext,
- boolean sendStats) {
+ public OpChainExecutionContext(MailboxService mailboxService, long
requestId, long activeDeadlineMs,
+ long passiveDeadlineMs, Map<String, String> opChainMetadata,
StageMetadata stageMetadata,
+ WorkerMetadata workerMetadata, @Nullable PipelineBreakerResult
pipelineBreakerResult,
+ @Nullable ThreadExecutionContext parentContext, boolean sendStats) {
_mailboxService = mailboxService;
_requestId = requestId;
_activeDeadlineMs = activeDeadlineMs;
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index b551737326b..a3ab6471c93 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -41,6 +41,7 @@ import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.proto.PinotQueryWorkerGrpc;
import org.apache.pinot.common.proto.Worker;
import org.apache.pinot.common.utils.NamedThreadFactory;
+import org.apache.pinot.common.utils.config.QueryOptionsUtils;
import org.apache.pinot.core.transport.grpc.GrpcQueryServer;
import org.apache.pinot.query.MseWorkerThreadContext;
import org.apache.pinot.query.planner.serde.PlanNodeSerializer;
@@ -283,7 +284,7 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
/// (normally cancelling other already started workers and sending the error
through GRPC)
private CompletableFuture<Void> submitWorker(WorkerMetadata workerMetadata,
StagePlan stagePlan,
Map<String, String> reqMetadata) {
- String workloadName =
reqMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.WORKLOAD_NAME);
+ String workloadName = QueryOptionsUtils.getWorkloadName(reqMetadata);
//TODO: Verify if this matches with what OOM protection expects. This
method will not block for the query to
// finish, so it may be breaking some of the OOM protection assumptions.
Tracing.ThreadAccountantOps.setupRunner(QueryThreadContext.getCid(),
ThreadExecutionContext.TaskType.MSE,
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
index b38ef228932..40c676313ae 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerServiceTest.java
@@ -19,7 +19,7 @@
package org.apache.pinot.query.runtime.executor;
import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -78,10 +78,10 @@ public class OpChainSchedulerServiceTest {
MailboxService mailboxService = mock(MailboxService.class);
when(mailboxService.getHostname()).thenReturn("localhost");
when(mailboxService.getPort()).thenReturn(1234);
- WorkerMetadata workerMetadata = new WorkerMetadata(0, ImmutableMap.of(),
ImmutableMap.of());
+ WorkerMetadata workerMetadata = new WorkerMetadata(0, Map.of(), Map.of());
OpChainExecutionContext context =
- new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE,
ImmutableMap.of(),
- new StageMetadata(0, ImmutableList.of(workerMetadata),
ImmutableMap.of()), workerMetadata, null, null,
+ new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE,
Long.MAX_VALUE, Map.of(),
+ new StageMetadata(0, ImmutableList.of(workerMetadata), Map.of()),
workerMetadata, null, null,
true);
return new OpChain(context, operator);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
index 225520074fe..ee028d7ac71 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java
@@ -196,8 +196,8 @@ public class MailboxSendOperatorTest {
WorkerMetadata workerMetadata = new WorkerMetadata(0, Map.of(), Map.of());
StageMetadata stageMetadata = new StageMetadata(SENDER_STAGE_ID,
List.of(workerMetadata), Map.of());
OpChainExecutionContext context =
- new OpChainExecutionContext(_mailboxService, 123L, Long.MAX_VALUE,
Map.of(), stageMetadata, workerMetadata,
- null, null, true);
+ new OpChainExecutionContext(_mailboxService, 123L, Long.MAX_VALUE,
Long.MAX_VALUE, Map.of(), stageMetadata,
+ workerMetadata, null, null, true);
return new MailboxSendOperator(context, _input, statMap -> _exchange);
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
index b8e2be7d17d..99e55c1a8c1 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java
@@ -109,7 +109,7 @@ public class OperatorTestUtil {
public static OpChainExecutionContext getOpChainContext(MailboxService
mailboxService, long deadlineMs,
StageMetadata stageMetadata) {
- return new OpChainExecutionContext(mailboxService, 0, deadlineMs,
Map.of(), stageMetadata,
+ return new OpChainExecutionContext(mailboxService, 0, deadlineMs,
deadlineMs, Map.of(), stageMetadata,
stageMetadata.getWorkerMetadataList().get(0), null, null, true);
}
@@ -133,12 +133,10 @@ public class OperatorTestUtil {
StageMetadata stageMetadata =
new StageMetadata(0, List.of(workerMetadata),
Map.of(DispatchablePlanFragment.TABLE_NAME_KEY, "testTable"));
OpChainExecutionContext opChainExecutionContext =
- new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE,
opChainMetadata, stageMetadata,
- workerMetadata, null, null, true);
-
- StagePlan stagePlan = new StagePlan(null, stageMetadata);
-
- opChainExecutionContext.setLeafStageContext(new
ServerPlanRequestContext(stagePlan, null, null, null));
+ new OpChainExecutionContext(mailboxService, 123L, Long.MAX_VALUE,
Long.MAX_VALUE, opChainMetadata,
+ stageMetadata, workerMetadata, null, null, true);
+ opChainExecutionContext.setLeafStageContext(
+ new ServerPlanRequestContext(new StagePlan(null, stageMetadata), null,
null, null));
return opChainExecutionContext;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
index b744778e8a9..ea1a7bf3f5d 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadResourceUsageAccountant.java
@@ -49,11 +49,10 @@ public interface ThreadResourceUsageAccountant {
/**
* Set up the thread execution context for an anchor a.k.a runner thread.
* @param queryId query id string
- * @param taskId a unique task id
* @param taskType the type of the task - SSE or MSE
* @param workloadName the name of the workload, can be null
*/
- void setupRunner(String queryId, int taskId, ThreadExecutionContext.TaskType
taskType, String workloadName);
+ void setupRunner(@Nullable String queryId, ThreadExecutionContext.TaskType
taskType, String workloadName);
/**
* Set up the thread execution context for a worker thread.
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
index dbfc6ece966..bac83460bfd 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
@@ -19,6 +19,7 @@
package org.apache.pinot.spi.query;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import java.util.Map;
@@ -59,7 +60,7 @@ import org.slf4j.MDC;
public class QueryThreadContext {
private static final Logger LOGGER =
LoggerFactory.getLogger(QueryThreadContext.class);
private static final ThreadLocal<Instance> THREAD_LOCAL = new
ThreadLocal<>();
- public static volatile boolean _strictMode = false;
+ public static volatile boolean _strictMode;
private static final FakeInstance FAKE_INSTANCE = new FakeInstance();
static {
@@ -156,6 +157,7 @@ public class QueryThreadContext {
* @return an {@link AutoCloseable} object that should be used within a
try-with-resources block
* @throws IllegalStateException if the {@link QueryThreadContext} is
already initialized.
*/
+ @VisibleForTesting
public static CloseableContext open() {
return open("unknown");
}
@@ -166,12 +168,6 @@ public class QueryThreadContext {
return open;
}
- /// Just kept for backward compatibility.
- @Deprecated
- public static CloseableContext openFromRequestMetadata(Map<String, String>
requestMetadata) {
- return openFromRequestMetadata("unknown", requestMetadata);
- }
-
public static CloseableContext openFromRequestMetadata(String instanceId,
Map<String, String> requestMetadata) {
CloseableContext open = open(instanceId);
String cid =
requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID);
@@ -298,23 +294,6 @@ public class QueryThreadContext {
get().setStartTimeMs(startTimeMs);
}
- /**
- * Use {@link #getActiveDeadlineMs()} instead.
- */
- @Deprecated
- public static long getDeadlineMs() {
- return get().getActiveDeadlineMs();
- }
-
- /**
- * @deprecated Use {@link #setActiveDeadlineMs(long)} instead.
- * @throws IllegalStateException if deadline is already set or if the {@link
QueryThreadContext} is not initialized
- */
- @Deprecated
- public static void setDeadlineMs(long deadlineMs) {
- get().setActiveDeadlineMs(deadlineMs);
- }
-
/**
* Returns the active deadline time of the query in milliseconds since epoch.
*
@@ -516,20 +495,10 @@ public class QueryThreadContext {
_startTimeMs = startTimeMs;
}
- @Deprecated
- public long getDeadlineMs() {
- return getActiveDeadlineMs();
- }
-
public long getActiveDeadlineMs() {
return _activeDeadlineMs;
}
- @Deprecated
- public void setDeadlineMs(long deadlineMs) {
- setActiveDeadlineMs(deadlineMs);
- }
-
public void setActiveDeadlineMs(long activeDeadlineMs) {
Preconditions.checkState(getActiveDeadlineMs() == 0, "Deadline already
set to %s, cannot set again",
getActiveDeadlineMs());
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
index d2466edbe47..5501d180517 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/Tracing.java
@@ -213,7 +213,7 @@ public class Tracing {
}
@Override
- public void setupRunner(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType, String workloadName) {
+ public void setupRunner(@Nullable String queryId,
ThreadExecutionContext.TaskType taskType, String workloadName) {
}
@Override
@@ -264,8 +264,7 @@ public class Tracing {
public static void setupRunner(String queryId,
ThreadExecutionContext.TaskType taskType, String workloadName) {
// Set up the runner thread with the given query ID and workload name
- Tracing.getThreadAccountant()
- .setupRunner(queryId, CommonConstants.Accounting.ANCHOR_TASK_ID,
taskType, workloadName);
+ Tracing.getThreadAccountant().setupRunner(queryId, taskType,
workloadName);
}
/**
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
index 5005c543677..7266a4a7a9b 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/executor/ThrottleOnCriticalHeapUsageExecutorTest.java
@@ -56,7 +56,7 @@ public class ThrottleOnCriticalHeapUsageExecutorTest {
}
@Override
- public void setupRunner(String queryId, int taskId,
ThreadExecutionContext.TaskType taskType,
+ public void setupRunner(@Nullable String queryId,
ThreadExecutionContext.TaskType taskType,
String workloadName) {
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]