gortiz commented on code in PR #16728:
URL: https://github.com/apache/pinot/pull/16728#discussion_r2324595199


##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java:
##########
@@ -18,680 +18,257 @@
  */
 package org.apache.pinot.spi.query;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
-import org.apache.pinot.spi.env.PinotConfiguration;
+import 
org.apache.pinot.spi.accounting.ThreadAccounting.DefaultThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.executor.DecoratorExecutorService;
 import org.apache.pinot.spi.trace.LoggerConstants;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
-/**
- * The {@code QueryThreadContext} class is a thread-local context for storing 
common query-related information
- * associated to the current thread.
- *
- * <p>It is used to pass information between different layers of the query 
execution stack without changing the
- * method signatures. This is also used to populate the {@link MDC} context 
for logging.
- *
- * Use {@link #open(String)} to initialize the empty context. As any other 
{@link AutoCloseable} object, it should be
- * used within a try-with-resources block to ensure the context is properly 
closed and removed from the thread-local
- * storage.
- *
- * Sometimes it is necessary to copy the state of the {@link 
QueryThreadContext} from one thread to another. In this
- * case, use the {@link #createMemento()} method to capture the state of the 
{@link QueryThreadContext} in the current
- * thread and then use the {@link #open(Memento)} method to initialize the 
context in the target thread with the state
- * captured in the {@link Memento} object. The API may be a bit cumbersome, 
but it ensures that the state is always
- * copied between threads in a safe way and makes it impossible to use the 
{@link QueryThreadContext} from another
- * thread by mistake.
- *
- * It is guaranteed that all server and broker threads running SSE and MSE 
will have this context initialized as soon
- * as the request is received. Ingestion threads that use the query execution 
stack will also have this context
- * initialized.
- */
-public class QueryThreadContext {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryThreadContext.class);
-  private static final ThreadLocal<Instance> THREAD_LOCAL = new 
ThreadLocal<>();
-  public static volatile boolean _strictMode;
-  private static final FakeInstance FAKE_INSTANCE = new FakeInstance();
-
-  static {
-    // This is a hack to know if assertions are enabled or not
-    boolean assertEnabled = false;
-    //CHECKSTYLE:OFF
-    assert assertEnabled = true;
-    //CHECKSTYLE:ON
-    _strictMode = assertEnabled;
-  }
 
