This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new b061c555a4 Direct mse query server (#15445)
b061c555a4 is described below
commit b061c555a427efa8db83e77c342c6a027093d4c7
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Thu May 22 14:04:16 2025 +0200
Direct mse query server (#15445)
---
.../apache/pinot/query/runtime/QueryRunner.java | 117 +++---
.../runtime/executor/OpChainSchedulerService.java | 3 +-
.../pinot/query/service/server/QueryServer.java | 412 ++++++++++++---------
.../apache/pinot/query/QueryServerEnclosure.java | 4 +-
.../apache/pinot/spi/query/QueryThreadContext.java | 3 +-
.../apache/pinot/spi/utils/CommonConstants.java | 32 ++
6 files changed, 347 insertions(+), 224 deletions(-)
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 19282ae7a2..8609af8899 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.query.runtime;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import io.grpc.stub.StreamObserver;
@@ -28,6 +27,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
@@ -217,11 +217,6 @@ public class QueryRunner {
LOGGER.info("Initialized QueryRunner with hostname: {}, port: {}",
hostname, port);
}
- @VisibleForTesting
- public ExecutorService getExecutorService() {
- return _executorService;
- }
-
public void start() {
_mailboxService.start();
_leafQueryExecutor.start();
@@ -233,14 +228,27 @@ public class QueryRunner {
ExecutorServiceUtils.close(_executorService);
}
- /**
- * Execute a {@link StagePlan}.
- *
- * <p>This execution entry point should be asynchronously called by the
request handler and caller should not wait
- * for results/exceptions.</p>
- */
- public void processQuery(WorkerMetadata workerMetadata, StagePlan stagePlan,
Map<String, String> requestMetadata,
- @Nullable ThreadExecutionContext parentContext) {
+ /// Asynchronously executes a [StagePlan].
+ ///
+ /// This method will not block the current thread but use
[#_executorService] instead.
+ /// If any error happened during the asynchronous execution, an error block
will be sent to all receiver mailboxes.
+ public CompletableFuture<Void> processQuery(WorkerMetadata workerMetadata,
StagePlan stagePlan,
+ Map<String, String> requestMetadata, @Nullable ThreadExecutionContext
parentContext) {
+ Runnable runnable = () -> processQueryBlocking(workerMetadata, stagePlan,
requestMetadata, parentContext);
+ return CompletableFuture.runAsync(runnable, _executorService);
+ }
+
+ /// Executes a [StagePlan] pseudo-synchronously.
+ ///
+ /// First, the pipeline breaker is executed on the current thread. This is
the blocking part of the method.
+ /// If the pipeline breaker execution fails, the current thread will send
the error block to the receiver mailboxes.
+ ///
+ /// If the pipeline breaker succeeds, the rest of the stage is asynchronously
executed on the [#_opChainScheduler].
+ private void processQueryBlocking(WorkerMetadata workerMetadata, StagePlan
stagePlan,
+ Map<String, String> requestMetadata, @Nullable ThreadExecutionContext
parentContext) {
+
MseWorkerThreadContext.setStageId(stagePlan.getStageMetadata().getStageId());
+ MseWorkerThreadContext.setWorkerId(workerMetadata.getWorkerId());
+
long requestId =
Long.parseLong(requestMetadata.get(MetadataKeys.REQUEST_ID));
long timeoutMs =
Long.parseLong(requestMetadata.get(QueryOptionKey.TIMEOUT_MS));
long deadlineMs = System.currentTimeMillis() + timeoutMs;
@@ -256,36 +264,8 @@ public class QueryRunner {
// Send error block to all the receivers if pipeline breaker fails
if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock()
!= null) {
ErrorMseBlock errorBlock = pipelineBreakerResult.getErrorBlock();
- int stageId = stageMetadata.getStageId();
- LOGGER.error("Error executing pipeline breaker for request: {}, stage:
{}, sending error block: {}", requestId,
- stageId, errorBlock);
- MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode();
- List<RoutingInfo> routingInfos = new ArrayList<>();
- for (Integer receiverStageId : rootNode.getReceiverStageIds()) {
- List<MailboxInfo> receiverMailboxInfos =
-
workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
- List<RoutingInfo> stageRoutingInfos =
- MailboxIdUtils.toRoutingInfos(requestId, stageId,
workerMetadata.getWorkerId(), receiverStageId,
- receiverMailboxInfos);
- routingInfos.addAll(stageRoutingInfos);
- }
- for (RoutingInfo routingInfo : routingInfos) {
- try {
- StatMap<MailboxSendOperator.StatKey> statMap = new
StatMap<>(MailboxSendOperator.StatKey.class);
- SendingMailbox sendingMailbox =
_mailboxService.getSendingMailbox(routingInfo.getHostname(),
- routingInfo.getPort(), routingInfo.getMailboxId(), deadlineMs,
statMap);
- // TODO: Here we are breaking the stats invariants, sending errors
without including the stats of the
- // current stage. We will need to fix this in future, but for
now, we are sending the error block without
- // the stats.
- sendingMailbox.send(errorBlock, Collections.emptyList());
- } catch (TimeoutException e) {
- LOGGER.warn("Timed out sending error block to mailbox: {} for
request: {}, stage: {}",
- routingInfo.getMailboxId(), requestId, stageId, e);
- } catch (Exception e) {
- LOGGER.error("Caught exception sending error block to mailbox: {}
for request: {}, stage: {}",
- routingInfo.getMailboxId(), requestId, stageId, e);
- }
- }
+ notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock,
requestId, workerMetadata, stagePlan,
+ deadlineMs);
return;
}
@@ -295,12 +275,52 @@ public class QueryRunner {
workerMetadata, pipelineBreakerResult, parentContext,
_sendStats.getAsBoolean());
OpChain opChain;
if (workerMetadata.isLeafStageWorker()) {
- opChain =
- ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan,
_leafQueryExecutor, _executorService);
+ opChain = ServerPlanRequestUtils.compileLeafStage(executionContext,
stagePlan,
+ _leafQueryExecutor, _executorService);
} else {
opChain = PlanNodeToOpChain.convert(stagePlan.getRootNode(),
executionContext);
}
- _opChainScheduler.register(opChain);
+ try {
+ // This can fail if the executor rejects the task.
+ _opChainScheduler.register(opChain);
+ } catch (RuntimeException e) {
+ ErrorMseBlock errorBlock = ErrorMseBlock.fromException(e);
+ notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock,
requestId, workerMetadata, stagePlan,
+ deadlineMs);
+ }
+ }
+
+ private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock
errorBlock, long requestId,
+ WorkerMetadata workerMetadata, StagePlan stagePlan, long deadlineMs) {
+ LOGGER.error("Error executing pipeline breaker for request: {}, stage: {},
sending error block: {}", requestId,
+ stageId, errorBlock);
+ MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode();
+ List<RoutingInfo> routingInfos = new ArrayList<>();
+ for (Integer receiverStageId : rootNode.getReceiverStageIds()) {
+ List<MailboxInfo> receiverMailboxInfos =
+
workerMetadata.getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
+ List<RoutingInfo> stageRoutingInfos =
+ MailboxIdUtils.toRoutingInfos(requestId, stageId,
workerMetadata.getWorkerId(), receiverStageId,
+ receiverMailboxInfos);
+ routingInfos.addAll(stageRoutingInfos);
+ }
+ for (RoutingInfo routingInfo : routingInfos) {
+ try {
+ StatMap<MailboxSendOperator.StatKey> statMap = new
StatMap<>(MailboxSendOperator.StatKey.class);
+ SendingMailbox sendingMailbox =
_mailboxService.getSendingMailbox(routingInfo.getHostname(),
+ routingInfo.getPort(), routingInfo.getMailboxId(), deadlineMs,
statMap);
+ // TODO: Here we are breaking the stats invariants, sending errors
without including the stats of the
+ // current stage. We will need to fix this in future, but for now, we
are sending the error block without
+ // the stats.
+ sendingMailbox.send(errorBlock, Collections.emptyList());
+ } catch (TimeoutException e) {
+ LOGGER.warn("Timed out sending error block to mailbox: {} for request:
{}, stage: {}",
+ routingInfo.getMailboxId(), requestId, stageId, e);
+ } catch (Exception e) {
+ LOGGER.error("Caught exception sending error block to mailbox: {} for
request: {}, stage: {}",
+ routingInfo.getMailboxId(), requestId, stageId, e);
+ }
+ }
}
/**
@@ -463,8 +483,7 @@ public class QueryRunner {
Map<String, String> opChainMetadata =
consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata);
if (PipelineBreakerExecutor.hasPipelineBreakers(stagePlan)) {
- // TODO: Support pipeline breakers before merging this feature.
- // See https://github.com/apache/pinot/pull/13733#discussion_r1752031714
+ //TODO: See
https://github.com/apache/pinot/pull/13733#discussion_r1752031714
LOGGER.error("Pipeline breaker is not supported in explain query");
return stagePlan;
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
index f3daece353..2ed131b782 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/OpChainSchedulerService.java
@@ -75,8 +75,6 @@ public class OpChainSchedulerService {
}
public void register(OpChain operatorChain) {
- _opChainCache.put(operatorChain.getId(), operatorChain.getRoot());
-
Future<?> scheduledFuture = _executorService.submit(new TraceRunnable() {
@Override
public void runJob() {
@@ -114,6 +112,7 @@ public class OpChainSchedulerService {
}
}
});
+ _opChainCache.put(operatorChain.getId(), operatorChain.getRoot());
_submittedOpChainMap.put(operatorChain.getId(), scheduledFuture);
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
index 6ac7ca5598..d5f14d6da5 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/server/QueryServer.java
@@ -30,13 +30,10 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
-import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.commons.io.output.UnsynchronizedByteArrayOutputStream;
import org.apache.pinot.common.config.TlsConfig;
@@ -54,9 +51,9 @@ import org.apache.pinot.query.routing.StagePlan;
import org.apache.pinot.query.routing.WorkerMetadata;
import org.apache.pinot.query.runtime.QueryRunner;
import org.apache.pinot.query.runtime.plan.MultiStageQueryStats;
-import org.apache.pinot.query.service.dispatch.QueryDispatcher;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.executor.ExecutorServiceUtils;
import org.apache.pinot.spi.executor.MetricsExecutor;
import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.trace.Tracing;
@@ -65,9 +62,12 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * {@link QueryServer} is the GRPC server that accepts query plan requests
sent from {@link QueryDispatcher}.
- */
+/// QueryServer is the GRPC server that accepts MSE query plan requests sent from
+/// [org.apache.pinot.query.service.dispatch.QueryDispatcher].
+///
+/// All endpoints are being called by the GRPC server threads, which must
never be blocked.
+/// In some situations we need to run blocking code to resolve these requests,
so we need to move the work to a
+/// different thread pool. See each method for details.
public class QueryServer extends PinotQueryWorkerGrpc.PinotQueryWorkerImplBase
{
private static final Logger LOGGER =
LoggerFactory.getLogger(QueryServer.class);
// TODO: Inbound messages can get quite large because we send the entire
stage metadata map in each call.
@@ -81,7 +81,20 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
// query submission service is only used for plan submission for now.
// TODO: with complex query submission logic we should allow asynchronous
query submission return instead of
// directly return from submission response observer.
- private final ExecutorService _querySubmissionExecutorService;
+ private final ExecutorService _submissionExecutorService;
+ /// The thread executor service used to run explain plan requests.
+ /// In order to calculate explain plans we need to calculate the physical
plan for each worker, which is a
+ /// non-blocking but heavier CPU operation than the normal query submission.
+ /// This is why we use a different thread pool for this operation: We don't
want to block the query submission thread
+ /// pool.
+ /// Given the QPS for explain plans is much lower than the normal query
submission, we can use a single thread for
+ /// this.
+ private final ExecutorService _explainExecutorService;
+ /// The thread executor service used to run time series query requests.
+ ///
+ /// In order to run time series we need to call
[QueryRunner#processTimeSeriesQuery] which contrary to the normal MSE
+ /// methods, is a blocking method. This is why we need to use a different
(and usually cached) thread pool for this.
+ private final ExecutorService _timeSeriesExecutorService;
private Server _server = null;
@@ -95,36 +108,41 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
_queryRunner = queryRunner;
_tlsConfig = tlsConfig;
- ExecutorService baseExecutor = Executors.newCachedThreadPool(
- new NamedThreadFactory("query_submission_executor_on_" + _port +
"_port"));
+ ExecutorService baseExecutorService = ExecutorServiceUtils.create(config,
+ CommonConstants.Server.MULTISTAGE_SUBMISSION_EXEC_CONFIG_PREFIX,
+ "query_submission_executor_on_" + _port + "_port",
+ CommonConstants.Server.DEFAULT_MULTISTAGE_SUBMISSION_EXEC_TYPE);
ServerMetrics serverMetrics = ServerMetrics.get();
- MetricsExecutor withMetrics = new MetricsExecutor(
- baseExecutor,
+ _submissionExecutorService = new MetricsExecutor(
+ baseExecutorService,
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_SUBMISSION_STARTED_TASKS),
serverMetrics.getMeteredValue(ServerMeter.MULTI_STAGE_SUBMISSION_COMPLETED_TASKS));
- _querySubmissionExecutorService =
MseWorkerThreadContext.contextAwareExecutorService(
- QueryThreadContext.contextAwareExecutorService(withMetrics));
+
+ NamedThreadFactory explainThreadFactory =
+ new NamedThreadFactory("query_explain_on_" + _port + "_port");
+ _explainExecutorService =
Executors.newSingleThreadExecutor(explainThreadFactory);
+
+ ExecutorService baseTimeSeriesExecutorService =
ExecutorServiceUtils.create(config,
+ CommonConstants.Server.MULTISTAGE_TIMESERIES_EXEC_CONFIG_PREFIX,
+ "query_ts_on_" + _port + "_port",
+ CommonConstants.Server.DEFAULT_TIMESERIES_EXEC_CONFIG_PREFIX);
+ _timeSeriesExecutorService =
MseWorkerThreadContext.contextAwareExecutorService(
+
QueryThreadContext.contextAwareExecutorService(baseTimeSeriesExecutorService));
}
public void start() {
LOGGER.info("Starting QueryServer");
try {
if (_server == null) {
+ ServerBuilder<?> serverBuilder;
if (_tlsConfig == null) {
- _server = ServerBuilder
- .forPort(_port)
- .addService(this)
- .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
- .build();
+ serverBuilder = ServerBuilder.forPort(_port);
} else {
- _server = NettyServerBuilder
- .forPort(_port)
- .addService(this)
- .sslContext(GrpcQueryServer.buildGrpcSslContext(_tlsConfig))
- .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
- .build();
+ serverBuilder = NettyServerBuilder.forPort(_port)
+ .sslContext(GrpcQueryServer.buildGrpcSslContext(_tlsConfig));
}
+ _server = buildGrpcServer(serverBuilder);
LOGGER.info("Initialized QueryServer on port: {}", _port);
}
_queryRunner.start();
@@ -134,6 +152,15 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
}
}
+ private <T extends ServerBuilder<T>> Server buildGrpcServer(ServerBuilder<T>
builder) {
+ return builder
+ // By using directExecutor, GRPC doesn't need to manage its own
thread pool
+ .directExecutor()
+ .addService(this)
+ .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
+ .build();
+ }
+
public void shutdown() {
LOGGER.info("Shutting down QueryServer");
try {
@@ -142,12 +169,23 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
_server.shutdown();
_server.awaitTermination();
}
- _querySubmissionExecutorService.shutdown();
} catch (Exception e) {
throw new RuntimeException(e);
}
+
+ _submissionExecutorService.shutdown();
+ _explainExecutorService.shutdown();
+ _timeSeriesExecutorService.shutdown();
}
+ /// Submits a query for execution.
+ ///
+ /// The request is deserialized on the GRPC thread and the rest of the work
is done on the submission thread pool.
+ /// Given the submission code should be not blocking, we could directly use
the GRPC thread pool, but we decide to
+ /// use a different thread pool to be able to time out the submission in the
rare case it takes too long.
+ ///
+ // TODO: Study if that is actually needed. We could just use the GRPC thread
pool and timeout once the execution
+ // starts in the query runner.
@Override
public void submit(Worker.QueryRequest request,
StreamObserver<Worker.QueryResponse> responseObserver) {
Map<String, String> reqMetadata;
@@ -163,37 +201,92 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
return;
}
- try (QueryThreadContext.CloseableContext queryTlClosable =
QueryThreadContext.openFromRequestMetadata(reqMetadata);
+ long timeoutMs =
Long.parseLong(reqMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+
+ CompletableFuture.runAsync(() -> submitInternal(request, reqMetadata),
_submissionExecutorService)
+ .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+ .whenComplete((result, error) -> {
+ // this will always be called, either on the submission thread that
finished the last or in the caller
+ // (GRPC) thread in the improbable case all submission tasks
finished before the caller thread reaches
+ // this line
+ if (error != null) { // if there was an error submitting the
request, return an error response
+ try (QueryThreadContext.CloseableContext qCtx =
QueryThreadContext.openFromRequestMetadata(reqMetadata);
+ QueryThreadContext.CloseableContext mseCtx =
MseWorkerThreadContext.open()) {
+ long requestId = QueryThreadContext.getRequestId();
+ LOGGER.error("Caught exception while submitting request: {}",
requestId, error);
+ String errorMsg = "Caught exception while submitting request: "
+ error.getMessage();
+ responseObserver.onNext(Worker.QueryResponse.newBuilder()
+
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
errorMsg)
+ .build());
+ responseObserver.onCompleted();
+ }
+ } else { // if the request was submitted successfully, return a
success response
+ responseObserver.onNext(
+ Worker.QueryResponse.newBuilder()
+
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "")
+ .build());
+ responseObserver.onCompleted();
+ }
+ });
+ }
+
+ /// Iterates over all the stage plans in the request and submits each worker
to the query runner.
+ ///
+ /// This method should be called from the submission thread pool.
+ /// If any exception is thrown while submitting a worker, all the workers
that have been started are cancelled and the
+ /// exception is thrown.
+ ///
+ /// Remember that this method doesn't track the status of the workers
themselves. In case of error while the worker is
+ /// running, the exception will be managed by the worker, which usually sends
the error downstream to the receiver
+ /// mailboxes. Therefore these errors won't be reported here.
+ private void submitInternal(Worker.QueryRequest request, Map<String, String>
reqMetadata) {
+ try (QueryThreadContext.CloseableContext qTlClosable =
QueryThreadContext.openFromRequestMetadata(reqMetadata);
QueryThreadContext.CloseableContext mseTlCloseable =
MseWorkerThreadContext.open()) {
- long requestId = QueryThreadContext.getRequestId();
QueryThreadContext.setQueryEngine("mse");
+ List<Worker.StagePlan> protoStagePlans = request.getStagePlanList();
- Tracing.ThreadAccountantOps.setupRunner(Long.toString(requestId),
ThreadExecutionContext.TaskType.MSE);
- ThreadExecutionContext parentContext =
Tracing.getThreadAccountant().getThreadExecutionContext();
- try {
- forEachStage(request,
- (stagePlan, workerMetadata) -> {
- _queryRunner.processQuery(workerMetadata, stagePlan,
reqMetadata, parentContext);
- return null;
- },
- (ignored) -> {
- });
- } catch (ExecutionException | InterruptedException | TimeoutException |
RuntimeException e) {
- LOGGER.error("Caught exception while submitting request: {}",
requestId, e);
- String errorMsg = "Caught exception while submitting request: " +
e.getMessage();
- responseObserver.onNext(Worker.QueryResponse.newBuilder()
-
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_ERROR,
errorMsg)
- .build());
- responseObserver.onCompleted();
- return;
- } finally {
- Tracing.ThreadAccountantOps.clear();
+ List<CompletableFuture<Void>> startedWorkers = new ArrayList<>();
+
+ for (Worker.StagePlan protoStagePlan : protoStagePlans) {
+ StagePlan stagePlan = deserializePlan(protoStagePlan);
+ StageMetadata stageMetadata = stagePlan.getStageMetadata();
+ List<WorkerMetadata> workerMetadataList =
stageMetadata.getWorkerMetadataList();
+
+ for (WorkerMetadata workerMetadata : workerMetadataList) {
+ try {
+ CompletableFuture<Void> job = submitWorker(workerMetadata,
stagePlan, reqMetadata);
+ startedWorkers.add(job);
+ } catch (RuntimeException e) {
+ startedWorkers.forEach(worker -> worker.cancel(true));
+ throw e;
+ }
+ }
}
- responseObserver.onNext(
- Worker.QueryResponse.newBuilder()
-
.putMetadata(CommonConstants.Query.Response.ServerResponseStatus.STATUS_OK, "")
- .build());
- responseObserver.onCompleted();
+ }
+ }
+
+ /// Creates a new worker by submitting the stage plan to the query
runner.
+ ///
+ /// The returned CompletableFuture will be completed when the opchain
starts, which may take a while if there are
+ /// pipeline breakers.
+ /// Remember that the completable future may (and usually will) be completed
*before* the opchain is finished.
+ ///
+ /// If there is an error creating the worker (i.e. too many workers
are already running), the
+ /// CompletableFuture will be completed exceptionally. In that case it is up
to the caller to deal with the error
+ /// (normally cancelling other already started workers and sending the error
through GRPC)
+ private CompletableFuture<Void> submitWorker(WorkerMetadata workerMetadata,
StagePlan stagePlan,
+ Map<String, String> reqMetadata) {
+ String requestIdStr = Long.toString(QueryThreadContext.getRequestId());
+
+ //TODO: Verify if this matches with what OOM protection expects. This
method will not block for the query to
+ // finish, so it may be breaking some of the OOM protection assumptions.
+ Tracing.ThreadAccountantOps.setupRunner(requestIdStr,
ThreadExecutionContext.TaskType.MSE);
+ ThreadExecutionContext parentContext =
Tracing.getThreadAccountant().getThreadExecutionContext();
+
+ try {
+ return _queryRunner.processQuery(workerMetadata, stagePlan, reqMetadata,
parentContext);
+ } finally {
+ Tracing.ThreadAccountantOps.clear();
}
}
@@ -212,49 +305,103 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
return;
}
- try (QueryThreadContext.CloseableContext queryTlClosable =
QueryThreadContext.openFromRequestMetadata(reqMetadata);
- QueryThreadContext.CloseableContext mseTlCloseable =
MseWorkerThreadContext.open()) {
- try {
- forEachStage(request,
- (stagePlan, workerMetadata) ->
_queryRunner.explainQuery(workerMetadata, stagePlan, reqMetadata),
- (plans) -> {
- Worker.ExplainResponse.Builder builder =
Worker.ExplainResponse.newBuilder();
- for (StagePlan plan : plans) {
- ByteString rootAsBytes =
PlanNodeSerializer.process(plan.getRootNode()).toByteString();
-
- StageMetadata metadata = plan.getStageMetadata();
- List<Worker.WorkerMetadata> protoWorkerMetadataList =
-
QueryPlanSerDeUtils.toProtoWorkerMetadataList(metadata.getWorkerMetadataList());
-
-
builder.addStagePlan(Worker.StagePlan.newBuilder().setRootNode(rootAsBytes).setStageMetadata(
-
Worker.StageMetadata.newBuilder().setStageId(metadata.getStageId())
- .addAllWorkerMetadata(protoWorkerMetadataList)
-
.setCustomProperty(QueryPlanSerDeUtils.toProtoProperties(metadata.getCustomProperties()))));
- }
-
builder.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK,
"");
- responseObserver.onNext(builder.build());
- });
- } catch (ExecutionException | InterruptedException | TimeoutException |
RuntimeException e) {
- long requestId = QueryThreadContext.getRequestId();
- LOGGER.error("Caught exception while submitting request: {}",
requestId, e);
- String errorMsg = "Caught exception while submitting request: " +
e.getMessage();
- responseObserver.onNext(Worker.ExplainResponse.newBuilder()
-
.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_ERROR,
errorMsg)
+ long timeoutMs =
Long.parseLong(reqMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+
+ CompletableFuture.runAsync(() -> explainInternal(request,
responseObserver, reqMetadata), _explainExecutorService)
+ .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
+ .handle((result, error) -> {
+ if (error != null) {
+ try (QueryThreadContext.CloseableContext qCtx =
QueryThreadContext.openFromRequestMetadata(reqMetadata);
+ QueryThreadContext.CloseableContext mseCtx =
MseWorkerThreadContext.open()) {
+ long requestId = QueryThreadContext.getRequestId();
+ LOGGER.error("Caught exception while submitting request: {}",
requestId, error);
+ String errorMsg = "Caught exception while submitting request: " +
error.getMessage();
+ synchronized (responseObserver) {
+ responseObserver.onNext(Worker.ExplainResponse.newBuilder()
+
.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_ERROR,
errorMsg)
.build());
- responseObserver.onCompleted();
- return;
+ responseObserver.onCompleted();
+ }
+ }
+ } else {
+ synchronized (responseObserver) {
+ responseObserver.onNext(
+ Worker.ExplainResponse.newBuilder()
+
.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK,
"")
+ .build());
+ responseObserver.onCompleted();
+ }
+ }
+ return null;
+ });
+ }
+
+ /// Iterates over all the stage plans in the request and explains the query
for each worker.
+ ///
+ /// This method should be called from the submission thread pool.
+ /// Each stage plan will be sent as a new response to the response observer.
+ /// In case of failure, this method will fail with an exception and the
response observer won't be notified.
+ ///
+ /// Remember that in case of error some stages may have been explained and
some not.
+ private void explainInternal(Worker.QueryRequest request,
StreamObserver<Worker.ExplainResponse> responseObserver,
+ Map<String, String> reqMetadata) {
+ try (QueryThreadContext.CloseableContext qTlClosable =
QueryThreadContext.openFromRequestMetadata(reqMetadata);
+ QueryThreadContext.CloseableContext mseTlCloseable =
MseWorkerThreadContext.open()) {
+ // Explain the stage for each worker
+ BiFunction<StagePlan, WorkerMetadata, StagePlan> explainFun =
(stagePlan, workerMetadata) ->
+ _queryRunner.explainQuery(workerMetadata, stagePlan, reqMetadata);
+
+ List<Worker.StagePlan> protoStagePlans = request.getStagePlanList();
+
+ for (Worker.StagePlan protoStagePlan : protoStagePlans) {
+ StagePlan stagePlan = deserializePlan(protoStagePlan);
+ StageMetadata stageMetadata = stagePlan.getStageMetadata();
+ List<WorkerMetadata> workerMetadataList =
stageMetadata.getWorkerMetadataList();
+
+ Worker.ExplainResponse stageResponse = calculateExplainPlanForStage(
+ protoStagePlan, workerMetadataList.toArray(new WorkerMetadata[0]),
reqMetadata);
+ synchronized (responseObserver) {
+ responseObserver.onNext(stageResponse);
+ }
}
- responseObserver.onNext(
- Worker.ExplainResponse.newBuilder()
-
.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK,
"")
- .build());
- responseObserver.onCompleted();
}
}
+ /// Calculates the explain plan for a stage, iterating over each worker and
merging the results.
+ ///
+ /// The result is then returned as result.
+ private Worker.ExplainResponse calculateExplainPlanForStage(Worker.StagePlan
protoStagePlan,
+ WorkerMetadata[] workerMetadataList, Map<String, String> reqMetadata) {
+ Worker.ExplainResponse.Builder builder =
Worker.ExplainResponse.newBuilder();
+ StagePlan stagePlan = deserializePlan(protoStagePlan);
+ for (WorkerMetadata workerMetadata : workerMetadataList) {
+ StagePlan explainPlan = _queryRunner.explainQuery(workerMetadata,
stagePlan, reqMetadata);
+
+ ByteString rootAsBytes =
PlanNodeSerializer.process(explainPlan.getRootNode()).toByteString();
+
+ StageMetadata metadata = explainPlan.getStageMetadata();
+ List<Worker.WorkerMetadata> protoWorkerMetadataList =
+
QueryPlanSerDeUtils.toProtoWorkerMetadataList(metadata.getWorkerMetadataList());
+
+ builder.addStagePlan(Worker.StagePlan.newBuilder()
+ .setRootNode(rootAsBytes)
+ .setStageMetadata(Worker.StageMetadata.newBuilder()
+ .setStageId(metadata.getStageId())
+ .addAllWorkerMetadata(protoWorkerMetadataList)
+
.setCustomProperty(QueryPlanSerDeUtils.toProtoProperties(metadata.getCustomProperties()))));
+ }
+
builder.putMetadata(CommonConstants.Explain.Response.ServerResponseStatus.STATUS_OK,
"");
+ return builder.build();
+ }
+
@Override
public void submitTimeSeries(Worker.TimeSeriesQueryRequest request,
StreamObserver<Worker.TimeSeriesResponse> responseObserver) {
+ CompletableFuture.runAsync(() -> submitTimeSeriesInternal(request,
responseObserver), _timeSeriesExecutorService);
+ }
+
+ private void submitTimeSeriesInternal(Worker.TimeSeriesQueryRequest request,
+ StreamObserver<Worker.TimeSeriesResponse> responseObserver) {
try (QueryThreadContext.CloseableContext queryTlClosable =
QueryThreadContext.open();
QueryThreadContext.CloseableContext mseTlCloseable =
MseWorkerThreadContext.open()) {
// TODO: populate the thread context with TSE information
@@ -263,6 +410,11 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
}
}
+ /// Executes a cancel request.
+ ///
+ /// Cancel requests should always be executed on different threads than the
submission threads to be sure that
+ /// the cancel request is not waiting for the query submission to finish.
Given these requests are not blocking, we
+ /// run them in the GRPC thread pool.
@Override
public void cancel(Worker.CancelRequest request,
StreamObserver<Worker.CancelResponse> responseObserver) {
long requestId = request.getRequestId();
@@ -272,6 +424,7 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
Worker.CancelResponse.Builder cancelBuilder =
Worker.CancelResponse.newBuilder();
for (Map.Entry<Integer, MultiStageQueryStats.StageStats.Closed>
statEntry : stats.entrySet()) {
+ // even though we are using output streams here, these calls are non-blocking
because we use in-memory output streams
try (UnsynchronizedByteArrayOutputStream baos = new
UnsynchronizedByteArrayOutputStream.Builder().get();
DataOutputStream daos = new DataOutputStream(baos)) {
statEntry.getValue().serialize(daos);
@@ -290,93 +443,14 @@ public class QueryServer extends
PinotQueryWorkerGrpc.PinotQueryWorkerImplBase {
responseObserver.onCompleted();
}
- private <W> void submitStage(
- Worker.StagePlan protoStagePlan,
- BiFunction<StagePlan, WorkerMetadata, W> submitFunction,
- Consumer<W> consumer) {
- StagePlan stagePlan;
+ private StagePlan deserializePlan(Worker.StagePlan protoStagePlan) {
try {
- stagePlan = QueryPlanSerDeUtils.fromProtoStagePlan(protoStagePlan);
+ return QueryPlanSerDeUtils.fromProtoStagePlan(protoStagePlan);
} catch (Exception e) {
long requestId = QueryThreadContext.getRequestId();
throw new RuntimeException(
String.format("Caught exception while deserializing stage plan for
request: %d, stage: %d", requestId,
protoStagePlan.getStageMetadata().getStageId()), e);
}
- StageMetadata stageMetadata = stagePlan.getStageMetadata();
- MseWorkerThreadContext.setStageId(stageMetadata.getStageId());
- List<WorkerMetadata> workerMetadataList =
stageMetadata.getWorkerMetadataList();
- int numWorkers = workerMetadataList.size();
- CompletableFuture<W>[] workerSubmissionStubs = new
CompletableFuture[numWorkers];
- for (int j = 0; j < numWorkers; j++) {
- WorkerMetadata workerMetadata = workerMetadataList.get(j);
- workerSubmissionStubs[j] = CompletableFuture.supplyAsync(
- () -> {
- MseWorkerThreadContext.setWorkerId(workerMetadata.getWorkerId());
- return submitFunction.apply(stagePlan, workerMetadata);
- },
- _querySubmissionExecutorService);
- }
-
- try {
- long deadlineMs = QueryThreadContext.getDeadlineMs();
- for (int j = 0; j < numWorkers; j++) {
- W workerResult = workerSubmissionStubs[j].get(deadlineMs -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- consumer.accept(workerResult);
- }
- } catch (TimeoutException e) {
- long requestId = QueryThreadContext.getRequestId();
- throw new RuntimeException(
- "Timeout while submitting request: " + requestId + ", stage: " +
stageMetadata.getStageId(), e);
- } catch (Exception e) {
- long requestId = QueryThreadContext.getRequestId();
- throw new RuntimeException(
- "Caught exception while submitting request: " + requestId + ",
stage: " + stageMetadata.getStageId()
- + ": " + e.getMessage(), e);
- } finally {
- for (CompletableFuture<?> future : workerSubmissionStubs) {
- if (!future.isDone()) {
- future.cancel(true);
- }
- }
- }
- }
-
- /**
- * Submits each stage in the request to the workers and waits for all
workers to complete,
- * applying the submitFunction to each worker and the consumer to the list
of results.
- *
- * @param request the query request
- * @param submitFunction the function to apply to each worker.
- * @param consumer the consumer to apply to the list of results. It can just
ignore the results if not needed.
- * @param <W> the type of the result returned by the submitFunction.
- */
- <W> void forEachStage(Worker.QueryRequest request,
- BiFunction<StagePlan, WorkerMetadata, W> submitFunction,
Consumer<List<W>> consumer)
- throws ExecutionException, InterruptedException, TimeoutException {
- List<Worker.StagePlan> protoStagePlans = request.getStagePlanList();
- int numStages = protoStagePlans.size();
- List<CompletableFuture<List<W>>> stageSubmissionStubs = new
ArrayList<>(numStages);
- for (Worker.StagePlan protoStagePlan : protoStagePlans) {
- CompletableFuture<List<W>> future = CompletableFuture.supplyAsync(() -> {
- List<W> plans = new ArrayList<>();
- submitStage(protoStagePlan, submitFunction, plans::add);
- return plans;
- }, _querySubmissionExecutorService);
- stageSubmissionStubs.add(future);
- }
- try {
- long deadlineMs = QueryThreadContext.getDeadlineMs();
- for (CompletableFuture<List<W>> stageSubmissionStub :
stageSubmissionStubs) {
- List<W> plans = stageSubmissionStub.get(deadlineMs -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
- consumer.accept(plans);
- }
- } finally {
- for (CompletableFuture<?> future : stageSubmissionStubs) {
- if (!future.isDone()) {
- future.cancel(true);
- }
- }
- }
}
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
index 7308e63b56..a00be6155c 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/QueryServerEnclosure.java
@@ -80,9 +80,7 @@ public class QueryServerEnclosure {
Map<String, String> requestMetadataMap, ThreadExecutionContext
parentContext) {
try (QueryThreadContext.CloseableContext closeMe1 =
QueryThreadContext.openFromRequestMetadata(requestMetadataMap);
QueryThreadContext.CloseableContext closeMe2 =
MseWorkerThreadContext.open()) {
- return CompletableFuture.runAsync(
- () -> _queryRunner.processQuery(workerMetadata, stagePlan,
requestMetadataMap, parentContext),
- _queryRunner.getExecutorService());
+ return _queryRunner.processQuery(workerMetadata, stagePlan,
requestMetadataMap, parentContext);
}
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
index 1355fb7196..805f0ad434 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
@@ -85,6 +85,7 @@ public class QueryThreadContext {
String mode =
conf.getProperty(CommonConstants.Query.CONFIG_OF_QUERY_CONTEXT_MODE);
if ("strict".equalsIgnoreCase(mode)) {
_strictMode = true;
+ return;
}
if (mode != null && !mode.isEmpty()) {
throw new IllegalArgumentException("Invalid value '" + mode + "' for "
@@ -111,7 +112,7 @@ public class QueryThreadContext {
String errorMessage = "QueryThreadContext is not initialized";
if (_strictMode) {
LOGGER.error(errorMessage);
- throw new IllegalStateException("QueryThreadContext is not
initialized");
+ throw new IllegalStateException(errorMessage);
} else {
LOGGER.debug(errorMessage);
// in non-strict mode, return the fake instance
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 0ea24be9ad..2def9e6b57 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -907,15 +907,47 @@ public class CommonConstants {
public static final int DEFAULT_MSE_MIN_GROUP_TRIM_SIZE = 5000;
// TODO: Merge this with "mse"
+ /**
+ * The ExecutorServiceProvider to use for execution threads, which are the
ones that execute
+ * MultiStageOperators (and SSE operators in the leaf stages).
+ *
+ * It is recommended to use a cached pool. If a fixed pool is used, it
should have a large enough number of threads, or
+ * parent operators may consume all threads.
+ * In Java 21 or newer, virtual threads are a good solution. Although
Apache Pinot doesn't include this option yet,
+ * it is trivial to implement that plugin.
+ *
+ * See QueryRunner
+ */
public static final String MULTISTAGE_EXECUTOR = "multistage.executor";
public static final String MULTISTAGE_EXECUTOR_CONFIG_PREFIX =
QUERY_EXECUTOR_CONFIG_PREFIX + "." + MULTISTAGE_EXECUTOR;
public static final String DEFAULT_MULTISTAGE_EXECUTOR_TYPE = "cached";
+ /**
+ * The ExecutorServiceProvider to be used for submission threads, which
are the ones
+ * that receive requests in protobuf and transform them into
MultiStageOperators.
+ *
+ * It is recommended to use a fixed thread pool, given submission code
should not block.
+ *
+ * See QueryServer
+ */
+ public static final String MULTISTAGE_SUBMISSION_EXEC_CONFIG_PREFIX =
+ QUERY_EXECUTOR_CONFIG_PREFIX + "." + "multistage.submission";
+ public static final String DEFAULT_MULTISTAGE_SUBMISSION_EXEC_TYPE =
"fixed";
@Deprecated
public static final String CONFIG_OF_QUERY_EXECUTOR_OPCHAIN_EXECUTOR =
MULTISTAGE_EXECUTOR_CONFIG_PREFIX;
@Deprecated
public static final String DEFAULT_QUERY_EXECUTOR_OPCHAIN_EXECUTOR =
DEFAULT_MULTISTAGE_EXECUTOR_TYPE;
+ /**
+ * The ExecutorServiceProvider to be used for timeseries threads.
+ *
+ * It is recommended to use a cached thread pool, given timeseries
endpoints are blocking.
+ *
+ * See QueryServer
+ */
+ public static final String MULTISTAGE_TIMESERIES_EXEC_CONFIG_PREFIX =
+ QUERY_EXECUTOR_CONFIG_PREFIX + "." + "timeseries";
+ public static final String DEFAULT_TIMESERIES_EXEC_CONFIG_PREFIX =
"cached";
/* End of query executor related configs */
public static final String CONFIG_OF_TRANSFORM_FUNCTIONS =
"pinot.server.transforms";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]