This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b061c555a4 Direct mse query server (#15445)
b061c555a4 is described below

commit b061c555a427efa8db83e77c342c6a027093d4c7
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Thu May 22 14:04:16 2025 +0200

    Direct mse query server (#15445)
---
 .../apache/pinot/query/runtime/QueryRunner.java    | 117 +++---
 .../runtime/executor/OpChainSchedulerService.java  |   3 +-
 .../pinot/query/service/server/QueryServer.java    | 412 ++++++++++++---------
 .../apache/pinot/query/QueryServerEnclosure.java   |   4 +-
 .../apache/pinot/spi/query/QueryThreadContext.java |   3 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |  32 ++
 6 files changed, 347 insertions(+), 224 deletions(-)

diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 19282ae7a2..8609af8899 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.query.runtime;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import io.grpc.stub.StreamObserver;
@@ -28,6 +27,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 import java.util.function.BiConsumer;
@@ -217,11 +217,6 @@ public class QueryRunner {
     LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}", 
hostname, port);
   }
 
-  @VisibleForTesting
-  public ExecutorService getExecutorService() {
-    return _executorService;
-  }
-
   public void start() {
     _mailboxService.start();
     _leafQueryExecutor.start();
@@ -233,14 +228,27 @@ public class QueryRunner {
     ExecutorServiceUtils.close(_executorService);
   }
 
-  /**
-   * Execute a {@link StagePlan}.
-   *
-   * <p>This execution entry point should be asynchronously called by the 
request handler and caller should not wait
-   * for results/exceptions.</p>
-   */
-  public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan, 
Map<String, String> requestMetadata,
-      @Nullable ThreadExecutionContext parentContext) {
+  /// Asynchronously executes a [StagePlan].
+  ///
+  /// This method will not block the current thread but use 
[#_executorService] instead.
+  /// If any error happened during the asynchronous execution, an error block 
will be sent to all receiver mailboxes.
+  public CompletableFuture<Void> processQuery(WorkerMetadata workerMetadata, 
StagePlan stagePlan,
+      Map<String, String> requestMetadata, @Nullable ThreadExecutionContext 
parentContext) {
+    Runnable runnable = () -> processQueryBlocking(workerMetadata, stagePlan, 
requestMetadata, parentContext);
+    return CompletableFuture.runAsync(runnable, _executorService);
+  }
+
+  /// Executes a [StagePlan] pseudo-synchronously.
+  ///
+  /// First, the pipeline breaker is executed on the current thread. This is 
the blocking part of the method.
+  /// If the pipeline breaker execution fails, the current thread will send 
the error block to the receiver mailboxes.
+  ///
+  /// If the pipeline breaker succeeds, the rest of the stage is asynchronously 
executed on the [#_opChainScheduler].
+  private void processQueryBlocking(WorkerMetadata workerMetadata, StagePlan 
stagePlan,
+      Map<String, String> requestMetadata, @Nullable ThreadExecutionContext 
parentContext) {
+    
MseWorkerThreadContext.setStageId(stagePlan.getStageMetadata().getStageId());
+    MseWorkerThreadContext.setWorkerId(workerMetadata.getWorkerId());
+
     long requestId = 
Long.parseLong(requestMetadata.get(MetadataKeys.REQUEST_ID));
     long timeoutMs = 
Long.parseLong(requestMetadata.get(QueryOptionKey.TIMEOUT_MS));
     long deadlineMs = System.currentTimeMillis() + timeoutMs;
@@ -256,36 +264,8 @@ public class QueryRunner {
     // Send error block to all the receivers if pipeline breaker fails
     if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock() 
!= null) {
       ErrorMseBlock errorBlock = pipelineBreakerResult.getErrorBlock();
-      int stageId = stageMetadata.getStageId();
-      LOGGER.error("Error executing pipeline breaker for request: {}, stage: 
{}, sending error block: {}", requestId,
-          stageId, errorBlock);
-      MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode();
-      List<RoutingInfo> routingInfos = new ArrayList<>();
-      for (Integer receiverStageId : rootNode.getReceiverStageIds()) {
-        List<MailboxInfo> receiverMailboxInfos =
-            
workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
-        List<RoutingInfo> stageRoutingInfos =
-            MailboxIdUtils.toRoutingInfos(requestId, stageId, 
workerMetadata.getWorkerId(), receiverStageId,
-                receiverMailboxInfos);
-        routingInfos.addAll(stageRoutingInfos);
-      }
-      for (RoutingInfo routingInfo : routingInfos) {
-        try {
-          StatMap<MailboxSendOperator.StatKey> statMap = new 
StatMap<>(MailboxSendOperator.StatKey.class);
-          SendingMailbox sendingMailbox = 
_mailboxService.getSendingMailbox(routingInfo.getHostname(),
-                routingInfo.getPort(), routingInfo.getMailboxId(), deadlineMs, 
statMap);
-            // TODO: Here we are breaking the stats invariants, sending errors 
without including the stats of the
-            //  current stage. We will need to fix this in future, but for 
now, we are sending the error block without
-            //  the stats.
-            sendingMailbox.send(errorBlock, Collections.emptyList());
-        } catch (TimeoutException e) {
-          LOGGER.warn("Timed out sending error block to mailbox: {} for 
request: {}, stage: {}",
-              routingInfo.getMailboxId(), requestId, stageId, e);
-        } catch (Exception e) {
-          LOGGER.error("Caught exception sending error block to mailbox: {} 
for request: {}, stage: {}",
-              routingInfo.getMailboxId(), requestId, stageId, e);
-        }
-      }
+      notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock, 
requestId, workerMetadata, stagePlan,
+          deadlineMs);
       return;
     }
 
@@ -295,12 +275,52 @@ public class QueryRunner {
             workerMetadata, pipelineBreakerResult, parentContext, 
_sendStats.getAsBoolean());
     OpChain opChain;
     if (workerMetadata.isLeafStageWorker()) {
-      opChain =
-          ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan, 
_leafQueryExecutor, _executorService);
+      opChain = ServerPlanRequestUtils.compileLeafStage(executionContext, 
stagePlan,
+          _leafQueryExecutor, _executorService);
     } else {
       opChain = PlanNodeToOpChain.convert(stagePlan.getRootNode(), 
executionContext);
     }
-    _opChainScheduler.register(opChain);
+    try {
+      // This can fail if the executor rejects the task.
+      _opChainScheduler.register(opChain);
+    } catch (RuntimeException e) {
+      ErrorMseBlock errorBlock = ErrorMseBlock.fromException(e);
+      notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock, 
requestId, workerMetadata, stagePlan,
+          deadlineMs);
+    }
+  }
+
+  private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock 
errorBlock, long requestId,
+      WorkerMetadata workerMetadata, StagePlan stagePlan, long deadlineMs) {
+    LOGGER.error("Error executing pipeline breaker for request: {}, stage: {}, 
sending error block: {}", requestId,
+        stageId, errorBlock);
+    MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode();
+    List<RoutingInfo> routingInfos = new ArrayList<>();
+    for (Integer receiverStageId : rootNode.getReceiverStageIds()) {
+      List<MailboxInfo> receiverMailboxInfos =
+          
workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
+      List<RoutingInfo> stageRoutingInfos =
+          MailboxIdUtils.toRoutingInfos(requestId, stageId, 
workerMetadata.getWorkerId(), receiverStageId,
+              receiverMailboxInfos);
+      routingInfos.addAll(stageRoutingInfos);
+    }
+    for (RoutingInfo routingInfo : routingInfos) {
+      try {
+        StatMap<MailboxSendOperator.StatKey> statMap = new 
StatMap<>(MailboxSendOperator.StatKey.class);
+        SendingMailbox sendingMailbox = 
_mailboxService.getSendingMailbox(routingInfo.getHostname(),
+            routingInfo.getPort(), routingInfo.getMailboxId(), deadlineMs, 
statMap);
+        // TODO: Here we are breaking the stats invariants, sending errors 
without including the stats of the
+        //  current stage. We will need to fix this in future, but for now, we 
are sending the error block without
+        //  the stats.
+        sendingMailbox.send(errorBlock, Collections.emptyList());
+      } catch (TimeoutException e) {
+        LOGGER.warn("Timed out sending error block to mailbox: {} for request: 
{}, stage: {}",
+            routingInfo.getMailboxId(), requestId, stageId, e);
+      } catch (Exception e) {
+        LOGGER.error("Caught exception sending error block to mailbox: {} for 
request: {}, stage: {}",
+            routingInfo.getMailboxId(), requestId, stageId, e);
+      }
+    }
   }
 
   /**
@@ -463,8 +483,7 @@ public class QueryRunner {
     Map<String, String> opChainMetadata = 
consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata);
 
     if (PipelineBreakerExecutor.hasPipelineBreakers(stagePlan)) {
-      // TODO: Support pipeline breakers before merging this feature.
-      //  See https://github.com/apache/pinot/pull/13733#discussion_r1752031714
+      //TODO: See 
https://github.com/apache/pinot/pull/13733#discussion_r1752031714
       LOGGER.error("Pipeline breaker is not supported in explain query");
       return stagePlan;
     }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index f3daece353..2ed131b782 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -75,8 +75,6 @@ public class OpChainSchedulerService {
   }
 
   public void register(OpChain operatorChain) {
-    _opChainCache.put(operatorChain.getId(), operatorChain.getRoot());
-
     Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
       @Override
       public void runJob() {
@@ -114,6 +112,7 @@ public class OpChainSchedulerService {
         }
       }
     });
+    _opChainCache.put(operatorChain.getId(), operatorChain.getRoot());
     _submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
   }
 
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 6ac7ca5598..d5f14d6da5 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -30,13 +30,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.function.BiFunction;
-import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
 import org.apache.pinot.common.config.TlsConfig;
@@ -54,9 +51,9 @@ import org.apache.pinot.query.routing.StagePlan;
 import org.apache.pinot.query.routing.WorkerMetadata;
 import org.apache.pinot.query.runtime.QueryRunner;
 import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
-import org.apache.pinot.query.service.dispatch.QueryDispatcher;
 import org.apache.pinot.spi.accounting.ThreadExecutionContext;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.executor.ExecutorServiceUtils;
 import org.apache.pinot.spi.executor.MetricsExecutor;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.trace.Tracing;
@@ -65,9 +62,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
-/**
- * {@link QueryServer} is the GRPC server that accepts query plan requests 
sent from {@link QueryDispatcher}.
- */
+/// QueryServer is the GRPC server that accepts MSE query plan requests sent from
+/// [org.apache.pinot.query.service.dispatch.QueryDispatcher].
+///
+/// All endpoints are being called by the GRPC server threads, which must 
never be blocked.
+/// In some situations we need to run blocking code to resolve these requests, 
so we need to move the work to a
+/// different thread pool. See each method for details.
 public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase 
{
   private static final Logger LOGGER = 
LoggerFactory.getLogger(QueryServer.class);
   // TODO: Inbound messages can get quite large because we send the entire 
stage metadata map in each call.
@@ -81,7 +81,20 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
   // query submission service is only used for plan submission for now.
   // TODO: with complex query submission logic we should allow asynchronous 
query submission return instead of
   //   directly return from submission response observer.
-  private final ExecutorService _querySubmissionExecutorService;
+  private final ExecutorService _submissionExecutorService;
+  /// The thread executor service used to run explain plan requests.
+  /// In order to calculate explain plans we need to calculate the physical 
plan for each worker, which is a
+  /// non-blocking but heavier CPU operation than the normal query submission.
+  /// This is why we use a different thread pool for this operation: We don't 
want to block the query submission thread
+  /// pool.
+  /// Given the QPS for explain plans is much lower than the normal query 
submission, we can use a single thread for
+  /// this.
+  private final ExecutorService _explainExecutorService;
+  /// The thread executor service used to run time series query requests.
+  ///
+  /// In order to run time series we need to call 
[QueryRunner#processTimeSeriesQuery] which contrary to the normal MSE
+  /// methods, is a blocking method. This is why we need to use a different 
(and usually cached) thread pool for this.
+  private final ExecutorService _timeSeriesExecutorService;
 
   private Server _server = null;
 
@@ -95,36 +108,41 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
     _queryRunner = queryRunner;
     _tlsConfig = tlsConfig;
 
-    ExecutorService baseExecutor = Executors.newCachedThreadPool(
-        new NamedThreadFactory("query_submission_executor_on_" + _port + 
"_port"));
+    ExecutorService baseExecutorService = ExecutorServiceUtils.create(config,
+        CommonConstants.Server.MULTISTAGE_SUBMISSION_EXEC_CONFIG_PREFIX,
+        "query_submission_executor_on_" + _port + "_port",
+        CommonConstants.Server.DEFAULT_MULTISTAGE_SUBMISSION_EXEC_TYPE);
 
     ServerMetrics serverMetrics = ServerMetrics.get();
-    MetricsExecutor withMetrics = new MetricsExecutor(
-        baseExecutor,
+    _submissionExecutorService = new MetricsExecutor(
+        baseExecutorService,
         
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_SUBMISSION_STARTED_TASKS),
         
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_SUBMISSION_COMPLETED_TASKS));
-    _querySubmissionExecutorService = 
MseWorkerThreadContext.contextAwareExecutorService(
-        QueryThreadContext.contextAwareExecutorService(withMetrics));
+
+    NamedThreadFactory explainThreadFactory =
+        new NamedThreadFactory("query_explain_on_" + _port + "_port");
+    _explainExecutorService = 
Executors.newSingleThreadExecutor(explainThreadFactory);
+
+    ExecutorService baseTimeSeriesExecutorService = 
ExecutorServiceUtils.create(config,
+        CommonConstants.Server.MULTISTAGE_TIMESERIES_EXEC_CONFIG_PREFIX,
+        "query_ts_on_" + _port + "_port",
+        CommonConstants.Server.DEFAULT_TIMESERIES_EXEC_CONFIG_PREFIX);
+    _timeSeriesExecutorService = 
MseWorkerThreadContext.contextAwareExecutorService(
+        
QueryThreadContext.contextAwareExecutorService(baseTimeSeriesExecutorService));
   }
 
   public void start() {
     LOGGER.info("Starting QueryServer");
     try {
       if (_server == null) {
+        ServerBuilder<?> serverBuilder;
         if (_tlsConfig == null) {
-          _server = ServerBuilder
-              .forPort(_port)
-              .addService(this)
-              .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
-              .build();
+          serverBuilder = ServerBuilder.forPort(_port);
         } else {
-          _server = NettyServerBuilder
-              .forPort(_port)
-              .addService(this)
-              .sslContext(GrpcQueryServer.buildGrpcSslContext(_tlsConfig))
-              .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
-              .build();
+          serverBuilder = NettyServerBuilder.forPort(_port)
+              .sslContext(GrpcQueryServer.buildGrpcSslContext(_tlsConfig));
         }
+        _server = buildGrpcServer(serverBuilder);
         LOGGER.info("Initialized QueryServer on port: {}", _port);
       }
       _queryRunner.start();
@@ -134,6 +152,15 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
     }
   }
 
