Jackie-Jiang commented on code in PR #16728:
URL: https://github.com/apache/pinot/pull/16728#discussion_r2350073504
##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java:
##########
@@ -18,680 +18,257 @@
*/
package org.apache.pinot.spi.query;
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
import javax.annotation.Nullable;
-import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.accounting.ThreadAccounting.DefaultThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
import org.apache.pinot.spi.executor.DecoratorExecutorService;
import org.apache.pinot.spi.trace.LoggerConstants;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-/**
- * The {@code QueryThreadContext} class is a thread-local context for storing
common query-related information
- * associated to the current thread.
- *
- * <p>It is used to pass information between different layers of the query
execution stack without changing the
- * method signatures. This is also used to populate the {@link MDC} context
for logging.
- *
- * Use {@link #open(String)} to initialize the empty context. As any other
{@link AutoCloseable} object, it should be
- * used within a try-with-resources block to ensure the context is properly
closed and removed from the thread-local
- * storage.
- *
- * Sometimes it is necessary to copy the state of the {@link
QueryThreadContext} from one thread to another. In this
- * case, use the {@link #createMemento()} method to capture the state of the
{@link QueryThreadContext} in the current
- * thread and then use the {@link #open(Memento)} method to initialize the
context in the target thread with the state
- * captured in the {@link Memento} object. The API may be a bit cumbersome,
but it ensures that the state is always
- * copied between threads in a safe way and makes it impossible to use the
{@link QueryThreadContext} from another
- * thread by mistake.
- *
- * It is guaranteed that all server and broker threads running SSE and MSE
will have this context initialized as soon
- * as the request is received. Ingestion threads that use the query execution
stack will also have this context
- * initialized.
- */
-public class QueryThreadContext {
- private static final Logger LOGGER =
LoggerFactory.getLogger(QueryThreadContext.class);
- private static final ThreadLocal<Instance> THREAD_LOCAL = new
ThreadLocal<>();
- public static volatile boolean _strictMode;
- private static final FakeInstance FAKE_INSTANCE = new FakeInstance();
-
- static {
- // This is a hack to know if assertions are enabled or not
- boolean assertEnabled = false;
- //CHECKSTYLE:OFF
- assert assertEnabled = true;
- //CHECKSTYLE:ON
- _strictMode = assertEnabled;
- }
- /**
- * Private constructor to prevent instantiation.
- *
- * Use {@link #open(String)} to initialize the context instead.
- */
- private QueryThreadContext() {
- }
+/// The [QueryThreadContext] class is a thread-local context for storing common query-related information associated with
+/// the current thread.
+///
+/// It is used to pass information between different layers of the query
execution stack without changing the method
+/// signatures. This is also used to populate the [org.slf4j.MDC] context for
logging.
+///
+/// Use [#open] to initialize the empty context. As with any other [AutoCloseable] object, it should be used within a
+/// try-with-resources block to ensure the context is properly closed and removed from the thread-local storage and
+/// resource usage accountant.
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class QueryThreadContext implements AutoCloseable {
+ // Check if query should be terminated, and sample resource usage per 8192
records
+ public static final int CHECK_TERMINATION_AND_SAMPLE_USAGE_RECORD_MASK =
0x1FFF;
- /**
- * Sets the strict mode of the {@link QueryThreadContext} from the given
configuration.
- */
- public static void onStartup(PinotConfiguration conf) {
- String mode =
conf.getProperty(CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE);
- if ("strict".equalsIgnoreCase(mode)) {
- _strictMode = true;
- return;
- }
- if (mode != null && !mode.isEmpty()) {
- throw new IllegalArgumentException("Invalid value '" + mode + "' for "
- + CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE + ". Expected
'strict' or empty");
+ private static final ThreadLocal<QueryThreadContext> THREAD_LOCAL = new
ThreadLocal<>();
+
+ private final QueryExecutionContext _executionContext;
+ private final MseWorkerInfo _mseWorkerInfo;
+ private final ThreadResourceUsageAccountant _resourceUsageAccountant;
+
+ private QueryThreadContext(QueryExecutionContext executionContext, @Nullable
MseWorkerInfo mseWorkerInfo,
+ ThreadResourceUsageAccountant resourceUsageAccountant) {
+ _executionContext = executionContext;
+ _mseWorkerInfo = mseWorkerInfo;
+ _resourceUsageAccountant = resourceUsageAccountant;
+
LoggerConstants.REQUEST_ID_KEY.registerInMdc(Long.toString(executionContext.getRequestId()));
+
LoggerConstants.CORRELATION_ID_KEY.registerInMdc(executionContext.getCid());
+ if (mseWorkerInfo != null) {
+
LoggerConstants.STAGE_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getStageId()));
+
LoggerConstants.WORKER_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getWorkerId()));
}
}
- /**
- * Returns {@code true} if the {@link QueryThreadContext} is in strict mode.
- *
- * In strict mode, if the {@link QueryThreadContext} is not initialized, an
{@link IllegalStateException} will be
- * thrown when setter and getter methods are used.
- * In non-strict mode, a warning will be logged and the fake instance will
be returned.
- *
- * @see #onStartup(PinotConfiguration)
- */
- public static boolean isStrictMode() {
- return _strictMode;
+ public QueryExecutionContext getExecutionContext() {
+ return _executionContext;
}
- private static Instance get() {
- Instance instance = THREAD_LOCAL.get();
- if (instance == null) {
- String errorMessage = "QueryThreadContext is not initialized";
- if (_strictMode) {
- LOGGER.error(errorMessage);
- throw new IllegalStateException(errorMessage);
- } else {
- LOGGER.debug(errorMessage);
- // in non-strict mode, return the fake instance
- return FAKE_INSTANCE;
- }
- }
- return instance;
+ @Nullable
+ public MseWorkerInfo getMseWorkerInfo() {
+ return _mseWorkerInfo;
}
- /**
- * Returns {@code true} if the {@link QueryThreadContext} is initialized in
the current thread.
- *
- * Initializing the context means that the {@link #open(String)} method was
called and the returned object is not
- * closed yet.
- */
- public static boolean isInitialized() {
- return THREAD_LOCAL.get() != null;
+ @JsonIgnore
+ public ThreadResourceUsageAccountant getResourceUsageAccountant() {
+ return _resourceUsageAccountant;
}
- /**
- * Captures the state of the {@link QueryThreadContext} in the current
thread.
- *
- * This method is used to capture the state of the {@link
QueryThreadContext} in the current thread so that it can be
- * restored later in another thread.
- *
- * @return a {@link Memento} object that captures the state of the {@link
QueryThreadContext}
- * @throws IllegalStateException if the {@link QueryThreadContext} is not
initialized
- * @see #open(Memento)
- */
- public static Memento createMemento() {
- return new Memento(get());
+ private void sampleUsageInternal() {
+ _resourceUsageAccountant.sampleUsage();
}
- /**
- * Initializes the {@link QueryThreadContext} with default values.
- *
- * This method will throw an {@link IllegalStateException} if the {@link
QueryThreadContext} is already initialized.
- * That indicates an error in the code. Older context must be closed before
opening a new one.
- *
- * @return an {@link AutoCloseable} object that should be used within a
try-with-resources block
- * @throws IllegalStateException if the {@link QueryThreadContext} is
already initialized.
- */
- @VisibleForTesting
- public static CloseableContext open() {
- return open("unknown");
+ private void checkTerminationInternal(Supplier<String> scopeSupplier) {
+ checkTerminationInternal(scopeSupplier,
_executionContext.getActiveDeadlineMs());
}
- public static CloseableContext open(String instanceId) {
- CloseableContext open = open((Memento) null);
- get()._instanceId = instanceId;
- return open;
+ private void checkTerminationInternal(Supplier<String> scopeSupplier, long
deadlineMs) {
+ QueryException terminateException =
_executionContext.getTerminateException();
+ if (terminateException != null) {
+ throw terminateException;
+ }
+ if (Thread.interrupted()) {
+ throw new EarlyTerminationException("Interrupted on: " +
scopeSupplier.get());
+ }
+ if (System.currentTimeMillis() >= deadlineMs) {
+ throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on: " +
scopeSupplier.get());
+ }
}
- public static CloseableContext openFromRequestMetadata(String instanceId,
Map<String, String> requestMetadata) {
- CloseableContext open = open(instanceId);
- String cid =
requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID);
- long requestId =
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
- if (cid == null) {
- cid = Long.toString(requestId);
+ /// Closes the [QueryThreadContext]: clears the resource usage accountant, removes the context from the thread-local
+ /// storage, and unregisters its keys from the MDC context.
+ @Override
+ public void close() {
+ _resourceUsageAccountant.clear();
+ THREAD_LOCAL.remove();
+ LoggerConstants.REQUEST_ID_KEY.unregisterFromMdc();
+ LoggerConstants.CORRELATION_ID_KEY.unregisterFromMdc();
+ if (_mseWorkerInfo != null) {
+ LoggerConstants.STAGE_ID_KEY.unregisterFromMdc();
+ LoggerConstants.WORKER_ID_KEY.unregisterFromMdc();
}
- QueryThreadContext.setIds(requestId, cid);
- long timeoutMs =
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
- long extraPassiveTimeoutMs = Long.parseLong(requestMetadata.getOrDefault(
-
CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, "0"));
- long startTimeMs = System.currentTimeMillis();
- QueryThreadContext.setStartTimeMs(startTimeMs);
- QueryThreadContext.setActiveDeadlineMs(startTimeMs + timeoutMs);
- QueryThreadContext.setPassiveDeadlineMs(startTimeMs + timeoutMs +
extraPassiveTimeoutMs);
-
- return open;
}
- /**
- * Initializes the {@link QueryThreadContext} with the state captured in the
given {@link Memento} object, if any.
- *
- * This method will throw an {@link IllegalStateException} if the {@link
QueryThreadContext} is already initialized.
- * That indicates an error in the code. Older context must be closed before
opening a new one.
- *
- * Values that were set in the {@link Memento} object will be set in the
{@link QueryThreadContext} and therefore
- * they couldn't be set again in the current thread (at least until the
returned {@link AutoCloseable} is closed).
- *
- * @param memento the {@link Memento} object to capture the state from
- * (if {@code null}, the context will be initialized with
default values)
- * @return an {@link AutoCloseable} object that should be used within a
try-with-resources block
- * @throws IllegalStateException if the {@link QueryThreadContext} is
already initialized.
- */
- public static CloseableContext open(@Nullable Memento memento) {
- if (THREAD_LOCAL.get() != null) {
- String errorMessage = "QueryThreadContext is already initialized";
- if (_strictMode) {
- LOGGER.error(errorMessage);
- throw new IllegalStateException("QueryThreadContext is already
initialized");
- } else {
- LOGGER.debug(errorMessage);
- return FAKE_INSTANCE;
- }
- }
+ /// Opens a new [QueryThreadContext] for the current thread and adds it to the thread-local storage.
+ public static QueryThreadContext open(QueryExecutionContext executionContext,
+ ThreadResourceUsageAccountant accountant) {
+ return open(executionContext, null, accountant);
+ }
- Instance context = new Instance();
- if (memento != null) {
- context.setStartTimeMs(memento._startTimeMs);
- context.setActiveDeadlineMs(memento._activeDeadlineMs);
- context.setPassiveDeadlineMs(memento._passiveDeadlineMs);
- context.setBrokerId(memento._brokerId);
- context.setRequestId(memento._requestId);
- context.setCid(memento._cid);
- context.setSql(memento._sql);
- context.setQueryEngine(memento._queryEngine);
- context.setInstanceId(memento._instanceId);
- }
+ /// Opens a new [QueryThreadContext] for the current thread and adds it to the thread-local storage.
+ public static QueryThreadContext open(QueryExecutionContext
executionContext, @Nullable MseWorkerInfo mseWorkerInfo,
+ ThreadResourceUsageAccountant accountant) {
+ QueryThreadContext threadContext = new
QueryThreadContext(executionContext, mseWorkerInfo, accountant);
+ THREAD_LOCAL.set(threadContext);
+ accountant.setupTask(threadContext);
+ return threadContext;
+ }
- THREAD_LOCAL.set(context);
+ @VisibleForTesting
+ public static QueryThreadContext openForSseTest() {
+ return open(QueryExecutionContext.forSseTest(),
DefaultThreadResourceUsageAccountant.INSTANCE);
+ }
- return context;
+ @VisibleForTesting
+ public static QueryThreadContext openForMseTest() {
+ return open(QueryExecutionContext.forMseTest(), new MseWorkerInfo(0, 0),
+ DefaultThreadResourceUsageAccountant.INSTANCE);
}
- /**
- * Returns a new {@link ExecutorService} whose tasks will be executed with
the {@link QueryThreadContext} initialized
- * with the state of the thread submitting the tasks.
- *
- * @param executorService the {@link ExecutorService} to decorate
- */
- public static ExecutorService contextAwareExecutorService(ExecutorService
executorService) {
- return contextAwareExecutorService(executorService,
QueryThreadContext::createMemento);
+ /// Returns the [QueryThreadContext] for the current thread.
+ public static QueryThreadContext get() {
+ QueryThreadContext threadContext = THREAD_LOCAL.get();
+ assert threadContext != null;
+ return threadContext;
Review Comment:
I want to make it strict (fail on access when the context is not set up) to avoid
bugs that are very hard to detect. `QueryThreadContext` holds critical information,
and we rely on it to cancel the query. Integration tests should be able to catch
the problem when the context is not set up properly.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]