Jackie-Jiang commented on code in PR #16728:
URL: https://github.com/apache/pinot/pull/16728#discussion_r2350080965


##########
pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java:
##########
@@ -18,680 +18,257 @@
  */
 package org.apache.pinot.spi.query;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Supplier;
-import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
 import javax.annotation.Nullable;
-import org.apache.pinot.spi.env.PinotConfiguration;
+import 
org.apache.pinot.spi.accounting.ThreadAccounting.DefaultThreadResourceUsageAccountant;
+import org.apache.pinot.spi.accounting.ThreadResourceUsageAccountant;
+import org.apache.pinot.spi.exception.EarlyTerminationException;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.exception.QueryException;
 import org.apache.pinot.spi.executor.DecoratorExecutorService;
 import org.apache.pinot.spi.trace.LoggerConstants;
-import org.apache.pinot.spi.utils.CommonConstants;
-import org.apache.pinot.spi.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
 
-/**
- * The {@code QueryThreadContext} class is a thread-local context for storing 
common query-related information
- * associated to the current thread.
- *
- * <p>It is used to pass information between different layers of the query 
execution stack without changing the
- * method signatures. This is also used to populate the {@link MDC} context 
for logging.
- *
- * Use {@link #open(String)} to initialize the empty context. As any other 
{@link AutoCloseable} object, it should be
- * used within a try-with-resources block to ensure the context is properly 
closed and removed from the thread-local
- * storage.
- *
- * Sometimes it is necessary to copy the state of the {@link 
QueryThreadContext} from one thread to another. In this
- * case, use the {@link #createMemento()} method to capture the state of the 
{@link QueryThreadContext} in the current
- * thread and then use the {@link #open(Memento)} method to initialize the 
context in the target thread with the state
- * captured in the {@link Memento} object. The API may be a bit cumbersome, 
but it ensures that the state is always
- * copied between threads in a safe way and makes it impossible to use the 
{@link QueryThreadContext} from another
- * thread by mistake.
- *
- * It is guaranteed that all server and broker threads running SSE and MSE 
will have this context initialized as soon
- * as the request is received. Ingestion threads that use the query execution 
stack will also have this context
- * initialized.
- */
-public class QueryThreadContext {
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryThreadContext.class);
-  private static final ThreadLocal<Instance> THREAD_LOCAL = new 
ThreadLocal<>();
-  public static volatile boolean _strictMode;
-  private static final FakeInstance FAKE_INSTANCE = new FakeInstance();
-
-  static {
-    // This is a hack to know if assertions are enabled or not
-    boolean assertEnabled = false;
-    //CHECKSTYLE:OFF
-    assert assertEnabled = true;
-    //CHECKSTYLE:ON
-    _strictMode = assertEnabled;
-  }
 