+  private <T extends ServerBuilder<T>> Server buildGrpcServer(ServerBuilder<T> 
builder) {
+    return builder
+         // By using directExecutor, GRPC doesn't need to manage its own 
thread pool
+        .directExecutor()
+        .addService(this)
+        .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
+        .build();
+  }
+
   public void shutdown() {
     LOGGER.info("Shutting down QueryServer");
     try {
@@ -142,12 +169,23 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
         _server.shutdown();
         _server.awaitTermination();
       }
-      _querySubmissionExecutorService.shutdown();
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
+
+    _submissionExecutorService.shutdown();
+    _explainExecutorService.shutdown();
+    _timeSeriesExecutorService.shutdown();
   }
 
+  /// Submits a query for execution.
+  ///
+  /// The request is deserialized on the GRPC thread and the rest of the work 
is done on the submission thread pool.
+  /// Given the submission code should not be blocking, we could directly use 
the GRPC thread pool, but we decided to
+  /// use a different thread pool to be able to time out the submission in the 
rare case it takes too long.
+  ///
+  // TODO: Study if that is actually needed. We could just use the GRPC thread 
pool and timeout once the execution
+  //   starts in the query runner.
   @Override
   public void submit(Worker.QueryRequest request, 
StreamObserver<Worker.QueryResponse> responseObserver) {
     Map<String, String> reqMetadata;
@@ -163,37 +201,92 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
       return;
     }
 