-  /**
-   * Private constructor to prevent instantiation.
-   *
-   * Use {@link #open(String)} to initialize the context instead.
-   */
-  private QueryThreadContext() {
-  }
+/// The [QueryThreadContext] class is a thread-local context for storing 
common query-related information associated to
+/// the current thread.
+///
+/// It is used to pass information between different layers of the query 
execution stack without changing the method
+/// signatures. This is also used to populate the [org.slf4j.MDC] context for 
logging.
+///
+/// Use [#open] to initialize the empty context. As any other [AutoCloseable] 
object, it should be used within a
+/// try-with-resources block to ensure the context is properly closed and 
removed from the thread-local storage and
+/// resource usage accountant.
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class QueryThreadContext implements AutoCloseable {
+  // Check if query should be terminated, and sample resource usage per 8192 
records
+  public static final int CHECK_TERMINATION_AND_SAMPLE_USAGE_RECORD_MASK = 
0x1FFF;
 
-  /**
-   * Sets the strict mode of the {@link QueryThreadContext} from the given 
configuration.
-   */
-  public static void onStartup(PinotConfiguration conf) {
-    String mode = 
conf.getProperty(CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE);
-    if ("strict".equalsIgnoreCase(mode)) {
-      _strictMode = true;
-      return;
-    }
-    if (mode != null && !mode.isEmpty()) {
-      throw new IllegalArgumentException("Invalid value '" + mode + "' for "
-          + CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE + ". Expected 
'strict' or empty");
+  private static final ThreadLocal<QueryThreadContext> THREAD_LOCAL = new 
ThreadLocal<>();
+
+  private final QueryExecutionContext _executionContext;
+  private final MseWorkerInfo _mseWorkerInfo;

Review Comment:
   This can be nullable



##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java:
##########
@@ -18,680 +18,257 @@
  */
 package org.apache.pinot.spi.query;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
-import org.apache.pinot.spi.env.PinotConfiguration;
+import 
org.apache.pinot.spi.accounting.ThreadAccounting.DefaultThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.executor.DecoratorExecutorService;
 import org.apache.pinot.spi.trace.LoggerConstants;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
-/**
- * The {@code QueryThreadContext} class is a thread-local context for storing 
common query-related information
- * associated to the current thread.
- *
- * <p>It is used to pass information between different layers of the query 
execution stack without changing the
- * method signatures. This is also used to populate the {@link MDC} context 
for logging.
- *
- * Use {@link #open(String)} to initialize the empty context. As any other 
{@link AutoCloseable} object, it should be
- * used within a try-with-resources block to ensure the context is properly 
closed and removed from the thread-local
- * storage.
- *
- * Sometimes it is necessary to copy the state of the {@link 
QueryThreadContext} from one thread to another. In this
- * case, use the {@link #createMemento()} method to capture the state of the 
{@link QueryThreadContext} in the current
- * thread and then use the {@link #open(Memento)} method to initialize the 
context in the target thread with the state
- * captured in the {@link Memento} object. The API may be a bit cumbersome, 
but it ensures that the state is always
- * copied between threads in a safe way and makes it impossible to use the 
{@link QueryThreadContext} from another
- * thread by mistake.
- *
- * It is guaranteed that all server and broker threads running SSE and MSE 
will have this context initialized as soon
- * as the request is received. Ingestion threads that use the query execution 
stack will also have this context
- * initialized.
- */
-public class QueryThreadContext {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryThreadContext.class);
-  private static final ThreadLocal<Instance> THREAD_LOCAL = new 
ThreadLocal<>();
-  public static volatile boolean _strictMode;
-  private static final FakeInstance FAKE_INSTANCE = new FakeInstance();
-
-  static {
-    // This is a hack to know if assertions are enabled or not
-    boolean assertEnabled = false;
-    //CHECKSTYLE:OFF
-    assert assertEnabled = true;
-    //CHECKSTYLE:ON
-    _strictMode = assertEnabled;
-  }
 
-  /**
-   * Private constructor to prevent instantiation.
-   *
-   * Use {@link #open(String)} to initialize the context instead.
-   */
-  private QueryThreadContext() {
-  }
+/// The [QueryThreadContext] class is a thread-local context for storing 
common query-related information associated to
+/// the current thread.
+///
+/// It is used to pass information between different layers of the query 
execution stack without changing the method
+/// signatures. This is also used to populate the [org.slf4j.MDC] context for 
logging.
+///
+/// Use [#open] to initialize the empty context. As any other [AutoCloseable] 
object, it should be used within a
+/// try-with-resources block to ensure the context is properly closed and 
removed from the thread-local storage and
+/// resource usage accountant.
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class QueryThreadContext implements AutoCloseable {
+  // Check if query should be terminated, and sample resource usage per 8192 
records
+  public static final int CHECK_TERMINATION_AND_SAMPLE_USAGE_RECORD_MASK = 
0x1FFF;
 
-  /**
-   * Sets the strict mode of the {@link QueryThreadContext} from the given 
configuration.
-   */
-  public static void onStartup(PinotConfiguration conf) {
-    String mode = 
conf.getProperty(CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE);
-    if ("strict".equalsIgnoreCase(mode)) {
-      _strictMode = true;
-      return;
-    }
-    if (mode != null && !mode.isEmpty()) {
-      throw new IllegalArgumentException("Invalid value '" + mode + "' for "
-          + CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE + ". Expected 
'strict' or empty");
+  private static final ThreadLocal<QueryThreadContext> THREAD_LOCAL = new 
ThreadLocal<>();
+
+  private final QueryExecutionContext _executionContext;
+  private final MseWorkerInfo _mseWorkerInfo;
+  private final ThreadResourceUsageAccountant _resourceUsageAccountant;
+
+  private QueryThreadContext(QueryExecutionContext executionContext, @Nullable 
MseWorkerInfo mseWorkerInfo,
+      ThreadResourceUsageAccountant resourceUsageAccountant) {
+    _executionContext = executionContext;
+    _mseWorkerInfo = mseWorkerInfo;
+    _resourceUsageAccountant = resourceUsageAccountant;
+    
LoggerConstants.REQUEST_ID_KEY.registerInMdc(Long.toString(executionContext.getRequestId()));
+    
LoggerConstants.CORRELATION_ID_KEY.registerInMdc(executionContext.getCid());
+    if (mseWorkerInfo != null) {
+      
LoggerConstants.STAGE_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getStageId()));
+      
LoggerConstants.WORKER_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getWorkerId()));
     }
   }
 
-  /**
-   * Returns {@code true} if the {@link QueryThreadContext} is in strict mode.
-   *
-   * In strict mode, if the {@link QueryThreadContext} is not initialized, an 
{@link IllegalStateException} will be
-   * thrown when setter and getter methods are used.
-   * In non-strict mode, a warning will be logged and the fake instance will 
be returned.
-   *
-   * @see #onStartup(PinotConfiguration)
-   */
-  public static boolean isStrictMode() {
-    return _strictMode;
+  public QueryExecutionContext getExecutionContext() {
+    return _executionContext;
   }
 
-  private static Instance get() {
-    Instance instance = THREAD_LOCAL.get();
-    if (instance == null) {
-      String errorMessage = "QueryThreadContext is not initialized";
-      if (_strictMode) {
-        LOGGER.error(errorMessage);
-        throw new IllegalStateException(errorMessage);
-      } else {
-        LOGGER.debug(errorMessage);
-        // in non-strict mode, return the fake instance
-        return FAKE_INSTANCE;
-      }
-    }
-    return instance;
+  @Nullable
+  public MseWorkerInfo getMseWorkerInfo() {
+    return _mseWorkerInfo;
   }
 
-  /**
-   * Returns {@code true} if the {@link QueryThreadContext} is initialized in 
the current thread.
-   *
-   * Initializing the context means that the {@link #open(String)} method was 
called and the returned object is not
-   * closed yet.
-   */
-  public static boolean isInitialized() {
-    return THREAD_LOCAL.get() != null;
+  @JsonIgnore
+  public ThreadResourceUsageAccountant getResourceUsageAccountant() {
+    return _resourceUsageAccountant;
   }
 
-  /**
-   * Captures the state of the {@link QueryThreadContext} in the current 
thread.
-   *
-   * This method is used to capture the state of the {@link 
QueryThreadContext} in the current thread so that it can be
-   * restored later in another thread.
-   *
-   * @return a {@link Memento} object that captures the state of the {@link 
QueryThreadContext}
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   * @see #open(Memento)
-   */
-  public static Memento createMemento() {
-    return new Memento(get());
+  private void sampleUsageInternal() {
+    _resourceUsageAccountant.sampleUsage();
   }
 
-  /**
-   * Initializes the {@link QueryThreadContext} with default values.
-   *
-   * This method will throw an {@link IllegalStateException} if the {@link 
QueryThreadContext} is already initialized.
-   * That indicates an error in the code. Older context must be closed before 
opening a new one.
-   *
-   * @return an {@link AutoCloseable} object that should be used within a 
try-with-resources block
-   * @throws IllegalStateException if the {@link QueryThreadContext} is 
already initialized.
-   */
-  @VisibleForTesting
-  public static CloseableContext open() {
-    return open("unknown");
+  private void checkTerminationInternal(Supplier<String> scopeSupplier) {
+    checkTerminationInternal(scopeSupplier, 
_executionContext.getActiveDeadlineMs());
   }
 
-  public static CloseableContext open(String instanceId) {
-    CloseableContext open = open((Memento) null);
-    get()._instanceId = instanceId;
-    return open;
+  private void checkTerminationInternal(Supplier<String> scopeSupplier, long 
deadlineMs) {
+    QueryException terminateException = 
_executionContext.getTerminateException();
+    if (terminateException != null) {
+      throw terminateException;
+    }
+    if (Thread.interrupted()) {
+      throw new EarlyTerminationException("Interrupted on: " + 
scopeSupplier.get());
+    }
+    if (System.currentTimeMillis() >= deadlineMs) {
+      throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on: " + 
scopeSupplier.get());
+    }
   }
 
-  public static CloseableContext openFromRequestMetadata(String instanceId, 
Map<String, String> requestMetadata) {
-    CloseableContext open = open(instanceId);
-    String cid = 
requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID);
-    long requestId = 
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
-    if (cid == null) {
-      cid = Long.toString(requestId);
+  /// Closes the {@link QueryThreadContext} and removes it from the 
thread-local storage and MDC context.
+  @Override
+  public void close() {
+    _resourceUsageAccountant.clear();
+    THREAD_LOCAL.remove();
+    LoggerConstants.REQUEST_ID_KEY.unregisterFromMdc();
+    LoggerConstants.CORRELATION_ID_KEY.unregisterFromMdc();
+    if (_mseWorkerInfo != null) {
+      LoggerConstants.STAGE_ID_KEY.unregisterFromMdc();
+      LoggerConstants.WORKER_ID_KEY.unregisterFromMdc();
     }
-    QueryThreadContext.setIds(requestId, cid);
-    long timeoutMs = 
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
-    long extraPassiveTimeoutMs = Long.parseLong(requestMetadata.getOrDefault(
-        
CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, "0"));
-    long startTimeMs = System.currentTimeMillis();
-    QueryThreadContext.setStartTimeMs(startTimeMs);
-    QueryThreadContext.setActiveDeadlineMs(startTimeMs + timeoutMs);
-    QueryThreadContext.setPassiveDeadlineMs(startTimeMs + timeoutMs + 
extraPassiveTimeoutMs);
-
-    return open;
   }
 
-  /**
-   * Initializes the {@link QueryThreadContext} with the state captured in the 
given {@link Memento} object, if any.
-   *
-   * This method will throw an {@link IllegalStateException} if the {@link 
QueryThreadContext} is already initialized.
-   * That indicates an error in the code. Older context must be closed before 
opening a new one.
-   *
-   * Values that were set in the {@link Memento} object will be set in the 
{@link QueryThreadContext} and therefore
-   * they couldn't be set again in the current thread (at least until the 
returned {@link AutoCloseable} is closed).
-   *
-   * @param memento the {@link Memento} object to capture the state from
-   *                (if {@code null}, the context will be initialized with 
default values)
-   * @return an {@link AutoCloseable} object that should be used within a 
try-with-resources block
-   * @throws IllegalStateException if the {@link QueryThreadContext} is 
already initialized.
-   */
-  public static CloseableContext open(@Nullable Memento memento) {
-    if (THREAD_LOCAL.get() != null) {
-      String errorMessage = "QueryThreadContext is already initialized";
-      if (_strictMode) {
-        LOGGER.error(errorMessage);
-        throw new IllegalStateException("QueryThreadContext is already 
initialized");
-      } else {
-        LOGGER.debug(errorMessage);
-        return FAKE_INSTANCE;
-      }
-    }
+  /// Opens a new [QueryThreadContext] for the current thread and add it to 
the thread-local storage.
+  public static QueryThreadContext open(QueryExecutionContext executionContext,
+      ThreadResourceUsageAccountant accountant) {
+    return open(executionContext, null, accountant);
+  }
 
-    Instance context = new Instance();
-    if (memento != null) {
-      context.setStartTimeMs(memento._startTimeMs);
-      context.setActiveDeadlineMs(memento._activeDeadlineMs);
-      context.setPassiveDeadlineMs(memento._passiveDeadlineMs);
-      context.setBrokerId(memento._brokerId);
-      context.setRequestId(memento._requestId);
-      context.setCid(memento._cid);
-      context.setSql(memento._sql);
-      context.setQueryEngine(memento._queryEngine);
-      context.setInstanceId(memento._instanceId);
-    }
+  /// Opens a new [QueryThreadContext] for the current thread and add it to 
the thread-local storage.
+  public static QueryThreadContext open(QueryExecutionContext 
executionContext, @Nullable MseWorkerInfo mseWorkerInfo,
+      ThreadResourceUsageAccountant accountant) {
+    QueryThreadContext threadContext = new 
QueryThreadContext(executionContext, mseWorkerInfo, accountant);
+    THREAD_LOCAL.set(threadContext);
+    accountant.setupTask(threadContext);
+    return threadContext;
+  }
 
-    THREAD_LOCAL.set(context);
+  @VisibleForTesting
+  public static QueryThreadContext openForSseTest() {
+    return open(QueryExecutionContext.forSseTest(), 
DefaultThreadResourceUsageAccountant.INSTANCE);
+  }
 
-    return context;
+  @VisibleForTesting
+  public static QueryThreadContext openForMseTest() {
+    return open(QueryExecutionContext.forMseTest(), new MseWorkerInfo(0, 0),
+        DefaultThreadResourceUsageAccountant.INSTANCE);
   }
 
-  /**
-   * Returns a new {@link ExecutorService} whose tasks will be executed with 
the {@link QueryThreadContext} initialized
-   * with the state of the thread submitting the tasks.
-   *
-   * @param executorService the {@link ExecutorService} to decorate
-   */
-  public static ExecutorService contextAwareExecutorService(ExecutorService 
executorService) {
-    return contextAwareExecutorService(executorService, 
QueryThreadContext::createMemento);
+  /// Returns the [QueryThreadContext] for the current thread.
+  public static QueryThreadContext get() {
+    QueryThreadContext threadContext = THREAD_LOCAL.get();
+    assert threadContext != null;
+    return threadContext;
   }
 
-  /**
-   * Returns a new {@link ExecutorService} whose tasks will be executed with 
the {@link QueryThreadContext} initialized
-   * with the state captured in the given {@link Memento} object.
-   *
-   * @param executorService the {@link ExecutorService} to decorate
-   * @param mementoSupplier a supplier that provides the {@link Memento} 
object to capture the state from
-   */
-  public static ExecutorService contextAwareExecutorService(
-      ExecutorService executorService,
-      Supplier<Memento> mementoSupplier) {
-    return new DecoratorExecutorService(executorService) {
+  /// Returns the [QueryThreadContext] for the current thread, or `null` if 
not available.
+  @Nullable
+  public static QueryThreadContext getIfAvailable() {
+    return THREAD_LOCAL.get();
+  }
+
+  /// Returns a new [ExecutorService] whose tasks will be executed with the 
[QueryThreadContext] initialized with the
+  /// state of the thread submitting the tasks.
+  public static ExecutorService contextAwareExecutorService(ExecutorService 
executorService) {
+    return new DecoratorExecutorService(executorService, future -> 
get().getExecutionContext().addTask(future)) {

Review Comment:
   I have no data, but I'm a bit worried about the contention this may 
generate, given in MSE we call this method several times in a row when we 
receive a query with multiple stages on the same server.
   
   The good news is that this is not difficult to verify. We can run benchmarks 
with JFR enabled and study the contention



##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java:
##########
@@ -18,680 +18,257 @@
  */
 package org.apache.pinot.spi.query;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
-import org.apache.pinot.spi.env.PinotConfiguration;
+import 
org.apache.pinot.spi.accounting.ThreadAccounting.DefaultThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.executor.DecoratorExecutorService;
 import org.apache.pinot.spi.trace.LoggerConstants;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
-/**
- * The {@code QueryThreadContext} class is a thread-local context for storing 
common query-related information
- * associated to the current thread.
- *
- * <p>It is used to pass information between different layers of the query 
execution stack without changing the
- * method signatures. This is also used to populate the {@link MDC} context 
for logging.
- *
- * Use {@link #open(String)} to initialize the empty context. As any other 
{@link AutoCloseable} object, it should be
- * used within a try-with-resources block to ensure the context is properly 
closed and removed from the thread-local
- * storage.
- *
- * Sometimes it is necessary to copy the state of the {@link 
QueryThreadContext} from one thread to another. In this
- * case, use the {@link #createMemento()} method to capture the state of the 
{@link QueryThreadContext} in the current
- * thread and then use the {@link #open(Memento)} method to initialize the 
context in the target thread with the state
- * captured in the {@link Memento} object. The API may be a bit cumbersome, 
but it ensures that the state is always
- * copied between threads in a safe way and makes it impossible to use the 
{@link QueryThreadContext} from another
- * thread by mistake.
- *
- * It is guaranteed that all server and broker threads running SSE and MSE 
will have this context initialized as soon
- * as the request is received. Ingestion threads that use the query execution 
stack will also have this context
- * initialized.
- */
-public class QueryThreadContext {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryThreadContext.class);
-  private static final ThreadLocal<Instance> THREAD_LOCAL = new 
ThreadLocal<>();
-  public static volatile boolean _strictMode;
-  private static final FakeInstance FAKE_INSTANCE = new FakeInstance();
-
-  static {
-    // This is a hack to know if assertions are enabled or not
-    boolean assertEnabled = false;
-    //CHECKSTYLE:OFF
-    assert assertEnabled = true;
-    //CHECKSTYLE:ON
-    _strictMode = assertEnabled;
-  }
 
-  /**
-   * Private constructor to prevent instantiation.
-   *
-   * Use {@link #open(String)} to initialize the context instead.
-   */
-  private QueryThreadContext() {
-  }
+/// The [QueryThreadContext] class is a thread-local context for storing 
common query-related information associated to
+/// the current thread.
+///
+/// It is used to pass information between different layers of the query 
execution stack without changing the method
+/// signatures. This is also used to populate the [org.slf4j.MDC] context for 
logging.
+///
+/// Use [#open] to initialize the empty context. As any other [AutoCloseable] 
object, it should be used within a
+/// try-with-resources block to ensure the context is properly closed and 
removed from the thread-local storage and
+/// resource usage accountant.
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class QueryThreadContext implements AutoCloseable {
+  // Check if query should be terminated, and sample resource usage per 8192 
records
+  public static final int CHECK_TERMINATION_AND_SAMPLE_USAGE_RECORD_MASK = 
0x1FFF;
 
-  /**
-   * Sets the strict mode of the {@link QueryThreadContext} from the given 
configuration.
-   */
-  public static void onStartup(PinotConfiguration conf) {
-    String mode = 
conf.getProperty(CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE);
-    if ("strict".equalsIgnoreCase(mode)) {
-      _strictMode = true;
-      return;
-    }
-    if (mode != null && !mode.isEmpty()) {
-      throw new IllegalArgumentException("Invalid value '" + mode + "' for "
-          + CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE + ". Expected 
'strict' or empty");
+  private static final ThreadLocal<QueryThreadContext> THREAD_LOCAL = new 
ThreadLocal<>();
+
+  private final QueryExecutionContext _executionContext;
+  private final MseWorkerInfo _mseWorkerInfo;
+  private final ThreadResourceUsageAccountant _resourceUsageAccountant;
+
+  private QueryThreadContext(QueryExecutionContext executionContext, @Nullable 
MseWorkerInfo mseWorkerInfo,
+      ThreadResourceUsageAccountant resourceUsageAccountant) {
+    _executionContext = executionContext;
+    _mseWorkerInfo = mseWorkerInfo;
+    _resourceUsageAccountant = resourceUsageAccountant;
+    
LoggerConstants.REQUEST_ID_KEY.registerInMdc(Long.toString(executionContext.getRequestId()));
+    
LoggerConstants.CORRELATION_ID_KEY.registerInMdc(executionContext.getCid());
+    if (mseWorkerInfo != null) {
+      
LoggerConstants.STAGE_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getStageId()));
+      
LoggerConstants.WORKER_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getWorkerId()));
     }
   }
 
-  /**
-   * Returns {@code true} if the {@link QueryThreadContext} is in strict mode.
-   *
-   * In strict mode, if the {@link QueryThreadContext} is not initialized, an 
{@link IllegalStateException} will be
-   * thrown when setter and getter methods are used.
-   * In non-strict mode, a warning will be logged and the fake instance will 
be returned.
-   *
-   * @see #onStartup(PinotConfiguration)
-   */
-  public static boolean isStrictMode() {
-    return _strictMode;
+  public QueryExecutionContext getExecutionContext() {
+    return _executionContext;
   }
 
-  private static Instance get() {
-    Instance instance = THREAD_LOCAL.get();
-    if (instance == null) {
-      String errorMessage = "QueryThreadContext is not initialized";
-      if (_strictMode) {
-        LOGGER.error(errorMessage);
-        throw new IllegalStateException(errorMessage);
-      } else {
-        LOGGER.debug(errorMessage);
-        // in non-strict mode, return the fake instance
-        return FAKE_INSTANCE;
-      }
-    }
-    return instance;
+  @Nullable
+  public MseWorkerInfo getMseWorkerInfo() {
+    return _mseWorkerInfo;
   }
 
-  /**
-   * Returns {@code true} if the {@link QueryThreadContext} is initialized in 
the current thread.
-   *
-   * Initializing the context means that the {@link #open(String)} method was 
called and the returned object is not
-   * closed yet.
-   */
-  public static boolean isInitialized() {
-    return THREAD_LOCAL.get() != null;
+  @JsonIgnore
+  public ThreadResourceUsageAccountant getResourceUsageAccountant() {
+    return _resourceUsageAccountant;
   }
 
-  /**
-   * Captures the state of the {@link QueryThreadContext} in the current 
thread.
-   *
-   * This method is used to capture the state of the {@link 
QueryThreadContext} in the current thread so that it can be
-   * restored later in another thread.
-   *
-   * @return a {@link Memento} object that captures the state of the {@link 
QueryThreadContext}
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   * @see #open(Memento)
-   */
-  public static Memento createMemento() {
-    return new Memento(get());
+  private void sampleUsageInternal() {
+    _resourceUsageAccountant.sampleUsage();
   }
 
-  /**
-   * Initializes the {@link QueryThreadContext} with default values.
-   *
-   * This method will throw an {@link IllegalStateException} if the {@link 
QueryThreadContext} is already initialized.
-   * That indicates an error in the code. Older context must be closed before 
opening a new one.
-   *
-   * @return an {@link AutoCloseable} object that should be used within a 
try-with-resources block
-   * @throws IllegalStateException if the {@link QueryThreadContext} is 
already initialized.
-   */
-  @VisibleForTesting
-  public static CloseableContext open() {
-    return open("unknown");
+  private void checkTerminationInternal(Supplier<String> scopeSupplier) {
+    checkTerminationInternal(scopeSupplier, 
_executionContext.getActiveDeadlineMs());
   }
 
-  public static CloseableContext open(String instanceId) {
-    CloseableContext open = open((Memento) null);
-    get()._instanceId = instanceId;
-    return open;
+  private void checkTerminationInternal(Supplier<String> scopeSupplier, long 
deadlineMs) {
+    QueryException terminateException = 
_executionContext.getTerminateException();
+    if (terminateException != null) {
+      throw terminateException;
+    }
+    if (Thread.interrupted()) {
+      throw new EarlyTerminationException("Interrupted on: " + 
scopeSupplier.get());
+    }
+    if (System.currentTimeMillis() >= deadlineMs) {
+      throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on: " + 
scopeSupplier.get());
+    }
   }
 
-  public static CloseableContext openFromRequestMetadata(String instanceId, 
Map<String, String> requestMetadata) {
-    CloseableContext open = open(instanceId);
-    String cid = 
requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID);
-    long requestId = 
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
-    if (cid == null) {
-      cid = Long.toString(requestId);
+  /// Closes the {@link QueryThreadContext} and removes it from the 
thread-local storage and MDC context.
+  @Override
+  public void close() {
+    _resourceUsageAccountant.clear();
+    THREAD_LOCAL.remove();
+    LoggerConstants.REQUEST_ID_KEY.unregisterFromMdc();
+    LoggerConstants.CORRELATION_ID_KEY.unregisterFromMdc();
+    if (_mseWorkerInfo != null) {
+      LoggerConstants.STAGE_ID_KEY.unregisterFromMdc();
+      LoggerConstants.WORKER_ID_KEY.unregisterFromMdc();
     }
-    QueryThreadContext.setIds(requestId, cid);
-    long timeoutMs = 
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
-    long extraPassiveTimeoutMs = Long.parseLong(requestMetadata.getOrDefault(
-        
CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, "0"));
-    long startTimeMs = System.currentTimeMillis();
-    QueryThreadContext.setStartTimeMs(startTimeMs);
-    QueryThreadContext.setActiveDeadlineMs(startTimeMs + timeoutMs);
-    QueryThreadContext.setPassiveDeadlineMs(startTimeMs + timeoutMs + 
extraPassiveTimeoutMs);
-
-    return open;
   }
 
-  /**
-   * Initializes the {@link QueryThreadContext} with the state captured in the 
given {@link Memento} object, if any.
-   *
-   * This method will throw an {@link IllegalStateException} if the {@link 
QueryThreadContext} is already initialized.
-   * That indicates an error in the code. Older context must be closed before 
opening a new one.
-   *
-   * Values that were set in the {@link Memento} object will be set in the 
{@link QueryThreadContext} and therefore
-   * they couldn't be set again in the current thread (at least until the 
returned {@link AutoCloseable} is closed).
-   *
-   * @param memento the {@link Memento} object to capture the state from
-   *                (if {@code null}, the context will be initialized with 
default values)
-   * @return an {@link AutoCloseable} object that should be used within a 
try-with-resources block
-   * @throws IllegalStateException if the {@link QueryThreadContext} is 
already initialized.
-   */
-  public static CloseableContext open(@Nullable Memento memento) {
-    if (THREAD_LOCAL.get() != null) {
-      String errorMessage = "QueryThreadContext is already initialized";
-      if (_strictMode) {
-        LOGGER.error(errorMessage);
-        throw new IllegalStateException("QueryThreadContext is already 
initialized");
-      } else {
-        LOGGER.debug(errorMessage);
-        return FAKE_INSTANCE;
-      }
-    }
+  /// Opens a new [QueryThreadContext] for the current thread and add it to 
the thread-local storage.
+  public static QueryThreadContext open(QueryExecutionContext executionContext,
+      ThreadResourceUsageAccountant accountant) {
+    return open(executionContext, null, accountant);
+  }
 
-    Instance context = new Instance();
-    if (memento != null) {
-      context.setStartTimeMs(memento._startTimeMs);
-      context.setActiveDeadlineMs(memento._activeDeadlineMs);
-      context.setPassiveDeadlineMs(memento._passiveDeadlineMs);
-      context.setBrokerId(memento._brokerId);
-      context.setRequestId(memento._requestId);
-      context.setCid(memento._cid);
-      context.setSql(memento._sql);
-      context.setQueryEngine(memento._queryEngine);
-      context.setInstanceId(memento._instanceId);
-    }
+  /// Opens a new [QueryThreadContext] for the current thread and add it to 
the thread-local storage.
+  public static QueryThreadContext open(QueryExecutionContext 
executionContext, @Nullable MseWorkerInfo mseWorkerInfo,
+      ThreadResourceUsageAccountant accountant) {
+    QueryThreadContext threadContext = new 
QueryThreadContext(executionContext, mseWorkerInfo, accountant);
+    THREAD_LOCAL.set(threadContext);
+    accountant.setupTask(threadContext);
+    return threadContext;
+  }
 
-    THREAD_LOCAL.set(context);
+  @VisibleForTesting
+  public static QueryThreadContext openForSseTest() {
+    return open(QueryExecutionContext.forSseTest(), 
DefaultThreadResourceUsageAccountant.INSTANCE);
+  }
 
-    return context;
+  @VisibleForTesting
+  public static QueryThreadContext openForMseTest() {
+    return open(QueryExecutionContext.forMseTest(), new MseWorkerInfo(0, 0),
+        DefaultThreadResourceUsageAccountant.INSTANCE);
   }
 
-  /**
-   * Returns a new {@link ExecutorService} whose tasks will be executed with 
the {@link QueryThreadContext} initialized
-   * with the state of the thread submitting the tasks.
-   *
-   * @param executorService the {@link ExecutorService} to decorate
-   */
-  public static ExecutorService contextAwareExecutorService(ExecutorService 
executorService) {
-    return contextAwareExecutorService(executorService, 
QueryThreadContext::createMemento);
+  /// Returns the [QueryThreadContext] for the current thread.
+  public static QueryThreadContext get() {
+    QueryThreadContext threadContext = THREAD_LOCAL.get();
+    assert threadContext != null;
+    return threadContext;
   }
 
-  /**
-   * Returns a new {@link ExecutorService} whose tasks will be executed with 
the {@link QueryThreadContext} initialized
-   * with the state captured in the given {@link Memento} object.
-   *
-   * @param executorService the {@link ExecutorService} to decorate
-   * @param mementoSupplier a supplier that provides the {@link Memento} 
object to capture the state from
-   */
-  public static ExecutorService contextAwareExecutorService(
-      ExecutorService executorService,
-      Supplier<Memento> mementoSupplier) {
-    return new DecoratorExecutorService(executorService) {
+  /// Returns the [QueryThreadContext] for the current thread, or `null` if 
not available.
+  @Nullable
+  public static QueryThreadContext getIfAvailable() {
+    return THREAD_LOCAL.get();
+  }
+
+  /// Returns a new [ExecutorService] whose tasks will be executed with the 
[QueryThreadContext] initialized with the
+  /// state of the thread submitting the tasks.
+  public static ExecutorService contextAwareExecutorService(ExecutorService 
executorService) {
+    return new DecoratorExecutorService(executorService, future -> 
get().getExecutionContext().addTask(future)) {
       @Override
       protected <T> Callable<T> decorate(Callable<T> task) {
-        Memento memento = mementoSupplier.get();
+        QueryThreadContext parentThreadContext = get();
         return () -> {
-          try (CloseableContext ignored = open(memento)) {
+          try (QueryThreadContext ignore = 
open(parentThreadContext._executionContext,
+              parentThreadContext._mseWorkerInfo, 
parentThreadContext._resourceUsageAccountant)) {
             return task.call();
           }
         };
       }
 
       @Override
       protected Runnable decorate(Runnable task) {
-        Memento memento = mementoSupplier.get();
+        QueryThreadContext parentThreadContext = get();
         return () -> {
-          try (CloseableContext ignored = open(memento)) {
+          try (QueryThreadContext ignore = 
open(parentThreadContext._executionContext,
+              parentThreadContext._mseWorkerInfo, 
parentThreadContext._resourceUsageAccountant)) {
             task.run();
           }
         };
       }
     };
   }
 
-  /**
-   * Returns the start time of the query in milliseconds.
-   *
-   * The default value of 0 means the start time is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getStartTimeMs() {
-    return get().getStartTimeMs();
-  }
-
-  /**
-   * Sets the start time of the query in milliseconds since epoch.
-   *
-   * The start time can only be set once.
-   * @throws IllegalStateException if start time is already set or if the 
{@link QueryThreadContext} is not initialized
-   */
-  public static void setStartTimeMs(long startTimeMs) {
-    get().setStartTimeMs(startTimeMs);
-  }
-
-  /**
-   * Returns the active deadline time of the query in milliseconds since epoch.
-   *
-   * The default value of 0 means the deadline is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getActiveDeadlineMs() {
-    return get().getActiveDeadlineMs();
-  }
-
-  /**
-   * Sets the active deadline time of the query in milliseconds since epoch.
-   *
-   * The deadline can only be set once.
-   * @throws IllegalStateException if deadline is already set or if the {@link 
QueryThreadContext} is not initialized
-   */
-  public static void setActiveDeadlineMs(long activeDeadlineMs) {
-    get().setActiveDeadlineMs(activeDeadlineMs);
-  }
-
-  /**
-   * Returns the passive deadline time of the query in milliseconds since 
epoch.
-   *
-   * The default value of 0 means the deadline is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getPassiveDeadlineMs() {
-    return get().getPassiveDeadlineMs();
-  }
-
-  /**
-   * Sets the passive deadline time of the query in milliseconds since epoch.
-   *
-   * The deadline can only be set once.
-   * @throws IllegalStateException if deadline is already set or if the {@link 
QueryThreadContext} is not initialized
-   */
-  public static void setPassiveDeadlineMs(long passiveDeadlineMs) {
-    get().setPassiveDeadlineMs(passiveDeadlineMs);
-  }
-
-  /**
-   * Returns the timeout of the query in milliseconds.
-   *
-   * The default value of 0 means the timeout is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getBrokerId() {
-    return get().getBrokerId();
-  }
-
-  /**
-   * Sets the broker id of the query.
-   *
-   * The broker id can only be set once.
-   * @throws IllegalStateException if broker id is already set or if the 
{@link QueryThreadContext} is not initialized
-   */
-  public static void setBrokerId(String brokerId) {
-    get().setBrokerId(brokerId);
-  }
-
-  /**
-   * Returns the request id of the query.
-   *
-   * The request id is used to identify query phases across different systems.
-   * Contrary to the correlation id, a single logical query can have multiple 
request ids.
-   * This is because the request id is changed at different stages of the 
query (e.g. real-time and offline parts of
-   * the query or different leaf operations in MSE).
-   *
-   * Also remember that neither request nor correlation id are guaranteed to 
be unique.
-   *
-   * The default value of 0 means the request id is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getRequestId() {
-    return get().getRequestId();
-  }
-
-  /**
-   * Returns the correlation id of the query.
-   *
-   * Correlation id is used to track queries across different systems.
-   * This id can be supplied by the client or generated by the system (usually 
the broker). It is not guaranteed to be
-   * unique. Users can use the same correlation id for multiple queries to 
track a single logical workflow on their
-   * side.
-   *
-   * What is guaranteed is that the correlation id will be consistent across 
all the phases of the query. Remember that
-   * it is not the case for the request id, which can change at different 
stages of the query (e.g. real-time and
-   * offline parts of the query or different leaf operations in MSE).
-   *
-   * Also remember that neither request nor correlation id are guaranteed to 
be unique.
-   *
-   * The default value of {@code null} means the correlation id is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getCid() {
-    return get().getCid();
-  }
-
-  /**
-   * Sets the request id and correlation id of the query.
-   *
-   * Both requests can only be set once.
-   *
-   * Setting this value also registers it in the MDC context with the key 
{@link LoggerConstants#REQUEST_ID_KEY} and
-   * {@link LoggerConstants#CORRELATION_ID_KEY}.
-   * @throws IllegalStateException if any of the ids are already set or if the 
{@link QueryThreadContext} is not
-   * initialized
-   */
-  public static void setIds(long requestId, String cid) {
-    Instance instance = get();
-    instance.setRequestId(requestId);
-    instance.setCid(cid);
-  }
-
-  /**
-   * Returns the SQL of the query.
-   *
-   * The default value of {@code null} means the SQL is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getSql() {
-    return get().getSql();
-  }
-
-  /**
-   * Sets the SQL of the query.
-   *
-   * The SQL can only be set once.
-   * @throws IllegalStateException if sql is already set or if the {@link 
QueryThreadContext} is not initialized
-   */
-  public static void setSql(String sql) {
-    get().setSql(sql);
-  }
-
-  /**
-   * Returns the query engine that is being used.
-   *
-   * The default value of {@code null} means the query engine is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getQueryEngine() {
-    return get().getQueryEngine();
-  }
-
-  /**
-   * Sets the query engine that is being used.
-   *
-   * The query engine can only be set once.
-   * @throws IllegalStateException if query engine is already set or if the 
{@link QueryThreadContext} is not
-   * initialized
-   */
-  public static void setQueryEngine(String queryEngine) {
-    get().setQueryEngine(queryEngine);
-  }
-
-  /**
-   * Returns the instanceid of the query.
-   *
-   * This is usually the id that identifies the server, broker, controller, 
etc.
-   *
-   * The default value of {@code null} means the instanceid is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getInstanceId() {
-    return get().getInstanceId();
-  }
-
-  /**
-   * This private class stores the actual state of the {@link 
QueryThreadContext} in a safe way.
-   *
-   * As part of the paranoid design of the {@link QueryThreadContext}, this 
class is used to store the state of the
-   * {@link QueryThreadContext} in a way that is safe to copy between threads.
-   * Callers never have access to this class directly, but only through the 
{@link QueryThreadContext} static methods.
-   * This is designed this way to ensure that the state is always copied 
between threads in a safe way.
-   * Given it is impossible for a caller to access this class directly, it can 
never be copied from one thread to
-   * another by mistake.
-   * This forces to use the {@link Memento} class to capture the state of the 
{@link QueryThreadContext} in the current
-   * thread and then use the {@link #open(Memento)} method to initialize the 
context in the target thread with the state
-   * captured in the {@link Memento} object.
-   */
-  private static class Instance implements CloseableContext {
-    private long _startTimeMs;
-    private long _activeDeadlineMs;
-    private long _passiveDeadlineMs;
-    private String _brokerId;
-    private long _requestId;
-    private String _cid;
-    private String _sql;
-    private String _queryEngine;
-    private String _instanceId;
-
-    public long getStartTimeMs() {
-      return _startTimeMs;
-    }
-
-    public void setStartTimeMs(long startTimeMs) {
-      Preconditions.checkState(getStartTimeMs() == 0, "Start time already set 
to %s, cannot set again",
-          getStartTimeMs());
-      _startTimeMs = startTimeMs;
-    }
-
-    public long getActiveDeadlineMs() {
-      return _activeDeadlineMs;
-    }
-
-    public void setActiveDeadlineMs(long activeDeadlineMs) {
-      Preconditions.checkState(getActiveDeadlineMs() == 0, "Deadline already 
set to %s, cannot set again",
-          getActiveDeadlineMs());
-      _activeDeadlineMs = activeDeadlineMs;
-    }
-
-    public long getPassiveDeadlineMs() {
-      return _passiveDeadlineMs;
-    }
-
-    public void setPassiveDeadlineMs(long passiveDeadlineMs) {
-      Preconditions.checkState(getPassiveDeadlineMs() == 0, "Passive deadline 
already set to %s, cannot set again",
-          getPassiveDeadlineMs());
-      _passiveDeadlineMs = passiveDeadlineMs;
-    }
-
-    public String getBrokerId() {
-      return _brokerId;
-    }
-
-    public void setBrokerId(String brokerId) {
-      Preconditions.checkState(getBrokerId() == null, "Broker id already set 
to %s, cannot set again",
-          getBrokerId());
-      _brokerId = brokerId;
-    }
-
-    public long getRequestId() {
-      return _requestId;
-    }
-
-    public void setRequestId(long requestId) {
-      Preconditions.checkState(getRequestId() == 0, "Request id already set to 
%s, cannot set again", getRequestId());
-      LoggerConstants.REQUEST_ID_KEY.registerInMdc(Long.toString(requestId));
-      _requestId = requestId;
-    }
-
-    public String getCid() {
-      return _cid;
-    }
-
-    public void setCid(String cid) {
-      Preconditions.checkState(getCid() == null, "Correlation id already set 
to %s, cannot set again",
-          getCid());
-      LoggerConstants.CORRELATION_ID_KEY.registerInMdc(cid);
-      _cid = cid;
-    }
-
-    public String getSql() {
-      return _sql;
-    }
-
-    public void setSql(String sql) {
-      Preconditions.checkState(getSql() == null, "SQL already set to %s, 
cannot set again",
-          getSql());
-      _sql = sql;
-    }
-
-    public String getQueryEngine() {
-      return _queryEngine;
-    }
-
-    public void setQueryEngine(String queryType) {
-      Preconditions.checkState(getQueryEngine() == null, "Query type already 
set to %s, cannot set again",
-          getQueryEngine());
-      _queryEngine = queryType;
-    }
-
-    public void setInstanceId(String instanceId) {
-      Preconditions.checkState(_instanceId == null, "Service id already set to 
%s, cannot set again",
-          getInstanceId());
-      _instanceId = instanceId;
-    }
-
-    public String getInstanceId() {
-      return _instanceId;
-    }
-
-    @Override
-    public String toString() {
-      try {
-        return JsonUtils.objectToString(this);
-      } catch (JsonProcessingException e) {
-        return "Failed to convert QueryThreadContext to JSON: " + 
e.getMessage();
-      }
-    }
-
-    /**
-     * Closes the {@link QueryThreadContext} and removes it from the 
thread-local storage and MDC context.
-     */
-    @Override
-    public void close() {
-      THREAD_LOCAL.remove();
-      if (_requestId != 0) {
-        LoggerConstants.REQUEST_ID_KEY.unregisterFromMdc();
-      }
-      if (_cid != null) {
-        LoggerConstants.CORRELATION_ID_KEY.unregisterFromMdc();
-      }
+  /// Checks if the query should be terminated.
+  public static void checkTermination(Supplier<String> scopeSupplier) {

Review Comment:
   nit: We should add a comment describing the scopeSupplier. AFAIU it is used 
to provide some context in case the query is terminated. Something like that 
would be good enough



##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java:
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.query;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
+import 
org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys.TimeSeries;
+
+
+/// The context for query execution. It should be shared across all the 
threads executing the same query
+/// ([QueryThreadContext#getExecutionContext()] returns the same instance). It 
tracks the tasks (as [Future]) executing
+/// the query, and provides a way to terminate the query by cancelling all the 
tasks.
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class QueryExecutionContext {
+
+  public enum QueryType {
+    SSE,  // Single-stage engine
+    MSE,  // Multi-stage engine
+    TSE   // Time-series engine
+  }
+
+  private final QueryType _queryType;
+  private final long _requestId;
+  private final String _cid;
+  private final String _workloadName;
+  private final long _startTimeMs;
+  private final long _activeDeadlineMs;
+  private final long _passiveDeadlineMs;
+  private final String _brokerId;
+  private final String _instanceId;
+
+  private final List<Future<?>> _tasks = new ArrayList<>();

Review Comment:
   nit: It would be cool to add a `javax.annotation.concurrent.GuardedBy` 
annotation



##########
pinot-core/src/main/java/org/apache/pinot/core/operator/BaseOperator.java:
##########
@@ -59,6 +53,18 @@ public ExplainInfo getExplainInfo() {
     return new ExplainInfo(getExplainName(), attributeBuilder.build(), 
getChildrenExplainInfo());
   }
 
+  protected void checkTermination() {
+    QueryThreadContext.checkTermination(this::getExplainName);
+  }
+
+  protected void checkTerminationAndSampleUsage() {
+    QueryThreadContext.checkTerminationAndSampleUsage(this::getExplainName);
+  }
+
+  protected void checkTerminationAndSampleUsagePeriodically(int 
numRecordsProcessed) {
+    
QueryThreadContext.checkTerminationAndSampleUsagePeriodically(numRecordsProcessed,
 this::getExplainName);
+  }

Review Comment:
   All these lambdas will allocate each time they are called. JIT _may_ remove 
the allocation, but we could also allocate an attribute so we have exactly one 
copy per operator instance (or in most cases, per class, as this attribute may 
be static)



##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java:
##########
@@ -18,680 +18,257 @@
  */
 package org.apache.pinot.spi.query;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
-import org.apache.pinot.spi.env.PinotConfiguration;
+import 
org.apache.pinot.spi.accounting.ThreadAccounting.DefaultThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.executor.DecoratorExecutorService;
 import org.apache.pinot.spi.trace.LoggerConstants;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
-/**
- * The {@code QueryThreadContext} class is a thread-local context for storing 
common query-related information
- * associated to the current thread.
- *
- * <p>It is used to pass information between different layers of the query 
execution stack without changing the
- * method signatures. This is also used to populate the {@link MDC} context 
for logging.
- *
- * Use {@link #open(String)} to initialize the empty context. As any other 
{@link AutoCloseable} object, it should be
- * used within a try-with-resources block to ensure the context is properly 
closed and removed from the thread-local
- * storage.
- *
- * Sometimes it is necessary to copy the state of the {@link 
QueryThreadContext} from one thread to another. In this
- * case, use the {@link #createMemento()} method to capture the state of the 
{@link QueryThreadContext} in the current
- * thread and then use the {@link #open(Memento)} method to initialize the 
context in the target thread with the state
- * captured in the {@link Memento} object. The API may be a bit cumbersome, 
but it ensures that the state is always
- * copied between threads in a safe way and makes it impossible to use the 
{@link QueryThreadContext} from another
- * thread by mistake.
- *
- * It is guaranteed that all server and broker threads running SSE and MSE 
will have this context initialized as soon
- * as the request is received. Ingestion threads that use the query execution 
stack will also have this context
- * initialized.
- */
-public class QueryThreadContext {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryThreadContext.class);
-  private static final ThreadLocal<Instance> THREAD_LOCAL = new 
ThreadLocal<>();
-  public static volatile boolean _strictMode;
-  private static final FakeInstance FAKE_INSTANCE = new FakeInstance();
-
-  static {
-    // This is a hack to know if assertions are enabled or not
-    boolean assertEnabled = false;
-    //CHECKSTYLE:OFF
-    assert assertEnabled = true;
-    //CHECKSTYLE:ON
-    _strictMode = assertEnabled;
-  }
 
-  /**
-   * Private constructor to prevent instantiation.
-   *
-   * Use {@link #open(String)} to initialize the context instead.
-   */
-  private QueryThreadContext() {
-  }
+/// The [QueryThreadContext] class is a thread-local context for storing 
common query-related information associated to
+/// the current thread.
+///
+/// It is used to pass information between different layers of the query 
execution stack without changing the method
+/// signatures. This is also used to populate the [org.slf4j.MDC] context for 
logging.
+///
+/// Use [#open] to initialize the empty context. As any other [AutoCloseable] 
object, it should be used within a
+/// try-with-resources block to ensure the context is properly closed and 
removed from the thread-local storage and
+/// resource usage accountant.
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class QueryThreadContext implements AutoCloseable {
+  // Check if query should be terminated, and sample resource usage per 8192 
records
+  public static final int CHECK_TERMINATION_AND_SAMPLE_USAGE_RECORD_MASK = 
0x1FFF;
 
-  /**
-   * Sets the strict mode of the {@link QueryThreadContext} from the given 
configuration.
-   */
-  public static void onStartup(PinotConfiguration conf) {
-    String mode = 
conf.getProperty(CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE);
-    if ("strict".equalsIgnoreCase(mode)) {
-      _strictMode = true;
-      return;
-    }
-    if (mode != null && !mode.isEmpty()) {
-      throw new IllegalArgumentException("Invalid value '" + mode + "' for "
-          + CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE + ". Expected 
'strict' or empty");
+  private static final ThreadLocal<QueryThreadContext> THREAD_LOCAL = new 
ThreadLocal<>();
+
+  private final QueryExecutionContext _executionContext;
+  private final MseWorkerInfo _mseWorkerInfo;
+  private final ThreadResourceUsageAccountant _resourceUsageAccountant;
+
+  private QueryThreadContext(QueryExecutionContext executionContext, @Nullable 
MseWorkerInfo mseWorkerInfo,
+      ThreadResourceUsageAccountant resourceUsageAccountant) {
+    _executionContext = executionContext;
+    _mseWorkerInfo = mseWorkerInfo;
+    _resourceUsageAccountant = resourceUsageAccountant;
+    
LoggerConstants.REQUEST_ID_KEY.registerInMdc(Long.toString(executionContext.getRequestId()));
+    
LoggerConstants.CORRELATION_ID_KEY.registerInMdc(executionContext.getCid());
+    if (mseWorkerInfo != null) {
+      
LoggerConstants.STAGE_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getStageId()));
+      
LoggerConstants.WORKER_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getWorkerId()));
     }
   }
 
-  /**
-   * Returns {@code true} if the {@link QueryThreadContext} is in strict mode.
-   *
-   * In strict mode, if the {@link QueryThreadContext} is not initialized, an 
{@link IllegalStateException} will be
-   * thrown when setter and getter methods are used.
-   * In non-strict mode, a warning will be logged and the fake instance will 
be returned.
-   *
-   * @see #onStartup(PinotConfiguration)
-   */
-  public static boolean isStrictMode() {
-    return _strictMode;
+  public QueryExecutionContext getExecutionContext() {
+    return _executionContext;
   }
 
-  private static Instance get() {
-    Instance instance = THREAD_LOCAL.get();
-    if (instance == null) {
-      String errorMessage = "QueryThreadContext is not initialized";
-      if (_strictMode) {
-        LOGGER.error(errorMessage);
-        throw new IllegalStateException(errorMessage);
-      } else {
-        LOGGER.debug(errorMessage);
-        // in non-strict mode, return the fake instance
-        return FAKE_INSTANCE;
-      }
-    }
-    return instance;
+  @Nullable
+  public MseWorkerInfo getMseWorkerInfo() {
+    return _mseWorkerInfo;
   }
 
-  /**
-   * Returns {@code true} if the {@link QueryThreadContext} is initialized in 
the current thread.
-   *
-   * Initializing the context means that the {@link #open(String)} method was 
called and the returned object is not
-   * closed yet.
-   */
-  public static boolean isInitialized() {
-    return THREAD_LOCAL.get() != null;
+  @JsonIgnore
+  public ThreadResourceUsageAccountant getResourceUsageAccountant() {
+    return _resourceUsageAccountant;
   }
 
-  /**
-   * Captures the state of the {@link QueryThreadContext} in the current 
thread.
-   *
-   * This method is used to capture the state of the {@link 
QueryThreadContext} in the current thread so that it can be
-   * restored later in another thread.
-   *
-   * @return a {@link Memento} object that captures the state of the {@link 
QueryThreadContext}
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   * @see #open(Memento)
-   */
-  public static Memento createMemento() {
-    return new Memento(get());
+  private void sampleUsageInternal() {
+    _resourceUsageAccountant.sampleUsage();
   }
 
-  /**
-   * Initializes the {@link QueryThreadContext} with default values.
-   *
-   * This method will throw an {@link IllegalStateException} if the {@link 
QueryThreadContext} is already initialized.
-   * That indicates an error in the code. Older context must be closed before 
opening a new one.
-   *
-   * @return an {@link AutoCloseable} object that should be used within a 
try-with-resources block
-   * @throws IllegalStateException if the {@link QueryThreadContext} is 
already initialized.
-   */
-  @VisibleForTesting
-  public static CloseableContext open() {
-    return open("unknown");
+  private void checkTerminationInternal(Supplier<String> scopeSupplier) {
+    checkTerminationInternal(scopeSupplier, 
_executionContext.getActiveDeadlineMs());
   }
 
-  public static CloseableContext open(String instanceId) {
-    CloseableContext open = open((Memento) null);
-    get()._instanceId = instanceId;
-    return open;
+  private void checkTerminationInternal(Supplier<String> scopeSupplier, long 
deadlineMs) {
+    QueryException terminateException = 
_executionContext.getTerminateException();
+    if (terminateException != null) {
+      throw terminateException;
+    }
+    if (Thread.interrupted()) {
+      throw new EarlyTerminationException("Interrupted on: " + 
scopeSupplier.get());
+    }
+    if (System.currentTimeMillis() >= deadlineMs) {
+      throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on: " + 
scopeSupplier.get());
+    }
   }
 
-  public static CloseableContext openFromRequestMetadata(String instanceId, 
Map<String, String> requestMetadata) {
-    CloseableContext open = open(instanceId);
-    String cid = 
requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID);
-    long requestId = 
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
-    if (cid == null) {
-      cid = Long.toString(requestId);
+  /// Closes the {@link QueryThreadContext} and removes it from the 
thread-local storage and MDC context.
+  @Override
+  public void close() {
+    _resourceUsageAccountant.clear();
+    THREAD_LOCAL.remove();
+    LoggerConstants.REQUEST_ID_KEY.unregisterFromMdc();
+    LoggerConstants.CORRELATION_ID_KEY.unregisterFromMdc();
+    if (_mseWorkerInfo != null) {
+      LoggerConstants.STAGE_ID_KEY.unregisterFromMdc();
+      LoggerConstants.WORKER_ID_KEY.unregisterFromMdc();
     }
-    QueryThreadContext.setIds(requestId, cid);
-    long timeoutMs = 
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
-    long extraPassiveTimeoutMs = Long.parseLong(requestMetadata.getOrDefault(
-        
CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, "0"));
-    long startTimeMs = System.currentTimeMillis();
-    QueryThreadContext.setStartTimeMs(startTimeMs);
-    QueryThreadContext.setActiveDeadlineMs(startTimeMs + timeoutMs);
-    QueryThreadContext.setPassiveDeadlineMs(startTimeMs + timeoutMs + 
extraPassiveTimeoutMs);
-
-    return open;
   }
 
-  /**
-   * Initializes the {@link QueryThreadContext} with the state captured in the 
given {@link Memento} object, if any.
-   *
-   * This method will throw an {@link IllegalStateException} if the {@link 
QueryThreadContext} is already initialized.
-   * That indicates an error in the code. Older context must be closed before 
opening a new one.
-   *
-   * Values that were set in the {@link Memento} object will be set in the 
{@link QueryThreadContext} and therefore
-   * they couldn't be set again in the current thread (at least until the 
returned {@link AutoCloseable} is closed).
-   *
-   * @param memento the {@link Memento} object to capture the state from
-   *                (if {@code null}, the context will be initialized with 
default values)
-   * @return an {@link AutoCloseable} object that should be used within a 
try-with-resources block
-   * @throws IllegalStateException if the {@link QueryThreadContext} is 
already initialized.
-   */
-  public static CloseableContext open(@Nullable Memento memento) {
-    if (THREAD_LOCAL.get() != null) {
-      String errorMessage = "QueryThreadContext is already initialized";
-      if (_strictMode) {
-        LOGGER.error(errorMessage);
-        throw new IllegalStateException("QueryThreadContext is already 
initialized");
-      } else {
-        LOGGER.debug(errorMessage);
-        return FAKE_INSTANCE;
-      }
-    }
+  /// Opens a new [QueryThreadContext] for the current thread and add it to 
the thread-local storage.
+  public static QueryThreadContext open(QueryExecutionContext executionContext,
+      ThreadResourceUsageAccountant accountant) {
+    return open(executionContext, null, accountant);
+  }
 
-    Instance context = new Instance();
-    if (memento != null) {
-      context.setStartTimeMs(memento._startTimeMs);
-      context.setActiveDeadlineMs(memento._activeDeadlineMs);
-      context.setPassiveDeadlineMs(memento._passiveDeadlineMs);
-      context.setBrokerId(memento._brokerId);
-      context.setRequestId(memento._requestId);
-      context.setCid(memento._cid);
-      context.setSql(memento._sql);
-      context.setQueryEngine(memento._queryEngine);
-      context.setInstanceId(memento._instanceId);
-    }
+  /// Opens a new [QueryThreadContext] for the current thread and add it to 
the thread-local storage.
+  public static QueryThreadContext open(QueryExecutionContext 
executionContext, @Nullable MseWorkerInfo mseWorkerInfo,
+      ThreadResourceUsageAccountant accountant) {
+    QueryThreadContext threadContext = new 
QueryThreadContext(executionContext, mseWorkerInfo, accountant);
+    THREAD_LOCAL.set(threadContext);
+    accountant.setupTask(threadContext);
+    return threadContext;
+  }
 
-    THREAD_LOCAL.set(context);
+  @VisibleForTesting
+  public static QueryThreadContext openForSseTest() {
+    return open(QueryExecutionContext.forSseTest(), 
DefaultThreadResourceUsageAccountant.INSTANCE);
+  }
 
-    return context;
+  @VisibleForTesting
+  public static QueryThreadContext openForMseTest() {
+    return open(QueryExecutionContext.forMseTest(), new MseWorkerInfo(0, 0),
+        DefaultThreadResourceUsageAccountant.INSTANCE);
   }
 
-  /**
-   * Returns a new {@link ExecutorService} whose tasks will be executed with 
the {@link QueryThreadContext} initialized
-   * with the state of the thread submitting the tasks.
-   *
-   * @param executorService the {@link ExecutorService} to decorate
-   */
-  public static ExecutorService contextAwareExecutorService(ExecutorService 
executorService) {
-    return contextAwareExecutorService(executorService, 
QueryThreadContext::createMemento);
+  /// Returns the [QueryThreadContext] for the current thread.
+  public static QueryThreadContext get() {
+    QueryThreadContext threadContext = THREAD_LOCAL.get();
+    assert threadContext != null;
+    return threadContext;

Review Comment:
   Are we sure an assertion is enough? Our tests don't verify all possible 
paths, so I think it is possible to end up having production cases where the 
thread context is not initialized. This is why in the older code we returned a 
dummy object in that case. This means we won't be able to get some context 
information, but most of the time the query can progress.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java:
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.query;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
+import 
org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys.TimeSeries;
+
+
+/// The context for query execution. It should be shared across all the 
threads executing the same query
+/// ([QueryThreadContext#getExecutionContext()] returns the same instance). It 
tracks the tasks (as [Future]) executing
+/// the query, and provides a way to terminate the query by cancelling all the 
tasks.
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class QueryExecutionContext {
+
+  public enum QueryType {
+    SSE,  // Single-stage engine
+    MSE,  // Multi-stage engine
+    TSE   // Time-series engine
+  }
+
+  private final QueryType _queryType;
+  private final long _requestId;
+  private final String _cid;
+  private final String _workloadName;
+  private final long _startTimeMs;
+  private final long _activeDeadlineMs;
+  private final long _passiveDeadlineMs;
+  private final String _brokerId;
+  private final String _instanceId;
+
+  private final List<Future<?>> _tasks = new ArrayList<>();
+  private volatile QueryException _terminateException;
+
+  public QueryExecutionContext(QueryType queryType, long requestId, String 
cid, String workloadName, long startTimeMs,
+      long activeDeadlineMs, long passiveDeadlineMs, String brokerId, String 
instanceId) {
+    _queryType = queryType;
+    _requestId = requestId;
+    _cid = cid;
+    _workloadName = workloadName;
+    _startTimeMs = startTimeMs;
+    _activeDeadlineMs = activeDeadlineMs;
+    _passiveDeadlineMs = passiveDeadlineMs;
+    _brokerId = brokerId;
+    _instanceId = instanceId;
+  }
+
+  public static QueryExecutionContext forMseServerRequest(Map<String, String> 
requestMetadata, String instanceId) {
+    long startTimeMs = System.currentTimeMillis();
+    long requestId = 
Long.parseLong(requestMetadata.get(MetadataKeys.REQUEST_ID));
+    String cid = requestMetadata.get(MetadataKeys.CORRELATION_ID);
+    if (cid == null) {
+      cid = Long.toString(requestId);
+    }
+    String workloadName = 
requestMetadata.getOrDefault(QueryOptionKey.WORKLOAD_NAME, 
Accounting.DEFAULT_WORKLOAD_NAME);
+    long timeoutMs = 
Long.parseLong(requestMetadata.get(QueryOptionKey.TIMEOUT_MS));

Review Comment:
   I think we need to protect this code for cases where timeout is not included 
or not a number



##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java:
##########
@@ -18,680 +18,257 @@
  */
 package org.apache.pinot.spi.query;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
-import org.apache.pinot.spi.env.PinotConfiguration;
+import 
org.apache.pinot.spi.accounting.ThreadAccounting.DefaultThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.executor.DecoratorExecutorService;
 import org.apache.pinot.spi.trace.LoggerConstants;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
-/**
- * The {@code QueryThreadContext} class is a thread-local context for storing 
common query-related information
- * associated to the current thread.
- *
- * <p>It is used to pass information between different layers of the query 
execution stack without changing the
- * method signatures. This is also used to populate the {@link MDC} context 
for logging.
- *
- * Use {@link #open(String)} to initialize the empty context. As any other 
{@link AutoCloseable} object, it should be
- * used within a try-with-resources block to ensure the context is properly 
closed and removed from the thread-local
- * storage.
- *
- * Sometimes it is necessary to copy the state of the {@link 
QueryThreadContext} from one thread to another. In this
- * case, use the {@link #createMemento()} method to capture the state of the 
{@link QueryThreadContext} in the current
- * thread and then use the {@link #open(Memento)} method to initialize the 
context in the target thread with the state
- * captured in the {@link Memento} object. The API may be a bit cumbersome, 
but it ensures that the state is always
- * copied between threads in a safe way and makes it impossible to use the 
{@link QueryThreadContext} from another
- * thread by mistake.
- *
- * It is guaranteed that all server and broker threads running SSE and MSE 
will have this context initialized as soon
- * as the request is received. Ingestion threads that use the query execution 
stack will also have this context
- * initialized.
- */
-public class QueryThreadContext {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryThreadContext.class);
-  private static final ThreadLocal<Instance> THREAD_LOCAL = new 
ThreadLocal<>();
-  public static volatile boolean _strictMode;
-  private static final FakeInstance FAKE_INSTANCE = new FakeInstance();
-
-  static {
-    // This is a hack to know if assertions are enabled or not
-    boolean assertEnabled = false;
-    //CHECKSTYLE:OFF
-    assert assertEnabled = true;
-    //CHECKSTYLE:ON
-    _strictMode = assertEnabled;
-  }
 
-  /**
-   * Private constructor to prevent instantiation.
-   *
-   * Use {@link #open(String)} to initialize the context instead.
-   */
-  private QueryThreadContext() {
-  }
+/// The [QueryThreadContext] class is a thread-local context for storing 
common query-related information associated to
+/// the current thread.
+///
+/// It is used to pass information between different layers of the query 
execution stack without changing the method
+/// signatures. This is also used to populate the [org.slf4j.MDC] context for 
logging.
+///
+/// Use [#open] to initialize the empty context. As any other [AutoCloseable] 
object, it should be used within a
+/// try-with-resources block to ensure the context is properly closed and 
removed from the thread-local storage and
+/// resource usage accountant.
+@JsonInclude(JsonInclude.Include.NON_NULL)

Review Comment:
   I don't think we want to json-serialize this class. We for sure don't want 
to deserialize it and using Jackson to serialize means we need to take care of 
the attributes we use here, as they should be serializable. For example here 
you are removing `_resourceUsageAccountant`.



##########
pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java:
##########
@@ -452,11 +451,9 @@ public String cancelQuery(
       @DefaultValue("3000") int timeoutMs,
       @ApiParam(value = "Return server responses for troubleshooting") 
@QueryParam("verbose") @DefaultValue("false")
       boolean verbose) {
-    try (QueryThreadContext.CloseableContext closeMe = 
QueryThreadContext.open(_instanceId)) {
+    try {
       Map<String, Integer> serverResponses = verbose ? new HashMap<>() : null;
       if (isClient) {
-        long reqId = _requestHandler.getRequestIdByClientId(id).orElse(-1L);
-        QueryThreadContext.setIds(reqId, id);

Review Comment:
   This is a con of this refactor. With the previous model we had the ability 
to partially create the thread local object. This means that any log printed 
after this line would include the CID, which is may be very useful.
   
   It is not a critial problem, but something that would be great if we find a 
way to keep



##########
pinot-spi/src/main/java/org/apache/pinot/spi/accounting/ThreadAccounting.java:
##########
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.accounting;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Global registration for thread accounting implementations.
+ */
+public class ThreadAccounting {
+  private ThreadAccounting() {
+  }
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ThreadAccounting.class);
+
+  /// This is the registration point for broker side 
[ThreadResourceUsageAccountant] implementations. Only one
+  /// accountant can be registered to avoid the overhead of polymorphic calls, 
and it must be registered before the
+  /// first call to [#getBrokerAccountant()]. Keep separate registry for 
broker and server accountant to allow different
+  /// implementations to be registered when both broker and server are in the 
same JVM.

Review Comment:
   How does it work in testing environments or quickstarts where we have more 
than one Pinot service (brokers, servers, etc)?
   
   Given we are going to rely on QueryThreadContext so much after this PR... 
why don't we go one extra step and create something like a 
_ServiceThreadContext_ that contain this kind of instance information? We can 
register it in the same way we register the QueryThreadContext and it could be 
used to substitute these static attributes we have been forced to use



##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java:
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.query;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
+import 
org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys.TimeSeries;
+
+
+/// The context for query execution. It should be shared across all the 
threads executing the same query
+/// ([QueryThreadContext#getExecutionContext()] returns the same instance). It 
tracks the tasks (as [Future]) executing
+/// the query, and provides a way to terminate the query by cancelling all the 
tasks.
+@JsonInclude(JsonInclude.Include.NON_NULL)

Review Comment:
   Same as QueryThreadContext. I don't think using Jackson to serialize/print 
this class is a good idea. It means we need to take care of the 
attributes/properties we introduce to this class. For example here tasks are 
not designed to be serialized with Jackson. In this case it is fine because we 
don't have a getter for that attribute, but in case we add it for whatever 
reason, we would have the side effect of including these tasks into the 
serialized object.
   
   Instead I think having our own `toString` would be better.



##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryExecutionContext.java:
##########
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.query;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.google.common.annotations.VisibleForTesting;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+import javax.annotation.Nullable;
+import org.apache.pinot.spi.exception.QueryException;
+import org.apache.pinot.spi.utils.CommonConstants.Accounting;
+import 
org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys;
+import 
org.apache.pinot.spi.utils.CommonConstants.Query.Request.MetadataKeys.TimeSeries;
+
+
+/// The context for query execution. It should be shared across all the 
threads executing the same query
+/// ([QueryThreadContext#getExecutionContext()] returns the same instance). It 
tracks the tasks (as [Future]) executing
+/// the query, and provides a way to terminate the query by cancelling all the 
tasks.
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class QueryExecutionContext {
+
+  public enum QueryType {
+    SSE,  // Single-stage engine
+    MSE,  // Multi-stage engine
+    TSE   // Time-series engine
+  }
+
+  private final QueryType _queryType;
+  private final long _requestId;
+  private final String _cid;
+  private final String _workloadName;
+  private final long _startTimeMs;
+  private final long _activeDeadlineMs;
+  private final long _passiveDeadlineMs;
+  private final String _brokerId;
+  private final String _instanceId;

Review Comment:
   I don't think most of this stuff belongs to the _execution_ context. I may 
be wrong, but given the termination methods and the name, I would assume 
_execution_ means the operators are being executed. Most of these attributes 
are set at parsing/scheduling time, right? I would expect them to be stored in 
QueryThreadContext



##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java:
##########
@@ -18,680 +18,257 @@
  */
 package org.apache.pinot.spi.query;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
-import org.apache.pinot.spi.env.PinotConfiguration;
+import 
org.apache.pinot.spi.accounting.ThreadAccounting.DefaultThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.executor.DecoratorExecutorService;
 import org.apache.pinot.spi.trace.LoggerConstants;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
-/**
- * The {@code QueryThreadContext} class is a thread-local context for storing 
common query-related information
- * associated to the current thread.
- *
- * <p>It is used to pass information between different layers of the query 
execution stack without changing the
- * method signatures. This is also used to populate the {@link MDC} context 
for logging.
- *
- * Use {@link #open(String)} to initialize the empty context. As any other 
{@link AutoCloseable} object, it should be
- * used within a try-with-resources block to ensure the context is properly 
closed and removed from the thread-local
- * storage.
- *
- * Sometimes it is necessary to copy the state of the {@link 
QueryThreadContext} from one thread to another. In this
- * case, use the {@link #createMemento()} method to capture the state of the 
{@link QueryThreadContext} in the current
- * thread and then use the {@link #open(Memento)} method to initialize the 
context in the target thread with the state
- * captured in the {@link Memento} object. The API may be a bit cumbersome, 
but it ensures that the state is always
- * copied between threads in a safe way and makes it impossible to use the 
{@link QueryThreadContext} from another
- * thread by mistake.
- *
- * It is guaranteed that all server and broker threads running SSE and MSE 
will have this context initialized as soon
- * as the request is received. Ingestion threads that use the query execution 
stack will also have this context
- * initialized.
- */
-public class QueryThreadContext {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryThreadContext.class);
-  private static final ThreadLocal<Instance> THREAD_LOCAL = new 
ThreadLocal<>();
-  public static volatile boolean _strictMode;
-  private static final FakeInstance FAKE_INSTANCE = new FakeInstance();
-
-  static {
-    // This is a hack to know if assertions are enabled or not
-    boolean assertEnabled = false;
-    //CHECKSTYLE:OFF
-    assert assertEnabled = true;
-    //CHECKSTYLE:ON
-    _strictMode = assertEnabled;
-  }
 
-  /**
-   * Private constructor to prevent instantiation.
-   *
-   * Use {@link #open(String)} to initialize the context instead.
-   */
-  private QueryThreadContext() {
-  }
+/// The [QueryThreadContext] class is a thread-local context for storing 
common query-related information associated to
+/// the current thread.
+///
+/// It is used to pass information between different layers of the query 
execution stack without changing the method
+/// signatures. This is also used to populate the [org.slf4j.MDC] context for 
logging.
+///
+/// Use [#open] to initialize the empty context. As any other [AutoCloseable] 
object, it should be used within a
+/// try-with-resources block to ensure the context is properly closed and 
removed from the thread-local storage and
+/// resource usage accountant.
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class QueryThreadContext implements AutoCloseable {
+  // Check if query should be terminated, and sample resource usage per 8192 
records
+  public static final int CHECK_TERMINATION_AND_SAMPLE_USAGE_RECORD_MASK = 
0x1FFF;
 
-  /**
-   * Sets the strict mode of the {@link QueryThreadContext} from the given 
configuration.
-   */
-  public static void onStartup(PinotConfiguration conf) {
-    String mode = 
conf.getProperty(CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE);
-    if ("strict".equalsIgnoreCase(mode)) {
-      _strictMode = true;
-      return;
-    }
-    if (mode != null && !mode.isEmpty()) {
-      throw new IllegalArgumentException("Invalid value '" + mode + "' for "
-          + CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE + ". Expected 
'strict' or empty");
+  private static final ThreadLocal<QueryThreadContext> THREAD_LOCAL = new 
ThreadLocal<>();
+
+  private final QueryExecutionContext _executionContext;
+  private final MseWorkerInfo _mseWorkerInfo;
+  private final ThreadResourceUsageAccountant _resourceUsageAccountant;
+
+  private QueryThreadContext(QueryExecutionContext executionContext, @Nullable 
MseWorkerInfo mseWorkerInfo,
+      ThreadResourceUsageAccountant resourceUsageAccountant) {
+    _executionContext = executionContext;
+    _mseWorkerInfo = mseWorkerInfo;
+    _resourceUsageAccountant = resourceUsageAccountant;
+    
LoggerConstants.REQUEST_ID_KEY.registerInMdc(Long.toString(executionContext.getRequestId()));
+    
LoggerConstants.CORRELATION_ID_KEY.registerInMdc(executionContext.getCid());
+    if (mseWorkerInfo != null) {
+      
LoggerConstants.STAGE_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getStageId()));
+      
LoggerConstants.WORKER_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getWorkerId()));
     }
   }
 
-  /**
-   * Returns {@code true} if the {@link QueryThreadContext} is in strict mode.
-   *
-   * In strict mode, if the {@link QueryThreadContext} is not initialized, an 
{@link IllegalStateException} will be
-   * thrown when setter and getter methods are used.
-   * In non-strict mode, a warning will be logged and the fake instance will 
be returned.
-   *
-   * @see #onStartup(PinotConfiguration)
-   */
-  public static boolean isStrictMode() {
-    return _strictMode;
+  public QueryExecutionContext getExecutionContext() {
+    return _executionContext;
   }
 
-  private static Instance get() {
-    Instance instance = THREAD_LOCAL.get();
-    if (instance == null) {
-      String errorMessage = "QueryThreadContext is not initialized";
-      if (_strictMode) {
-        LOGGER.error(errorMessage);
-        throw new IllegalStateException(errorMessage);
-      } else {
-        LOGGER.debug(errorMessage);
-        // in non-strict mode, return the fake instance
-        return FAKE_INSTANCE;
-      }
-    }
-    return instance;
+  @Nullable
+  public MseWorkerInfo getMseWorkerInfo() {
+    return _mseWorkerInfo;
   }
 
-  /**
-   * Returns {@code true} if the {@link QueryThreadContext} is initialized in 
the current thread.
-   *
-   * Initializing the context means that the {@link #open(String)} method was 
called and the returned object is not
-   * closed yet.
-   */
-  public static boolean isInitialized() {
-    return THREAD_LOCAL.get() != null;
+  @JsonIgnore
+  public ThreadResourceUsageAccountant getResourceUsageAccountant() {
+    return _resourceUsageAccountant;
   }
 
-  /**
-   * Captures the state of the {@link QueryThreadContext} in the current 
thread.
-   *
-   * This method is used to capture the state of the {@link 
QueryThreadContext} in the current thread so that it can be
-   * restored later in another thread.
-   *
-   * @return a {@link Memento} object that captures the state of the {@link 
QueryThreadContext}
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   * @see #open(Memento)
-   */
-  public static Memento createMemento() {
-    return new Memento(get());
+  private void sampleUsageInternal() {
+    _resourceUsageAccountant.sampleUsage();
   }
 
-  /**
-   * Initializes the {@link QueryThreadContext} with default values.
-   *
-   * This method will throw an {@link IllegalStateException} if the {@link 
QueryThreadContext} is already initialized.
-   * That indicates an error in the code. Older context must be closed before 
opening a new one.
-   *
-   * @return an {@link AutoCloseable} object that should be used within a 
try-with-resources block
-   * @throws IllegalStateException if the {@link QueryThreadContext} is 
already initialized.
-   */
-  @VisibleForTesting
-  public static CloseableContext open() {
-    return open("unknown");
+  private void checkTerminationInternal(Supplier<String> scopeSupplier) {
+    checkTerminationInternal(scopeSupplier, 
_executionContext.getActiveDeadlineMs());
   }
 
-  public static CloseableContext open(String instanceId) {
-    CloseableContext open = open((Memento) null);
-    get()._instanceId = instanceId;
-    return open;
+  private void checkTerminationInternal(Supplier<String> scopeSupplier, long 
deadlineMs) {
+    QueryException terminateException = 
_executionContext.getTerminateException();
+    if (terminateException != null) {
+      throw terminateException;
+    }
+    if (Thread.interrupted()) {
+      throw new EarlyTerminationException("Interrupted on: " + 
scopeSupplier.get());
+    }
+    if (System.currentTimeMillis() >= deadlineMs) {
+      throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on: " + 
scopeSupplier.get());
+    }
   }
 
-  public static CloseableContext openFromRequestMetadata(String instanceId, 
Map<String, String> requestMetadata) {
-    CloseableContext open = open(instanceId);
-    String cid = 
requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID);
-    long requestId = 
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
-    if (cid == null) {
-      cid = Long.toString(requestId);
+  /// Closes the {@link QueryThreadContext} and removes it from the 
thread-local storage and MDC context.
+  @Override
+  public void close() {
+    _resourceUsageAccountant.clear();
+    THREAD_LOCAL.remove();
+    LoggerConstants.REQUEST_ID_KEY.unregisterFromMdc();
+    LoggerConstants.CORRELATION_ID_KEY.unregisterFromMdc();
+    if (_mseWorkerInfo != null) {
+      LoggerConstants.STAGE_ID_KEY.unregisterFromMdc();
+      LoggerConstants.WORKER_ID_KEY.unregisterFromMdc();
     }
-    QueryThreadContext.setIds(requestId, cid);
-    long timeoutMs = 
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
-    long extraPassiveTimeoutMs = Long.parseLong(requestMetadata.getOrDefault(
-        
CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, "0"));
-    long startTimeMs = System.currentTimeMillis();
-    QueryThreadContext.setStartTimeMs(startTimeMs);
-    QueryThreadContext.setActiveDeadlineMs(startTimeMs + timeoutMs);
-    QueryThreadContext.setPassiveDeadlineMs(startTimeMs + timeoutMs + 
extraPassiveTimeoutMs);
-
-    return open;
   }
 
-  /**
-   * Initializes the {@link QueryThreadContext} with the state captured in the 
given {@link Memento} object, if any.
-   *
-   * This method will throw an {@link IllegalStateException} if the {@link 
QueryThreadContext} is already initialized.
-   * That indicates an error in the code. Older context must be closed before 
opening a new one.
-   *
-   * Values that were set in the {@link Memento} object will be set in the 
{@link QueryThreadContext} and therefore
-   * they couldn't be set again in the current thread (at least until the 
returned {@link AutoCloseable} is closed).
-   *
-   * @param memento the {@link Memento} object to capture the state from
-   *                (if {@code null}, the context will be initialized with 
default values)
-   * @return an {@link AutoCloseable} object that should be used within a 
try-with-resources block
-   * @throws IllegalStateException if the {@link QueryThreadContext} is 
already initialized.
-   */
-  public static CloseableContext open(@Nullable Memento memento) {
-    if (THREAD_LOCAL.get() != null) {
-      String errorMessage = "QueryThreadContext is already initialized";
-      if (_strictMode) {
-        LOGGER.error(errorMessage);
-        throw new IllegalStateException("QueryThreadContext is already 
initialized");
-      } else {
-        LOGGER.debug(errorMessage);
-        return FAKE_INSTANCE;
-      }
-    }
+  /// Opens a new [QueryThreadContext] for the current thread and add it to 
the thread-local storage.
+  public static QueryThreadContext open(QueryExecutionContext executionContext,
+      ThreadResourceUsageAccountant accountant) {
+    return open(executionContext, null, accountant);
+  }
 
-    Instance context = new Instance();
-    if (memento != null) {
-      context.setStartTimeMs(memento._startTimeMs);
-      context.setActiveDeadlineMs(memento._activeDeadlineMs);
-      context.setPassiveDeadlineMs(memento._passiveDeadlineMs);
-      context.setBrokerId(memento._brokerId);
-      context.setRequestId(memento._requestId);
-      context.setCid(memento._cid);
-      context.setSql(memento._sql);
-      context.setQueryEngine(memento._queryEngine);
-      context.setInstanceId(memento._instanceId);
-    }
+  /// Opens a new [QueryThreadContext] for the current thread and add it to 
the thread-local storage.
+  public static QueryThreadContext open(QueryExecutionContext 
executionContext, @Nullable MseWorkerInfo mseWorkerInfo,
+      ThreadResourceUsageAccountant accountant) {
+    QueryThreadContext threadContext = new 
QueryThreadContext(executionContext, mseWorkerInfo, accountant);
+    THREAD_LOCAL.set(threadContext);
+    accountant.setupTask(threadContext);
+    return threadContext;
+  }
 
-    THREAD_LOCAL.set(context);
+  @VisibleForTesting
+  public static QueryThreadContext openForSseTest() {
+    return open(QueryExecutionContext.forSseTest(), 
DefaultThreadResourceUsageAccountant.INSTANCE);
+  }
 
-    return context;
+  @VisibleForTesting
+  public static QueryThreadContext openForMseTest() {
+    return open(QueryExecutionContext.forMseTest(), new MseWorkerInfo(0, 0),
+        DefaultThreadResourceUsageAccountant.INSTANCE);
   }
 
-  /**
-   * Returns a new {@link ExecutorService} whose tasks will be executed with 
the {@link QueryThreadContext} initialized
-   * with the state of the thread submitting the tasks.
-   *
-   * @param executorService the {@link ExecutorService} to decorate
-   */
-  public static ExecutorService contextAwareExecutorService(ExecutorService 
executorService) {
-    return contextAwareExecutorService(executorService, 
QueryThreadContext::createMemento);
+  /// Returns the [QueryThreadContext] for the current thread.
+  public static QueryThreadContext get() {
+    QueryThreadContext threadContext = THREAD_LOCAL.get();
+    assert threadContext != null;
+    return threadContext;
   }
 
-  /**
-   * Returns a new {@link ExecutorService} whose tasks will be executed with 
the {@link QueryThreadContext} initialized
-   * with the state captured in the given {@link Memento} object.
-   *
-   * @param executorService the {@link ExecutorService} to decorate
-   * @param mementoSupplier a supplier that provides the {@link Memento} 
object to capture the state from
-   */
-  public static ExecutorService contextAwareExecutorService(
-      ExecutorService executorService,
-      Supplier<Memento> mementoSupplier) {
-    return new DecoratorExecutorService(executorService) {
+  /// Returns the [QueryThreadContext] for the current thread, or `null` if 
not available.
+  @Nullable
+  public static QueryThreadContext getIfAvailable() {
+    return THREAD_LOCAL.get();
+  }
+
+  /// Returns a new [ExecutorService] whose tasks will be executed with the 
[QueryThreadContext] initialized with the
+  /// state of the thread submitting the tasks.
+  public static ExecutorService contextAwareExecutorService(ExecutorService 
executorService) {
+    return new DecoratorExecutorService(executorService, future -> 
get().getExecutionContext().addTask(future)) {
       @Override
       protected <T> Callable<T> decorate(Callable<T> task) {
-        Memento memento = mementoSupplier.get();
+        QueryThreadContext parentThreadContext = get();
         return () -> {
-          try (CloseableContext ignored = open(memento)) {
+          try (QueryThreadContext ignore = 
open(parentThreadContext._executionContext,
+              parentThreadContext._mseWorkerInfo, 
parentThreadContext._resourceUsageAccountant)) {
             return task.call();
           }
         };
       }
 
       @Override
       protected Runnable decorate(Runnable task) {
-        Memento memento = mementoSupplier.get();
+        QueryThreadContext parentThreadContext = get();
         return () -> {
-          try (CloseableContext ignored = open(memento)) {
+          try (QueryThreadContext ignore = 
open(parentThreadContext._executionContext,
+              parentThreadContext._mseWorkerInfo, 
parentThreadContext._resourceUsageAccountant)) {
             task.run();
           }
         };
       }
     };
   }
 
-  /**
-   * Returns the start time of the query in milliseconds.
-   *
-   * The default value of 0 means the start time is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getStartTimeMs() {
-    return get().getStartTimeMs();
-  }
-
-  /**
-   * Sets the start time of the query in milliseconds since epoch.
-   *
-   * The start time can only be set once.
-   * @throws IllegalStateException if start time is already set or if the 
{@link QueryThreadContext} is not initialized
-   */
-  public static void setStartTimeMs(long startTimeMs) {
-    get().setStartTimeMs(startTimeMs);
-  }
-
-  /**
-   * Returns the active deadline time of the query in milliseconds since epoch.
-   *
-   * The default value of 0 means the deadline is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getActiveDeadlineMs() {
-    return get().getActiveDeadlineMs();
-  }
-
-  /**
-   * Sets the active deadline time of the query in milliseconds since epoch.
-   *
-   * The deadline can only be set once.
-   * @throws IllegalStateException if deadline is already set or if the {@link 
QueryThreadContext} is not initialized
-   */
-  public static void setActiveDeadlineMs(long activeDeadlineMs) {
-    get().setActiveDeadlineMs(activeDeadlineMs);
-  }
-
-  /**
-   * Returns the passive deadline time of the query in milliseconds since 
epoch.
-   *
-   * The default value of 0 means the deadline is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getPassiveDeadlineMs() {
-    return get().getPassiveDeadlineMs();
-  }
-
-  /**
-   * Sets the passive deadline time of the query in milliseconds since epoch.
-   *
-   * The deadline can only be set once.
-   * @throws IllegalStateException if deadline is already set or if the {@link 
QueryThreadContext} is not initialized
-   */
-  public static void setPassiveDeadlineMs(long passiveDeadlineMs) {
-    get().setPassiveDeadlineMs(passiveDeadlineMs);
-  }
-
-  /**
-   * Returns the timeout of the query in milliseconds.
-   *
-   * The default value of 0 means the timeout is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getBrokerId() {
-    return get().getBrokerId();
-  }
-
-  /**
-   * Sets the broker id of the query.
-   *
-   * The broker id can only be set once.
-   * @throws IllegalStateException if broker id is already set or if the 
{@link QueryThreadContext} is not initialized
-   */
-  public static void setBrokerId(String brokerId) {
-    get().setBrokerId(brokerId);
-  }
-
-  /**
-   * Returns the request id of the query.
-   *
-   * The request id is used to identify query phases across different systems.
-   * Contrary to the correlation id, a single logical query can have multiple 
request ids.
-   * This is because the request id is changed at different stages of the 
query (e.g. real-time and offline parts of
-   * the query or different leaf operations in MSE).
-   *
-   * Also remember that neither request nor correlation id are guaranteed to 
be unique.
-   *
-   * The default value of 0 means the request id is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getRequestId() {
-    return get().getRequestId();
-  }
-
-  /**
-   * Returns the correlation id of the query.
-   *
-   * Correlation id is used to track queries across different systems.
-   * This id can be supplied by the client or generated by the system (usually 
the broker). It is not guaranteed to be
-   * unique. Users can use the same correlation id for multiple queries to 
track a single logical workflow on their
-   * side.
-   *
-   * What is guaranteed is that the correlation id will be consistent across 
all the phases of the query. Remember that
-   * it is not the case for the request id, which can change at different 
stages of the query (e.g. real-time and
-   * offline parts of the query or different leaf operations in MSE).
-   *
-   * Also remember that neither request nor correlation id are guaranteed to 
be unique.
-   *
-   * The default value of {@code null} means the correlation id is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getCid() {
-    return get().getCid();
-  }
-
-  /**
-   * Sets the request id and correlation id of the query.
-   *
-   * Both requests can only be set once.
-   *
-   * Setting this value also registers it in the MDC context with the key 
{@link LoggerConstants#REQUEST_ID_KEY} and
-   * {@link LoggerConstants#CORRELATION_ID_KEY}.
-   * @throws IllegalStateException if any of the ids are already set or if the 
{@link QueryThreadContext} is not
-   * initialized
-   */
-  public static void setIds(long requestId, String cid) {
-    Instance instance = get();
-    instance.setRequestId(requestId);
-    instance.setCid(cid);
-  }
-
-  /**
-   * Returns the SQL of the query.
-   *
-   * The default value of {@code null} means the SQL is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getSql() {
-    return get().getSql();
-  }
-
-  /**
-   * Sets the SQL of the query.
-   *
-   * The SQL can only be set once.
-   * @throws IllegalStateException if sql is already set or if the {@link 
QueryThreadContext} is not initialized
-   */
-  public static void setSql(String sql) {
-    get().setSql(sql);
-  }
-
-  /**
-   * Returns the query engine that is being used.
-   *
-   * The default value of {@code null} means the query engine is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getQueryEngine() {
-    return get().getQueryEngine();
-  }
-
-  /**
-   * Sets the query engine that is being used.
-   *
-   * The query engine can only be set once.
-   * @throws IllegalStateException if query engine is already set or if the 
{@link QueryThreadContext} is not
-   * initialized
-   */
-  public static void setQueryEngine(String queryEngine) {
-    get().setQueryEngine(queryEngine);
-  }
-
-  /**
-   * Returns the instanceid of the query.
-   *
-   * This is usually the id that identifies the server, broker, controller, 
etc.
-   *
-   * The default value of {@code null} means the instanceid is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getInstanceId() {
-    return get().getInstanceId();
-  }
-
-  /**
-   * This private class stores the actual state of the {@link 
QueryThreadContext} in a safe way.
-   *
-   * As part of the paranoid design of the {@link QueryThreadContext}, this 
class is used to store the state of the
-   * {@link QueryThreadContext} in a way that is safe to copy between threads.
-   * Callers never have access to this class directly, but only through the 
{@link QueryThreadContext} static methods.
-   * This is designed this way to ensure that the state is always copied 
between threads in a safe way.
-   * Given it is impossible for a caller to access this class directly, it can 
never be copied from one thread to
-   * another by mistake.
-   * This forces to use the {@link Memento} class to capture the state of the 
{@link QueryThreadContext} in the current
-   * thread and then use the {@link #open(Memento)} method to initialize the 
context in the target thread with the state
-   * captured in the {@link Memento} object.
-   */
-  private static class Instance implements CloseableContext {
-    private long _startTimeMs;
-    private long _activeDeadlineMs;
-    private long _passiveDeadlineMs;
-    private String _brokerId;
-    private long _requestId;
-    private String _cid;
-    private String _sql;
-    private String _queryEngine;
-    private String _instanceId;
-
-    public long getStartTimeMs() {
-      return _startTimeMs;
-    }
-
-    public void setStartTimeMs(long startTimeMs) {
-      Preconditions.checkState(getStartTimeMs() == 0, "Start time already set 
to %s, cannot set again",
-          getStartTimeMs());
-      _startTimeMs = startTimeMs;
-    }
-
-    public long getActiveDeadlineMs() {
-      return _activeDeadlineMs;
-    }
-
-    public void setActiveDeadlineMs(long activeDeadlineMs) {
-      Preconditions.checkState(getActiveDeadlineMs() == 0, "Deadline already 
set to %s, cannot set again",
-          getActiveDeadlineMs());
-      _activeDeadlineMs = activeDeadlineMs;
-    }
-
-    public long getPassiveDeadlineMs() {
-      return _passiveDeadlineMs;
-    }
-
-    public void setPassiveDeadlineMs(long passiveDeadlineMs) {
-      Preconditions.checkState(getPassiveDeadlineMs() == 0, "Passive deadline 
already set to %s, cannot set again",
-          getPassiveDeadlineMs());
-      _passiveDeadlineMs = passiveDeadlineMs;
-    }
-
-    public String getBrokerId() {
-      return _brokerId;
-    }
-
-    public void setBrokerId(String brokerId) {
-      Preconditions.checkState(getBrokerId() == null, "Broker id already set 
to %s, cannot set again",
-          getBrokerId());
-      _brokerId = brokerId;
-    }
-
-    public long getRequestId() {
-      return _requestId;
-    }
-
-    public void setRequestId(long requestId) {
-      Preconditions.checkState(getRequestId() == 0, "Request id already set to 
%s, cannot set again", getRequestId());
-      LoggerConstants.REQUEST_ID_KEY.registerInMdc(Long.toString(requestId));
-      _requestId = requestId;
-    }
-
-    public String getCid() {
-      return _cid;
-    }
-
-    public void setCid(String cid) {
-      Preconditions.checkState(getCid() == null, "Correlation id already set 
to %s, cannot set again",
-          getCid());
-      LoggerConstants.CORRELATION_ID_KEY.registerInMdc(cid);
-      _cid = cid;
-    }
-
-    public String getSql() {
-      return _sql;
-    }
-
-    public void setSql(String sql) {
-      Preconditions.checkState(getSql() == null, "SQL already set to %s, 
cannot set again",
-          getSql());
-      _sql = sql;
-    }
-
-    public String getQueryEngine() {
-      return _queryEngine;
-    }
-
-    public void setQueryEngine(String queryType) {
-      Preconditions.checkState(getQueryEngine() == null, "Query type already 
set to %s, cannot set again",
-          getQueryEngine());
-      _queryEngine = queryType;
-    }
-
-    public void setInstanceId(String instanceId) {
-      Preconditions.checkState(_instanceId == null, "Service id already set to 
%s, cannot set again",
-          getInstanceId());
-      _instanceId = instanceId;
-    }
-
-    public String getInstanceId() {
-      return _instanceId;
-    }
-
-    @Override
-    public String toString() {
-      try {
-        return JsonUtils.objectToString(this);
-      } catch (JsonProcessingException e) {
-        return "Failed to convert QueryThreadContext to JSON: " + 
e.getMessage();
-      }
-    }
-
-    /**
-     * Closes the {@link QueryThreadContext} and removes it from the 
thread-local storage and MDC context.
-     */
-    @Override
-    public void close() {
-      THREAD_LOCAL.remove();
-      if (_requestId != 0) {
-        LoggerConstants.REQUEST_ID_KEY.unregisterFromMdc();
-      }
-      if (_cid != null) {
-        LoggerConstants.CORRELATION_ID_KEY.unregisterFromMdc();
-      }
+  /// Checks if the query should be terminated.
+  public static void checkTermination(Supplier<String> scopeSupplier) {
+    QueryThreadContext threadContext = THREAD_LOCAL.get();
+    // NOTE: In production code, threadContext should never be null. It might 
be null in tests when QueryThreadContext
+    //       is not set up.
+    if (threadContext != null) {
+      threadContext.checkTerminationInternal(scopeSupplier);
     }
   }
 
-  private static class FakeInstance extends Instance {
-    @Override
-    public void setStartTimeMs(long startTimeMs) {
-      LOGGER.debug("Setting start time to {} in a fake context", startTimeMs);
-    }
-
-    @Override
-    public void setActiveDeadlineMs(long activeDeadlineMs) {
-      LOGGER.debug("Setting active deadline to {} in a fake context", 
activeDeadlineMs);
+  /// Checks if the query should be terminated.
+  public static void checkTermination(Supplier<String> scopeSupplier, long 
deadlineMs) {
+    QueryThreadContext threadContext = THREAD_LOCAL.get();
+    // NOTE: In production code, threadContext should never be null. It might 
be null in tests when QueryThreadContext
+    //       is not set up.
+    if (threadContext != null) {
+      threadContext.checkTerminationInternal(scopeSupplier, deadlineMs);
     }
+  }
 
-    @Override
-    public void setPassiveDeadlineMs(long passiveDeadlineMs) {
-      LOGGER.debug("Setting passive deadline to {} in a fake context", 
passiveDeadlineMs);
+  /// Samples the resource usage for the current thread and account it to the 
query.
+  public static void sampleUsage() {
+    QueryThreadContext threadContext = THREAD_LOCAL.get();
+    // NOTE: In production code, threadContext should never be null. It might 
be null in tests when QueryThreadContext
+    //       is not set up.
+    if (threadContext != null) {
+    threadContext.sampleUsageInternal();

Review Comment:
   nit: tabulation



##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java:
##########
@@ -891,12 +894,10 @@ private CompileResult compileRequest(long requestId, 
String query, SqlNodeAndOpt
       if (ParserUtils.canCompileWithMultiStageEngine(query, database, 
_tableCache)) {
         return new CompileResult(new 
BrokerResponseNative(QueryErrorCode.SQL_PARSING,
             "It seems that the query is only supported by the multi-stage 
query engine, please retry the query "
-                + "using "
-                + "the multi-stage query engine "
+                + "using " + "the multi-stage query engine "

Review Comment:
   nit: format



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to