-  /**
-   * Private constructor to prevent instantiation.
-   *
-   * Use {@link #open(String)} to initialize the context instead.
-   */
-  private QueryThreadContext() {
-  }
+/// The [QueryThreadContext] class is a thread-local context for storing 
common query-related information associated to
+/// the current thread.
+///
+/// It is used to pass information between different layers of the query 
execution stack without changing the method
+/// signatures. This is also used to populate the [org.slf4j.MDC] context for 
logging.
+///
+/// Use [#open] to initialize the empty context. As any other [AutoCloseable] 
object, it should be used within a
+/// try-with-resources block to ensure the context is properly closed and 
removed from the thread-local storage and
+/// resource usage accountant.
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class QueryThreadContext implements AutoCloseable {
+  // Check if query should be terminated, and sample resource usage per 8192 
records
+  public static final int CHECK_TERMINATION_AND_SAMPLE_USAGE_RECORD_MASK = 
0x1FFF;
 
-  /**
-   * Sets the strict mode of the {@link QueryThreadContext} from the given 
configuration.
-   */
-  public static void onStartup(PinotConfiguration conf) {
-    String mode = 
conf.getProperty(CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE);
-    if ("strict".equalsIgnoreCase(mode)) {
-      _strictMode = true;
-      return;
-    }
-    if (mode != null && !mode.isEmpty()) {
-      throw new IllegalArgumentException("Invalid value '" + mode + "' for "
-          + CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE + ". Expected 
'strict' or empty");
+  private static final ThreadLocal<QueryThreadContext> THREAD_LOCAL = new 
ThreadLocal<>();
+
+  private final QueryExecutionContext _executionContext;
+  private final MseWorkerInfo _mseWorkerInfo;
+  private final ThreadResourceUsageAccountant _resourceUsageAccountant;
+
+  private QueryThreadContext(QueryExecutionContext executionContext, @Nullable 
MseWorkerInfo mseWorkerInfo,
+      ThreadResourceUsageAccountant resourceUsageAccountant) {
+    _executionContext = executionContext;
+    _mseWorkerInfo = mseWorkerInfo;
+    _resourceUsageAccountant = resourceUsageAccountant;
+    
LoggerConstants.REQUEST_ID_KEY.registerInMdc(Long.toString(executionContext.getRequestId()));
+    
LoggerConstants.CORRELATION_ID_KEY.registerInMdc(executionContext.getCid());
+    if (mseWorkerInfo != null) {
+      
LoggerConstants.STAGE_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getStageId()));
+      
LoggerConstants.WORKER_ID_KEY.registerInMdc(Integer.toString(mseWorkerInfo.getWorkerId()));
     }
   }
 
-  /**
-   * Returns {@code true} if the {@link QueryThreadContext} is in strict mode.
-   *
-   * In strict mode, if the {@link QueryThreadContext} is not initialized, an 
{@link IllegalStateException} will be
-   * thrown when setter and getter methods are used.
-   * In non-strict mode, a warning will be logged and the fake instance will 
be returned.
-   *
-   * @see #onStartup(PinotConfiguration)
-   */
-  public static boolean isStrictMode() {
-    return _strictMode;
+  public QueryExecutionContext getExecutionContext() {
+    return _executionContext;
   }
 
-  private static Instance get() {
-    Instance instance = THREAD_LOCAL.get();
-    if (instance == null) {
-      String errorMessage = "QueryThreadContext is not initialized";
-      if (_strictMode) {
-        LOGGER.error(errorMessage);
-        throw new IllegalStateException(errorMessage);
-      } else {
-        LOGGER.debug(errorMessage);
-        // in non-strict mode, return the fake instance
-        return FAKE_INSTANCE;
-      }
-    }
-    return instance;
+  @Nullable
+  public MseWorkerInfo getMseWorkerInfo() {
+    return _mseWorkerInfo;
   }
 
-  /**
-   * Returns {@code true} if the {@link QueryThreadContext} is initialized in 
the current thread.
-   *
-   * Initializing the context means that the {@link #open(String)} method was 
called and the returned object is not
-   * closed yet.
-   */
-  public static boolean isInitialized() {
-    return THREAD_LOCAL.get() != null;
+  @JsonIgnore
+  public ThreadResourceUsageAccountant getResourceUsageAccountant() {
+    return _resourceUsageAccountant;
   }
 
-  /**
-   * Captures the state of the {@link QueryThreadContext} in the current 
thread.
-   *
-   * This method is used to capture the state of the {@link 
QueryThreadContext} in the current thread so that it can be
-   * restored later in another thread.
-   *
-   * @return a {@link Memento} object that captures the state of the {@link 
QueryThreadContext}
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   * @see #open(Memento)
-   */
-  public static Memento createMemento() {
-    return new Memento(get());
+  private void sampleUsageInternal() {
+    _resourceUsageAccountant.sampleUsage();
   }
 
-  /**
-   * Initializes the {@link QueryThreadContext} with default values.
-   *
-   * This method will throw an {@link IllegalStateException} if the {@link 
QueryThreadContext} is already initialized.
-   * That indicates an error in the code. Older context must be closed before 
opening a new one.
-   *
-   * @return an {@link AutoCloseable} object that should be used within a 
try-with-resources block
-   * @throws IllegalStateException if the {@link QueryThreadContext} is 
already initialized.
-   */
-  @VisibleForTesting
-  public static CloseableContext open() {
-    return open("unknown");
+  private void checkTerminationInternal(Supplier<String> scopeSupplier) {
+    checkTerminationInternal(scopeSupplier, 
_executionContext.getActiveDeadlineMs());
   }
 
-  public static CloseableContext open(String instanceId) {
-    CloseableContext open = open((Memento) null);
-    get()._instanceId = instanceId;
-    return open;
+  private void checkTerminationInternal(Supplier<String> scopeSupplier, long 
deadlineMs) {
+    QueryException terminateException = 
_executionContext.getTerminateException();
+    if (terminateException != null) {
+      throw terminateException;
+    }
+    if (Thread.interrupted()) {
+      throw new EarlyTerminationException("Interrupted on: " + 
scopeSupplier.get());
+    }
+    if (System.currentTimeMillis() >= deadlineMs) {
+      throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on: " + 
scopeSupplier.get());
+    }
   }
 
-  public static CloseableContext openFromRequestMetadata(String instanceId, 
Map<String, String> requestMetadata) {
-    CloseableContext open = open(instanceId);
-    String cid = 
requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID);
-    long requestId = 
Long.parseLong(requestMetadata.get(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID));
-    if (cid == null) {
-      cid = Long.toString(requestId);
+  /// Closes the {@link QueryThreadContext} and removes it from the 
thread-local storage and MDC context.
+  @Override
+  public void close() {
+    _resourceUsageAccountant.clear();
+    THREAD_LOCAL.remove();
+    LoggerConstants.REQUEST_ID_KEY.unregisterFromMdc();
+    LoggerConstants.CORRELATION_ID_KEY.unregisterFromMdc();
+    if (_mseWorkerInfo != null) {
+      LoggerConstants.STAGE_ID_KEY.unregisterFromMdc();
+      LoggerConstants.WORKER_ID_KEY.unregisterFromMdc();
     }
-    QueryThreadContext.setIds(requestId, cid);
-    long timeoutMs = 
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
-    long extraPassiveTimeoutMs = Long.parseLong(requestMetadata.getOrDefault(
-        
CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, "0"));
-    long startTimeMs = System.currentTimeMillis();
-    QueryThreadContext.setStartTimeMs(startTimeMs);
-    QueryThreadContext.setActiveDeadlineMs(startTimeMs + timeoutMs);
-    QueryThreadContext.setPassiveDeadlineMs(startTimeMs + timeoutMs + 
extraPassiveTimeoutMs);
-
-    return open;
   }
 
-  /**
-   * Initializes the {@link QueryThreadContext} with the state captured in the 
given {@link Memento} object, if any.
-   *
-   * This method will throw an {@link IllegalStateException} if the {@link 
QueryThreadContext} is already initialized.
-   * That indicates an error in the code. Older context must be closed before 
opening a new one.
-   *
-   * Values that were set in the {@link Memento} object will be set in the 
{@link QueryThreadContext} and therefore
-   * they couldn't be set again in the current thread (at least until the 
returned {@link AutoCloseable} is closed).
-   *
-   * @param memento the {@link Memento} object to capture the state from
-   *                (if {@code null}, the context will be initialized with 
default values)
-   * @return an {@link AutoCloseable} object that should be used within a 
try-with-resources block
-   * @throws IllegalStateException if the {@link QueryThreadContext} is 
already initialized.
-   */
-  public static CloseableContext open(@Nullable Memento memento) {
-    if (THREAD_LOCAL.get() != null) {
-      String errorMessage = "QueryThreadContext is already initialized";
-      if (_strictMode) {
-        LOGGER.error(errorMessage);
-        throw new IllegalStateException("QueryThreadContext is already 
initialized");
-      } else {
-        LOGGER.debug(errorMessage);
-        return FAKE_INSTANCE;
-      }
-    }
+  /// Opens a new [QueryThreadContext] for the current thread and add it to 
the thread-local storage.
+  public static QueryThreadContext open(QueryExecutionContext executionContext,
+      ThreadResourceUsageAccountant accountant) {
+    return open(executionContext, null, accountant);
+  }
 
-    Instance context = new Instance();
-    if (memento != null) {
-      context.setStartTimeMs(memento._startTimeMs);
-      context.setActiveDeadlineMs(memento._activeDeadlineMs);
-      context.setPassiveDeadlineMs(memento._passiveDeadlineMs);
-      context.setBrokerId(memento._brokerId);
-      context.setRequestId(memento._requestId);
-      context.setCid(memento._cid);
-      context.setSql(memento._sql);
-      context.setQueryEngine(memento._queryEngine);
-      context.setInstanceId(memento._instanceId);
-    }
+  /// Opens a new [QueryThreadContext] for the current thread and add it to 
the thread-local storage.
+  public static QueryThreadContext open(QueryExecutionContext 
executionContext, @Nullable MseWorkerInfo mseWorkerInfo,
+      ThreadResourceUsageAccountant accountant) {
+    QueryThreadContext threadContext = new 
QueryThreadContext(executionContext, mseWorkerInfo, accountant);
+    THREAD_LOCAL.set(threadContext);
+    accountant.setupTask(threadContext);
+    return threadContext;
+  }
 
-    THREAD_LOCAL.set(context);
+  @VisibleForTesting
+  public static QueryThreadContext openForSseTest() {
+    return open(QueryExecutionContext.forSseTest(), 
DefaultThreadResourceUsageAccountant.INSTANCE);
+  }
 
-    return context;
+  @VisibleForTesting
+  public static QueryThreadContext openForMseTest() {
+    return open(QueryExecutionContext.forMseTest(), new MseWorkerInfo(0, 0),
+        DefaultThreadResourceUsageAccountant.INSTANCE);
   }
 
-  /**
-   * Returns a new {@link ExecutorService} whose tasks will be executed with 
the {@link QueryThreadContext} initialized
-   * with the state of the thread submitting the tasks.
-   *
-   * @param executorService the {@link ExecutorService} to decorate
-   */
-  public static ExecutorService contextAwareExecutorService(ExecutorService 
executorService) {
-    return contextAwareExecutorService(executorService, 
QueryThreadContext::createMemento);
+  /// Returns the [QueryThreadContext] for the current thread.
+  public static QueryThreadContext get() {
+    QueryThreadContext threadContext = THREAD_LOCAL.get();
+    assert threadContext != null;
+    return threadContext;
   }
 
-  /**
-   * Returns a new {@link ExecutorService} whose tasks will be executed with 
the {@link QueryThreadContext} initialized
-   * with the state captured in the given {@link Memento} object.
-   *
-   * @param executorService the {@link ExecutorService} to decorate
-   * @param mementoSupplier a supplier that provides the {@link Memento} 
object to capture the state from
-   */
-  public static ExecutorService contextAwareExecutorService(
-      ExecutorService executorService,
-      Supplier<Memento> mementoSupplier) {
-    return new DecoratorExecutorService(executorService) {
+  /// Returns the [QueryThreadContext] for the current thread, or `null` if 
not available.
+  @Nullable
+  public static QueryThreadContext getIfAvailable() {
+    return THREAD_LOCAL.get();
+  }
+
+  /// Returns a new [ExecutorService] whose tasks will be executed with the 
[QueryThreadContext] initialized with the
+  /// state of the thread submitting the tasks.
+  public static ExecutorService contextAwareExecutorService(ExecutorService 
executorService) {
+    return new DecoratorExecutorService(executorService, future -> 
get().getExecutionContext().addTask(future)) {
       @Override
       protected <T> Callable<T> decorate(Callable<T> task) {
-        Memento memento = mementoSupplier.get();
+        QueryThreadContext parentThreadContext = get();
         return () -> {
-          try (CloseableContext ignored = open(memento)) {
+          try (QueryThreadContext ignore = 
open(parentThreadContext._executionContext,
+              parentThreadContext._mseWorkerInfo, 
parentThreadContext._resourceUsageAccountant)) {
             return task.call();
           }
         };
       }
 
       @Override
       protected Runnable decorate(Runnable task) {
-        Memento memento = mementoSupplier.get();
+        QueryThreadContext parentThreadContext = get();
         return () -> {
-          try (CloseableContext ignored = open(memento)) {
+          try (QueryThreadContext ignore = 
open(parentThreadContext._executionContext,
+              parentThreadContext._mseWorkerInfo, 
parentThreadContext._resourceUsageAccountant)) {
             task.run();
           }
         };
       }
     };
   }
 
-  /**
-   * Returns the start time of the query in milliseconds.
-   *
-   * The default value of 0 means the start time is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getStartTimeMs() {
-    return get().getStartTimeMs();
-  }
-
-  /**
-   * Sets the start time of the query in milliseconds since epoch.
-   *
-   * The start time can only be set once.
-   * @throws IllegalStateException if start time is already set or if the 
{@link QueryThreadContext} is not initialized
-   */
-  public static void setStartTimeMs(long startTimeMs) {
-    get().setStartTimeMs(startTimeMs);
-  }
-
-  /**
-   * Returns the active deadline time of the query in milliseconds since epoch.
-   *
-   * The default value of 0 means the deadline is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getActiveDeadlineMs() {
-    return get().getActiveDeadlineMs();
-  }
-
-  /**
-   * Sets the active deadline time of the query in milliseconds since epoch.
-   *
-   * The deadline can only be set once.
-   * @throws IllegalStateException if deadline is already set or if the {@link 
QueryThreadContext} is not initialized
-   */
-  public static void setActiveDeadlineMs(long activeDeadlineMs) {
-    get().setActiveDeadlineMs(activeDeadlineMs);
-  }
-
-  /**
-   * Returns the passive deadline time of the query in milliseconds since 
epoch.
-   *
-   * The default value of 0 means the deadline is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getPassiveDeadlineMs() {
-    return get().getPassiveDeadlineMs();
-  }
-
-  /**
-   * Sets the passive deadline time of the query in milliseconds since epoch.
-   *
-   * The deadline can only be set once.
-   * @throws IllegalStateException if deadline is already set or if the {@link 
QueryThreadContext} is not initialized
-   */
-  public static void setPassiveDeadlineMs(long passiveDeadlineMs) {
-    get().setPassiveDeadlineMs(passiveDeadlineMs);
-  }
-
-  /**
-   * Returns the timeout of the query in milliseconds.
-   *
-   * The default value of 0 means the timeout is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getBrokerId() {
-    return get().getBrokerId();
-  }
-
-  /**
-   * Sets the broker id of the query.
-   *
-   * The broker id can only be set once.
-   * @throws IllegalStateException if broker id is already set or if the 
{@link QueryThreadContext} is not initialized
-   */
-  public static void setBrokerId(String brokerId) {
-    get().setBrokerId(brokerId);
-  }
-
-  /**
-   * Returns the request id of the query.
-   *
-   * The request id is used to identify query phases across different systems.
-   * Contrary to the correlation id, a single logical query can have multiple 
request ids.
-   * This is because the request id is changed at different stages of the 
query (e.g. real-time and offline parts of
-   * the query or different leaf operations in MSE).
-   *
-   * Also remember that neither request nor correlation id are guaranteed to 
be unique.
-   *
-   * The default value of 0 means the request id is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static long getRequestId() {
-    return get().getRequestId();
-  }
-
-  /**
-   * Returns the correlation id of the query.
-   *
-   * Correlation id is used to track queries across different systems.
-   * This id can be supplied by the client or generated by the system (usually 
the broker). It is not guaranteed to be
-   * unique. Users can use the same correlation id for multiple queries to 
track a single logical workflow on their
-   * side.
-   *
-   * What is guaranteed is that the correlation id will be consistent across 
all the phases of the query. Remember that
-   * it is not the case for the request id, which can change at different 
stages of the query (e.g. real-time and
-   * offline parts of the query or different leaf operations in MSE).
-   *
-   * Also remember that neither request nor correlation id are guaranteed to 
be unique.
-   *
-   * The default value of {@code null} means the correlation id is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getCid() {
-    return get().getCid();
-  }
-
-  /**
-   * Sets the request id and correlation id of the query.
-   *
-   * Both requests can only be set once.
-   *
-   * Setting this value also registers it in the MDC context with the key 
{@link LoggerConstants#REQUEST_ID_KEY} and
-   * {@link LoggerConstants#CORRELATION_ID_KEY}.
-   * @throws IllegalStateException if any of the ids are already set or if the 
{@link QueryThreadContext} is not
-   * initialized
-   */
-  public static void setIds(long requestId, String cid) {
-    Instance instance = get();
-    instance.setRequestId(requestId);
-    instance.setCid(cid);
-  }
-
-  /**
-   * Returns the SQL of the query.
-   *
-   * The default value of {@code null} means the SQL is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getSql() {
-    return get().getSql();
-  }
-
-  /**
-   * Sets the SQL of the query.
-   *
-   * The SQL can only be set once.
-   * @throws IllegalStateException if sql is already set or if the {@link 
QueryThreadContext} is not initialized
-   */
-  public static void setSql(String sql) {
-    get().setSql(sql);
-  }
-
-  /**
-   * Returns the query engine that is being used.
-   *
-   * The default value of {@code null} means the query engine is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getQueryEngine() {
-    return get().getQueryEngine();
-  }
-
-  /**
-   * Sets the query engine that is being used.
-   *
-   * The query engine can only be set once.
-   * @throws IllegalStateException if query engine is already set or if the 
{@link QueryThreadContext} is not
-   * initialized
-   */
-  public static void setQueryEngine(String queryEngine) {
-    get().setQueryEngine(queryEngine);
-  }
-
-  /**
-   * Returns the instanceid of the query.
-   *
-   * This is usually the id that identifies the server, broker, controller, 
etc.
-   *
-   * The default value of {@code null} means the instanceid is not set.
-   * @throws IllegalStateException if the {@link QueryThreadContext} is not 
initialized
-   */
-  public static String getInstanceId() {
-    return get().getInstanceId();
-  }
-
-  /**
-   * This private class stores the actual state of the {@link 
QueryThreadContext} in a safe way.
-   *
-   * As part of the paranoid design of the {@link QueryThreadContext}, this 
class is used to store the state of the
-   * {@link QueryThreadContext} in a way that is safe to copy between threads.
-   * Callers never have access to this class directly, but only through the 
{@link QueryThreadContext} static methods.
-   * This is designed this way to ensure that the state is always copied 
between threads in a safe way.
-   * Given it is impossible for a caller to access this class directly, it can 
never be copied from one thread to
-   * another by mistake.
-   * This forces to use the {@link Memento} class to capture the state of the 
{@link QueryThreadContext} in the current
-   * thread and then use the {@link #open(Memento)} method to initialize the 
context in the target thread with the state
-   * captured in the {@link Memento} object.
-   */
-  private static class Instance implements CloseableContext {
-    private long _startTimeMs;
-    private long _activeDeadlineMs;
-    private long _passiveDeadlineMs;
-    private String _brokerId;
-    private long _requestId;
-    private String _cid;
-    private String _sql;
-    private String _queryEngine;
-    private String _instanceId;
-
-    public long getStartTimeMs() {
-      return _startTimeMs;
-    }
-
-    public void setStartTimeMs(long startTimeMs) {
-      Preconditions.checkState(getStartTimeMs() == 0, "Start time already set 
to %s, cannot set again",
-          getStartTimeMs());
-      _startTimeMs = startTimeMs;
-    }
-
-    public long getActiveDeadlineMs() {
-      return _activeDeadlineMs;
-    }
-
-    public void setActiveDeadlineMs(long activeDeadlineMs) {
-      Preconditions.checkState(getActiveDeadlineMs() == 0, "Deadline already 
set to %s, cannot set again",
-          getActiveDeadlineMs());
-      _activeDeadlineMs = activeDeadlineMs;
-    }
-
-    public long getPassiveDeadlineMs() {
-      return _passiveDeadlineMs;
-    }
-
-    public void setPassiveDeadlineMs(long passiveDeadlineMs) {
-      Preconditions.checkState(getPassiveDeadlineMs() == 0, "Passive deadline 
already set to %s, cannot set again",
-          getPassiveDeadlineMs());
-      _passiveDeadlineMs = passiveDeadlineMs;
-    }
-
-    public String getBrokerId() {
-      return _brokerId;
-    }
-
-    public void setBrokerId(String brokerId) {
-      Preconditions.checkState(getBrokerId() == null, "Broker id already set 
to %s, cannot set again",
-          getBrokerId());
-      _brokerId = brokerId;
-    }
-
-    public long getRequestId() {
-      return _requestId;
-    }
-
-    public void setRequestId(long requestId) {
-      Preconditions.checkState(getRequestId() == 0, "Request id already set to 
%s, cannot set again", getRequestId());
-      LoggerConstants.REQUEST_ID_KEY.registerInMdc(Long.toString(requestId));
-      _requestId = requestId;
-    }
-
-    public String getCid() {
-      return _cid;
-    }
-
-    public void setCid(String cid) {
-      Preconditions.checkState(getCid() == null, "Correlation id already set 
to %s, cannot set again",
-          getCid());
-      LoggerConstants.CORRELATION_ID_KEY.registerInMdc(cid);
-      _cid = cid;
-    }
-
-    public String getSql() {
-      return _sql;
-    }
-
-    public void setSql(String sql) {
-      Preconditions.checkState(getSql() == null, "SQL already set to %s, 
cannot set again",
-          getSql());
-      _sql = sql;
-    }
-
-    public String getQueryEngine() {
-      return _queryEngine;
-    }
-
-    public void setQueryEngine(String queryType) {
-      Preconditions.checkState(getQueryEngine() == null, "Query type already 
set to %s, cannot set again",
-          getQueryEngine());
-      _queryEngine = queryType;
-    }
-
-    public void setInstanceId(String instanceId) {
-      Preconditions.checkState(_instanceId == null, "Service id already set to 
%s, cannot set again",
-          getInstanceId());
-      _instanceId = instanceId;
-    }
-
-    public String getInstanceId() {
-      return _instanceId;
-    }
-
-    @Override
-    public String toString() {
-      try {
-        return JsonUtils.objectToString(this);
-      } catch (JsonProcessingException e) {
-        return "Failed to convert QueryThreadContext to JSON: " + 
e.getMessage();
-      }
-    }
-
-    /**
-     * Closes the {@link QueryThreadContext} and removes it from the 
thread-local storage and MDC context.
-     */
-    @Override
-    public void close() {
-      THREAD_LOCAL.remove();
-      if (_requestId != 0) {
-        LoggerConstants.REQUEST_ID_KEY.unregisterFromMdc();
-      }
-      if (_cid != null) {
-        LoggerConstants.CORRELATION_ID_KEY.unregisterFromMdc();
-      }
+  /// Checks if the query should be terminated.
+  public static void checkTermination(Supplier<String> scopeSupplier) {

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to