-    try (QueryThreadContext.CloseableContext queryTlClosable = 
QueryThreadContext.openFromRequestMetadata(reqMetadata);
+    long timeoutMs = 
Long.parseLong(reqMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+
+    CompletableFuture.runAsync(() -> submitInternal(request, reqMetadata), 
_submissionExecutorService)
+        .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+        .whenComplete((result, error) -> {
+          // this will always be called, either on the submission thread that 
finished last or in the caller
+          // (GRPC) thread in the improbable case all submission tasks 
finished before the caller thread reaches
+          // this line
+          if (error != null) { // if there was an error submitting the 
request, return an error response
+            try (QueryThreadContext.CloseableContext qCtx = 
QueryThreadContext.openFromRequestMetadata(reqMetadata);
+                QueryThreadContext.CloseableContext mseCtx = 
MseWorkerThreadContext.open()) {
+              long requestId = QueryThreadContext.getRequestId();
+              LOGGER.error("Caught exception while submitting request: {}", 
requestId, error);
+              String errorMsg = "Caught exception while submitting request: " 
+ error.getMessage();
+              responseObserver.onNext(Worker.QueryResponse.newBuilder()
+                  
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR, 
errorMsg)
+                  .build());
+              responseObserver.onCompleted();
+            }
+          } else { // if the request was submitted successfully, return a 
success response
+            responseObserver.onNext(
+                Worker.QueryResponse.newBuilder()
+                    
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "")
+                    .build());
+            responseObserver.onCompleted();
+          }
+        });
+  }
+
+  /// Iterates over all the stage plans in the request and submits each worker 
to the query runner.
+  ///
+  /// This method should be called from the submission thread pool.
+  /// If any exception is thrown while submitting a worker, all the workers 
that have been started are cancelled and the
+  /// exception is thrown.
+  ///
+  /// Remember that this method doesn't track the status of the workers 
themselves. In case of error while the worker is
+  /// running, the exception will be managed by the worker, which usually sends 
the error downstream to the receiver
+  /// mailboxes. Therefore these errors won't be reported here.
+  private void submitInternal(Worker.QueryRequest request, Map<String, String> 
reqMetadata) {
+    try (QueryThreadContext.CloseableContext qTlClosable = 
QueryThreadContext.openFromRequestMetadata(reqMetadata);
         QueryThreadContext.CloseableContext mseTlCloseable = 
MseWorkerThreadContext.open()) {
-      long requestId = QueryThreadContext.getRequestId();
       QueryThreadContext.setQueryEngine("mse");
+      List<Worker.StagePlan> protoStagePlans = request.getStagePlanList();
 
-      Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId), 
ThreadExecutionContext.TaskType.MSE);
-      ThreadExecutionContext parentContext = 
Tracing.getThreadAccountant().getThreadExecutionContext();
-      try {
-        forEachStage(request,
-            (stagePlan, workerMetadata) -> {
-              _queryRunner.processQuery(workerMetadata, stagePlan, 
reqMetadata, parentContext);
-              return null;
-            },
-            (ignored) -> {
-            });
-      } catch (ExecutionException | InterruptedException | TimeoutException | 
RuntimeException e) {
-        LOGGER.error("Caught exception while submitting request: {}", 
requestId, e);
-        String errorMsg = "Caught exception while submitting request: " + 
e.getMessage();
-        responseObserver.onNext(Worker.QueryResponse.newBuilder()
-            
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR, 
errorMsg)
-                .build());
-        responseObserver.onCompleted();
-        return;
-      } finally {
-        Tracing.ThreadAccountantOps.clear();
+      List<CompletableFuture<Void>> startedWorkers = new ArrayList<>();
+
+      for (Worker.StagePlan protoStagePlan : protoStagePlans) {
+        StagePlan stagePlan = deserializePlan(protoStagePlan);
+        StageMetadata stageMetadata = stagePlan.getStageMetadata();
+        List<WorkerMetadata> workerMetadataList = 
stageMetadata.getWorkerMetadataList();
+
+        for (WorkerMetadata workerMetadata : workerMetadataList) {
+          try {
+            CompletableFuture<Void> job = submitWorker(workerMetadata, 
stagePlan, reqMetadata);
+            startedWorkers.add(job);
+          } catch (RuntimeException e) {
+            startedWorkers.forEach(worker -> worker.cancel(true));
+            throw e;
+          }
+        }
       }
-      responseObserver.onNext(
-          Worker.QueryResponse.newBuilder()
-              
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "")
-              .build());
-      responseObserver.onCompleted();
+    }
+  }
+
+  /// Creates a new worker by submitting the stage plan to the query 
runner.
+  ///
+  /// The returned CompletableFuture will be completed when the opchain 
starts, which may take a while if there are
+  /// pipeline breakers.
+  /// Remember that the completable future may (and usually will) be completed 
*before* the opchain is finished.
+  ///
+  /// If there is an error scheduling creating the worker (ie too many workers 
are already running), the
+  /// CompletableFuture will be completed exceptionally. In that case it is up 
to the caller to deal with the error
+  /// (normally cancelling other already started workers and sending the error 
through GRPC)
+  private CompletableFuture<Void> submitWorker(WorkerMetadata workerMetadata, 
StagePlan stagePlan,
+      Map<String, String> reqMetadata) {
+    String requestIdStr = Long.toString(QueryThreadContext.getRequestId());
+
+    //TODO: Verify if this matches with what OOM protection expects. This 
method will not block for the query to
+    // finish, so it may be breaking some of the OOM protection assumptions.
+    Tracing.ThreadAccountantOps.setupRunner(requestIdStr, 
ThreadExecutionContext.TaskType.MSE);
+    ThreadExecutionContext parentContext = 
Tracing.getThreadAccountant().getThreadExecutionContext();
+
+    try {
+      return _queryRunner.processQuery(workerMetadata, stagePlan, reqMetadata, 
parentContext);
+    } finally {
+      Tracing.ThreadAccountantOps.clear();
     }
   }
 
@@ -212,49 +305,103 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
       return;
     }
 
-    try (QueryThreadContext.CloseableContext queryTlClosable = 
QueryThreadContext.openFromRequestMetadata(reqMetadata);
-        QueryThreadContext.CloseableContext mseTlCloseable = 
MseWorkerThreadContext.open()) {
-      try {
-        forEachStage(request,
-            (stagePlan, workerMetadata) -> 
_queryRunner.explainQuery(workerMetadata, stagePlan, reqMetadata),
-            (plans) -> {
-              Worker.ExplainResponse.Builder builder = 
Worker.ExplainResponse.newBuilder();
-              for (StagePlan plan : plans) {
-                ByteString rootAsBytes = 
PlanNodeSerializer.process(plan.getRootNode()).toByteString();
-
-                StageMetadata metadata = plan.getStageMetadata();
-                List<Worker.WorkerMetadata> protoWorkerMetadataList =
-                    
QueryPlanSerDeUtils.toProtoWorkerMetadataList(metadata.getWorkerMetadataList());
-
-                
builder.addStagePlan(Worker.StagePlan.newBuilder().setRootNode(rootAsBytes).setStageMetadata(
-                    
Worker.StageMetadata.newBuilder().setStageId(metadata.getStageId())
-                        .addAllWorkerMetadata(protoWorkerMetadataList)
-                        
.setCustomProperty(QueryPlanSerDeUtils.toProtoProperties(metadata.getCustomProperties()))));
-              }
-              
builder.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK,
 "");
-              responseObserver.onNext(builder.build());
-            });
-      } catch (ExecutionException | InterruptedException | TimeoutException | 
RuntimeException e) {
-        long requestId = QueryThreadContext.getRequestId();
-        LOGGER.error("Caught exception while submitting request: {}", 
requestId, e);
-        String errorMsg = "Caught exception while submitting request: " + 
e.getMessage();
-        responseObserver.onNext(Worker.ExplainResponse.newBuilder()
-            
.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_ERROR,
 errorMsg)
+    long timeoutMs = 
Long.parseLong(reqMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+
+    CompletableFuture.runAsync(() -> explainInternal(request, 
responseObserver, reqMetadata), _explainExecutorService)
+        .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+        .handle((result, error) -> {
+      if (error != null) {
+        try (QueryThreadContext.CloseableContext qCtx = 
QueryThreadContext.openFromRequestMetadata(reqMetadata);
+            QueryThreadContext.CloseableContext mseCtx = 
MseWorkerThreadContext.open()) {
+          long requestId = QueryThreadContext.getRequestId();
+          LOGGER.error("Caught exception while submitting request: {}", 
requestId, error);
+          String errorMsg = "Caught exception while submitting request: " + 
error.getMessage();
+          synchronized (responseObserver) {
+            responseObserver.onNext(Worker.ExplainResponse.newBuilder()
+                
.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_ERROR,
 errorMsg)
                 .build());
-        responseObserver.onCompleted();
-        return;
+            responseObserver.onCompleted();
+          }
+        }
+      } else {
+        synchronized (responseObserver) {
+          responseObserver.onNext(
+              Worker.ExplainResponse.newBuilder()
+                  
.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK, 
"")
+                  .build());
+          responseObserver.onCompleted();
+        }
+      }
+      return null;
+    });
+  }
+
+  /// Iterates over all the stage plans in the request and explains the query 
for each worker.
+  ///
+  /// This method should be called from the submission thread pool.
+  /// Each stage plan will be sent as a new response to the response observer.
+  /// In case of failure, this method will fail with an exception and the 
response observer won't be notified.
+  ///
+  /// Remember that in case of error some stages may have been explained and 
some not.
+  private void explainInternal(Worker.QueryRequest request, 
StreamObserver<Worker.ExplainResponse> responseObserver,
+      Map<String, String> reqMetadata) {
+    try (QueryThreadContext.CloseableContext qTlClosable = 
QueryThreadContext.openFromRequestMetadata(reqMetadata);
+        QueryThreadContext.CloseableContext mseTlCloseable = 
MseWorkerThreadContext.open()) {
+      // Explain the stage for each worker
+      BiFunction<StagePlan, WorkerMetadata, StagePlan> explainFun = 
(stagePlan, workerMetadata) ->
+          _queryRunner.explainQuery(workerMetadata, stagePlan, reqMetadata);
+
+      List<Worker.StagePlan> protoStagePlans = request.getStagePlanList();
+
+      for (Worker.StagePlan protoStagePlan : protoStagePlans) {
+        StagePlan stagePlan = deserializePlan(protoStagePlan);
+        StageMetadata stageMetadata = stagePlan.getStageMetadata();
+        List<WorkerMetadata> workerMetadataList = 
stageMetadata.getWorkerMetadataList();
+
+        Worker.ExplainResponse stageResponse = calculateExplainPlanForStage(
+            protoStagePlan, workerMetadataList.toArray(new WorkerMetadata[0]), 
reqMetadata);
+        synchronized (responseObserver) {
+          responseObserver.onNext(stageResponse);
+        }
       }
-      responseObserver.onNext(
-          Worker.ExplainResponse.newBuilder()
-              
.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK, 
"")
-              .build());
-      responseObserver.onCompleted();
     }
   }
 
+  /// Calculates the explain plan for a stage, iterating over each worker and 
merging the results.
+  ///
+  /// The merged response is then returned.
+  private Worker.ExplainResponse calculateExplainPlanForStage(Worker.StagePlan 
protoStagePlan,
+      WorkerMetadata[] workerMetadataList, Map<String, String> reqMetadata) {
+    Worker.ExplainResponse.Builder builder = 
Worker.ExplainResponse.newBuilder();
+    StagePlan stagePlan = deserializePlan(protoStagePlan);
+    for (WorkerMetadata workerMetadata : workerMetadataList) {
+      StagePlan explainPlan = _queryRunner.explainQuery(workerMetadata, 
stagePlan, reqMetadata);
+
+      ByteString rootAsBytes = 
PlanNodeSerializer.process(explainPlan.getRootNode()).toByteString();
+
+      StageMetadata metadata = explainPlan.getStageMetadata();
+      List<Worker.WorkerMetadata> protoWorkerMetadataList =
+          
QueryPlanSerDeUtils.toProtoWorkerMetadataList(metadata.getWorkerMetadataList());
+
+      builder.addStagePlan(Worker.StagePlan.newBuilder()
+          .setRootNode(rootAsBytes)
+          .setStageMetadata(Worker.StageMetadata.newBuilder()
+              .setStageId(metadata.getStageId())
+              .addAllWorkerMetadata(protoWorkerMetadataList)
+              
.setCustomProperty(QueryPlanSerDeUtils.toProtoProperties(metadata.getCustomProperties()))));
+    }
+    
builder.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK,
 "");
+    return builder.build();
+  }
+
   @Override
   public void submitTimeSeries(Worker.TimeSeriesQueryRequest request,
       StreamObserver<Worker.TimeSeriesResponse> responseObserver) {
+    CompletableFuture.runAsync(() -> submitTimeSeriesInternal(request, 
responseObserver), _timeSeriesExecutorService);
+  }
+
+  private void submitTimeSeriesInternal(Worker.TimeSeriesQueryRequest request,
+      StreamObserver<Worker.TimeSeriesResponse> responseObserver) {
     try (QueryThreadContext.CloseableContext queryTlClosable = 
QueryThreadContext.open();
         QueryThreadContext.CloseableContext mseTlCloseable = 
MseWorkerThreadContext.open()) {
       // TODO: populate the thread context with TSE information
@@ -263,6 +410,11 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
     }
   }
 
+  /// Executes a cancel request.
+  ///
+  /// Cancel requests should always be executed on threads other than the submission threads, to ensure that
+  /// the cancel request does not wait for the query submission to finish. Since these requests are non-blocking, we
+  /// run them in the gRPC thread pool.
   @Override
   public void cancel(Worker.CancelRequest request, 
StreamObserver<Worker.CancelResponse> responseObserver) {
     long requestId = request.getRequestId();
@@ -272,6 +424,7 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
 
       Worker.CancelResponse.Builder cancelBuilder = 
Worker.CancelResponse.newBuilder();
       for (Map.Entry<Integer, MultiStageQueryStats.StageStats.Closed> 
statEntry : stats.entrySet()) {
+        // even though we are using output streams here, these calls are non-blocking because we use in-memory output streams
         try (UnsynchronizedByteArrayOutputStream baos = new 
UnsynchronizedByteArrayOutputStream.Builder().get();
             DataOutputStream daos = new DataOutputStream(baos)) {
           statEntry.getValue().serialize(daos);
@@ -290,93 +443,14 @@ public class QueryServer extends 
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
     responseObserver.onCompleted();
   }
 
-  private <W> void submitStage(
-      Worker.StagePlan protoStagePlan,
-      BiFunction<StagePlan, WorkerMetadata, W> submitFunction,
-      Consumer<W> consumer) {
-    StagePlan stagePlan;
+  private StagePlan deserializePlan(Worker.StagePlan protoStagePlan) {
     try {
-      stagePlan = QueryPlanSerDeUtils.fromProtoStagePlan(protoStagePlan);
+      return QueryPlanSerDeUtils.fromProtoStagePlan(protoStagePlan);
     } catch (Exception e) {
       long requestId = QueryThreadContext.getRequestId();
       throw new RuntimeException(
           String.format("Caught exception while deserializing stage plan for 
request: %d, stage: %d", requestId,
               protoStagePlan.getStageMetadata().getStageId()), e);
     }
-    StageMetadata stageMetadata = stagePlan.getStageMetadata();
-    MseWorkerThreadContext.setStageId(stageMetadata.getStageId());
-    List<WorkerMetadata> workerMetadataList = 
stageMetadata.getWorkerMetadataList();
-    int numWorkers = workerMetadataList.size();
-    CompletableFuture<W>[] workerSubmissionStubs = new 
CompletableFuture[numWorkers];
-    for (int j = 0; j < numWorkers; j++) {
-      WorkerMetadata workerMetadata = workerMetadataList.get(j);
-      workerSubmissionStubs[j] = CompletableFuture.supplyAsync(
-          () -> {
-            MseWorkerThreadContext.setWorkerId(workerMetadata.getWorkerId());
-            return submitFunction.apply(stagePlan, workerMetadata);
-          },
-          _querySubmissionExecutorService);
-    }
-
-    try {
-      long deadlineMs = QueryThreadContext.getDeadlineMs();
-      for (int j = 0; j < numWorkers; j++) {
-        W workerResult = workerSubmissionStubs[j].get(deadlineMs - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-        consumer.accept(workerResult);
-      }
-    } catch (TimeoutException e) {
-      long requestId = QueryThreadContext.getRequestId();
-      throw new RuntimeException(
-          "Timeout while submitting request: " + requestId + ", stage: " + 
stageMetadata.getStageId(), e);
-    } catch (Exception e) {
-      long requestId = QueryThreadContext.getRequestId();
-      throw new RuntimeException(
-          "Caught exception while submitting request: " + requestId + ", 
stage: " + stageMetadata.getStageId()
-              + ": " + e.getMessage(), e);
-    } finally {
-      for (CompletableFuture<?> future : workerSubmissionStubs) {
-        if (!future.isDone()) {
-          future.cancel(true);
-        }
-      }
-    }
-  }
-
-  /**
-   * Submits each stage in the request to the workers and waits for all 
workers to complete,
-   * applying the submitFunction to each worker and the consumer to the list 
of results.
-   *
-   * @param request the query request
-   * @param submitFunction the function to apply to each worker.
-   * @param consumer the consumer to apply to the list of results. It can just 
ignore the results if not needed.
-   * @param <W> the type of the result returned by the submitFunction.
-   */
-  <W> void forEachStage(Worker.QueryRequest request,
-      BiFunction<StagePlan, WorkerMetadata, W> submitFunction, 
Consumer<List<W>> consumer)
-      throws ExecutionException, InterruptedException, TimeoutException {
-    List<Worker.StagePlan> protoStagePlans = request.getStagePlanList();
-    int numStages = protoStagePlans.size();
-    List<CompletableFuture<List<W>>> stageSubmissionStubs = new 
ArrayList<>(numStages);
-    for (Worker.StagePlan protoStagePlan : protoStagePlans) {
-      CompletableFuture<List<W>> future = CompletableFuture.supplyAsync(() -> {
-        List<W> plans = new ArrayList<>();
-        submitStage(protoStagePlan, submitFunction, plans::add);
-        return plans;
-      }, _querySubmissionExecutorService);
-      stageSubmissionStubs.add(future);
-    }
-    try {
-      long deadlineMs = QueryThreadContext.getDeadlineMs();
-      for (CompletableFuture<List<W>> stageSubmissionStub : 
stageSubmissionStubs) {
-        List<W> plans = stageSubmissionStub.get(deadlineMs - 
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
-        consumer.accept(plans);
-      }
-    } finally {
-      for (CompletableFuture<?> future : stageSubmissionStubs) {
-        if (!future.isDone()) {
-          future.cancel(true);
-        }
-      }
-    }
   }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 7308e63b56..a00be6155c 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -80,9 +80,7 @@ public class QueryServerEnclosure {
       Map<String, String> requestMetadataMap, ThreadExecutionContext 
parentContext) {
     try (QueryThreadContext.CloseableContext closeMe1 = 
QueryThreadContext.openFromRequestMetadata(requestMetadataMap);
         QueryThreadContext.CloseableContext closeMe2 = 
MseWorkerThreadContext.open()) {
-      return CompletableFuture.runAsync(
-          () -> _queryRunner.processQuery(workerMetadata, stagePlan, 
requestMetadataMap, parentContext),
-          _queryRunner.getExecutorService());
+      return _queryRunner.processQuery(workerMetadata, stagePlan, 
requestMetadataMap, parentContext);
     }
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
index 1355fb7196..805f0ad434 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
@@ -85,6 +85,7 @@ public class QueryThreadContext {
     String mode = 
conf.getProperty(CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE);
     if ("strict".equalsIgnoreCase(mode)) {
       _strictMode = true;
+      return;
     }
     if (mode != null && !mode.isEmpty()) {
       throw new IllegalArgumentException("Invalid value '" + mode + "' for "
@@ -111,7 +112,7 @@ public class QueryThreadContext {
       String errorMessage = "QueryThreadContext is not initialized";
       if (_strictMode) {
         LOGGER.error(errorMessage);
-        throw new IllegalStateException("QueryThreadContext is not 
initialized");
+        throw new IllegalStateException(errorMessage);
       } else {
         LOGGER.debug(errorMessage);
         // in non-strict mode, return the fake instance
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 0ea24be9ad..2def9e6b57 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -907,15 +907,47 @@ public class CommonConstants {
     public static final int DEFAULT_MSE_MIN_GROUP_TRIM_SIZE = 5000;
 
     // TODO: Merge this with "mse"
+    /**
+     * The ExecutorServiceProvider to use for execution threads, which are the 
ones that execute
+     * MultiStageOperators (and SSE operators in the leaf stages).
+     *
+     * It is recommended to use cached. In case fixed is used, it should use a 
large enough number of threads or
+     * parent operators may consume all threads.
+     * In Java 21 or newer, virtual threads are a good solution. Although 
Apache Pinot doesn't include this option yet,
+     * it is trivial to implement that plugin.
+     *
+     * See QueryRunner
+     */
     public static final String MULTISTAGE_EXECUTOR = "multistage.executor";
     public static final String MULTISTAGE_EXECUTOR_CONFIG_PREFIX =
         QUERY_EXECUTOR_CONFIG_PREFIX + "." + MULTISTAGE_EXECUTOR;
     public static final String DEFAULT_MULTISTAGE_EXECUTOR_TYPE = "cached";
+    /**
+     * The ExecutorServiceProvider to be used for submission threads, which 
are the ones
+     * that receive requests in protobuf and transform them into 
MultiStageOperators.
+     *
+     * It is recommended to use a fixed thread pool, given submission code 
should not block.
+     *
+     * See QueryServer
+     */
+    public static final String MULTISTAGE_SUBMISSION_EXEC_CONFIG_PREFIX =
+        QUERY_EXECUTOR_CONFIG_PREFIX + "." + "multistage.submission";
+    public static final String DEFAULT_MULTISTAGE_SUBMISSION_EXEC_TYPE = 
"fixed";
     @Deprecated
     public static final String CONFIG_OF_QUERY_EXECUTOR_OPCHAIN_EXECUTOR = 
MULTISTAGE_EXECUTOR_CONFIG_PREFIX;
     @Deprecated
     public static final String DEFAULT_QUERY_EXECUTOR_OPCHAIN_EXECUTOR = 
DEFAULT_MULTISTAGE_EXECUTOR_TYPE;
 
+    /**
+     * The ExecutorServiceProvider to be used for timeseries threads.
+     *
+     * It is recommended to use a cached thread pool, given timeseries 
endpoints are blocking.
+     *
+     * See QueryServer
+     */
+    public static final String MULTISTAGE_TIMESERIES_EXEC_CONFIG_PREFIX =
+        QUERY_EXECUTOR_CONFIG_PREFIX + "." + "timeseries";
+    public static final String DEFAULT_TIMESERIES_EXEC_CONFIG_PREFIX = 
"cached";
     /* End of query executor related configs */
 
     public static final String CONFIG_OF_TRANSFORM_FUNCTIONS = 
"pinot.server.transforms";


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to