This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 2c48780500 Introduce MSE active and passive timeouts (#16075)
2c48780500 is described below
commit 2c48780500613be076d97c7a8f64c546c1ef3509
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed Jul 2 11:29:05 2025 +0200
Introduce MSE active and passive timeouts (#16075)
---
.../MultiStageBrokerRequestHandler.java | 16 +++-
.../common/utils/config/QueryOptionsUtils.java | 6 ++
.../apache/pinot/query/runtime/QueryRunner.java | 31 +++----
.../operator/BaseMailboxReceiveOperator.java | 9 ++
.../pinot/query/runtime/operator/LeafOperator.java | 13 ++-
.../runtime/operator/MailboxSendOperator.java | 10 +-
.../query/runtime/operator/MultiStageOperator.java | 25 ++++-
.../utils/BlockingMultiStreamConsumer.java | 2 +-
.../runtime/plan/OpChainExecutionContext.java | 60 +++++++++++-
.../plan/pipeline/PipelineBreakerExecutor.java | 31 ++++++-
.../plan/server/ServerPlanRequestUtils.java | 4 +-
.../query/service/dispatch/QueryDispatcher.java | 37 ++++++--
.../plan/pipeline/PipelineBreakerExecutorTest.java | 2 +-
.../query/runtime/queries/QueryRunnerTestBase.java | 7 +-
.../apache/pinot/spi/query/QueryThreadContext.java | 103 +++++++++++++++++----
.../apache/pinot/spi/utils/CommonConstants.java | 3 +
16 files changed, 298 insertions(+), 61 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index bd42a81d77..78513ef01c 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -128,6 +128,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
private final boolean _explainAskingServerDefault;
private final MultiStageQueryThrottler _queryThrottler;
private final ExecutorService _queryCompileExecutor;
+ protected final long _extraPassiveTimeoutMs;
public MultiStageBrokerRequestHandler(PinotConfiguration config, String
brokerId, BrokerRoutingManager routingManager,
AccessControlFactory accessControlFactory, QueryQuotaManager
queryQuotaManager, TableCache tableCache,
@@ -141,6 +142,10 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
CommonConstants.Helix.DEFAULT_MULTI_STAGE_ENGINE_TLS_ENABLED) ?
TlsUtils.extractTlsConfig(config,
CommonConstants.Broker.BROKER_TLS_PREFIX) : null;
+ _extraPassiveTimeoutMs = config.getProperty(
+ CommonConstants.Broker.CONFIG_OF_EXTRA_PASSIVE_TIMEOUT_MS,
+ CommonConstants.Broker.DEFAULT_EXTRA_PASSIVE_TIMEOUT_MS);
+
failureDetector.registerUnhealthyServerRetrier(this::retryUnhealthyServer);
long cancelMillis = config.getProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_CANCEL_TIMEOUT_MS,
@@ -295,7 +300,11 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
throws QueryException, WebApplicationException {
_queryLogger.log(requestId, query);
- long queryTimeoutMs = getTimeout(sqlNodeAndOptions.getOptions());
+ Map<String, String> options = sqlNodeAndOptions.getOptions();
+ long queryTimeoutMs = getTimeout(options);
+ QueryThreadContext.setActiveDeadlineMs(System.currentTimeMillis() +
queryTimeoutMs);
+ QueryThreadContext.setPassiveDeadlineMs(System.currentTimeMillis() +
queryTimeoutMs + getPassiveTimeout(options));
+
Timer queryTimer = new Timer(queryTimeoutMs, TimeUnit.MILLISECONDS);
try (QueryEnvironment.CompiledQuery compiledQuery =
@@ -421,6 +430,11 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
return timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption :
_brokerTimeoutMs;
}
+ private long getPassiveTimeout(Map<String, String> queryOptions) {
+ Long passiveTimeoutMsFromQueryOption =
QueryOptionsUtils.getPassiveTimeoutMs(queryOptions);
+ return passiveTimeoutMsFromQueryOption != null ?
passiveTimeoutMsFromQueryOption : _extraPassiveTimeoutMs;
+ }
+
/**
* Explains the query and returns the broker response.
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 1252b29cee..63523f0af4 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -112,6 +112,12 @@ public class QueryOptionsUtils {
return checkedParseLongPositive(QueryOptionKey.TIMEOUT_MS,
timeoutMsString);
}
+ @Nullable
+ public static Long getPassiveTimeoutMs(Map<String, String> queryOptions) {
+ String passiveTimeoutMsString =
queryOptions.get(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS);
+ return checkedParseLong(QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS,
passiveTimeoutMsString, 0);
+ }
+
@Nullable
public static Long getMaxServerResponseSizeBytes(Map<String, String>
queryOptions) {
String responseSize =
queryOptions.get(QueryOptionKey.MAX_SERVER_RESPONSE_SIZE_BYTES);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
index 1706187d4a..d98f919f29 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java
@@ -260,30 +260,25 @@ public class QueryRunner {
MseWorkerThreadContext.setStageId(stagePlan.getStageMetadata().getStageId());
MseWorkerThreadContext.setWorkerId(workerMetadata.getWorkerId());
- long requestId =
Long.parseLong(requestMetadata.get(MetadataKeys.REQUEST_ID));
- long timeoutMs =
Long.parseLong(requestMetadata.get(QueryOptionKey.TIMEOUT_MS));
- long deadlineMs = System.currentTimeMillis() + timeoutMs;
-
StageMetadata stageMetadata = stagePlan.getStageMetadata();
Map<String, String> opChainMetadata =
consolidateMetadata(stageMetadata.getCustomProperties(), requestMetadata);
// run pre-stage execution for all pipeline breakers
PipelineBreakerResult pipelineBreakerResult =
- PipelineBreakerExecutor.executePipelineBreakers(_opChainScheduler,
_mailboxService, workerMetadata, stagePlan,
- opChainMetadata, requestId, deadlineMs, parentContext,
_sendStats.getAsBoolean());
+
PipelineBreakerExecutor.executePipelineBreakersFromQueryContext(_opChainScheduler,
_mailboxService,
+ workerMetadata, stagePlan, opChainMetadata, parentContext,
_sendStats.getAsBoolean());
// Send error block to all the receivers if pipeline breaker fails
if (pipelineBreakerResult != null && pipelineBreakerResult.getErrorBlock()
!= null) {
ErrorMseBlock errorBlock = pipelineBreakerResult.getErrorBlock();
- notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock,
requestId, workerMetadata, stagePlan,
- deadlineMs);
+ notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock,
workerMetadata, stagePlan);
return;
}
// run OpChain
- OpChainExecutionContext executionContext =
- new OpChainExecutionContext(_mailboxService, requestId, deadlineMs,
opChainMetadata, stageMetadata,
- workerMetadata, pipelineBreakerResult, parentContext,
_sendStats.getAsBoolean());
+ OpChainExecutionContext executionContext =
OpChainExecutionContext.fromQueryContext(_mailboxService,
+ opChainMetadata, stageMetadata, workerMetadata, pipelineBreakerResult,
parentContext,
+ _sendStats.getAsBoolean());
OpChain opChain;
if (workerMetadata.isLeafStageWorker()) {
Map<String, String> rlsFilters =
RlsUtils.extractRlsFilters(requestMetadata);
@@ -298,13 +293,13 @@ public class QueryRunner {
_opChainScheduler.register(opChain);
} catch (RuntimeException e) {
ErrorMseBlock errorBlock = ErrorMseBlock.fromException(e);
- notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock,
requestId, workerMetadata, stagePlan,
- deadlineMs);
+ notifyErrorAfterSubmission(stageMetadata.getStageId(), errorBlock,
workerMetadata, stagePlan);
}
}
- private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock
errorBlock, long requestId,
- WorkerMetadata workerMetadata, StagePlan stagePlan, long deadlineMs) {
+ private void notifyErrorAfterSubmission(int stageId, ErrorMseBlock
errorBlock,
+ WorkerMetadata workerMetadata, StagePlan stagePlan) {
+ long requestId = QueryThreadContext.getRequestId();
LOGGER.error("Error executing pipeline breaker for request: {}, stage: {},
sending error block: {}", requestId,
stageId, errorBlock);
MailboxSendNode rootNode = (MailboxSendNode) stagePlan.getRootNode();
@@ -317,6 +312,7 @@ public class QueryRunner {
receiverMailboxInfos);
routingInfos.addAll(stageRoutingInfos);
}
+ long deadlineMs = QueryThreadContext.getPassiveDeadlineMs();
for (RoutingInfo routingInfo : routingInfos) {
try {
StatMap<MailboxSendOperator.StatKey> statMap = new
StatMap<>(MailboxSendOperator.StatKey.class);
@@ -525,9 +521,8 @@ public class QueryRunner {
}
};
// compile OpChain
- OpChainExecutionContext executionContext =
- new OpChainExecutionContext(_mailboxService, requestId, deadlineMs,
opChainMetadata, stageMetadata,
- workerMetadata, null, null, false);
+ OpChainExecutionContext executionContext =
OpChainExecutionContext.fromQueryContext(_mailboxService,
+ opChainMetadata, stageMetadata, workerMetadata, null, null, false);
OpChain opChain =
ServerPlanRequestUtils.compileLeafStage(executionContext, stagePlan,
_leafQueryExecutor, _executorService,
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
index 8c2001b3f9..71bee7bfa4 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/BaseMailboxReceiveOperator.java
@@ -131,6 +131,15 @@ public abstract class BaseMailboxReceiveOperator extends
MultiStageOperator {
}
}
+ @Override
+ protected void sampleAndCheckInterruption() {
+ // The mailbox receive operator uses the passive deadline instead of the active one because it is not an
+ // active operator: it just waits for data from the mailbox.
+ // This way, when the timeout is reached, it is less likely to be hit here, on the stage waiting for data,
+ // than in the operator that is actively processing the data, which will produce a more meaningful error message.
+ sampleAndCheckInterruption(_context.getPassiveDeadlineMs());
+ }
+
@Override
public void registerExecution(long time, int numRows) {
_statMap.merge(StatKey.EXECUTION_TIME_MS, time);
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
index 3491f5c43b..20c96fdcf1 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafOperator.java
@@ -153,8 +153,10 @@ public class LeafOperator extends MultiStageOperator {
if (_isEarlyTerminated) {
return SuccessMseBlock.INSTANCE;
}
+ // Here we use the passive deadline because we end up waiting for the SSE operators,
+ // which can time out on their own
BaseResultsBlock resultsBlock =
- _blockingQueue.poll(_context.getDeadlineMs() -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ _blockingQueue.poll(_context.getPassiveDeadlineMs() -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
if (resultsBlock == null) {
throw new TimeoutException("Timed out waiting for results block");
}
@@ -321,7 +323,8 @@ public class LeafOperator extends MultiStageOperator {
while (true) {
BaseResultsBlock resultsBlock;
try {
- long timeout = _context.getDeadlineMs() - System.currentTimeMillis();
+ // Here we could use either the active or the passive deadline, given we don't actually execute anything
+ long timeout = _context.getPassiveDeadlineMs() -
System.currentTimeMillis();
resultsBlock = _blockingQueue.poll(timeout, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
@@ -439,13 +442,13 @@ public class LeafOperator extends MultiStageOperator {
});
}
try {
- if (!latch.await(_context.getDeadlineMs() -
System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
+ if (!latch.await(_context.getPassiveDeadlineMs() -
System.currentTimeMillis(), TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timed out waiting for leaf stage to
finish");
}
// Propagate the exception thrown by the leaf stage
for (Future<Map<String, String>> future : futures) {
Map<String, String> stats =
- future.get(_context.getDeadlineMs() -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
+ future.get(_context.getPassiveDeadlineMs() -
System.currentTimeMillis(), TimeUnit.MILLISECONDS);
mergeExecutionStats(stats);
}
} catch (TimeoutException e) {
@@ -467,7 +470,7 @@ public class LeafOperator extends MultiStageOperator {
@VisibleForTesting
void addResultsBlock(BaseResultsBlock resultsBlock)
throws InterruptedException, TimeoutException {
- if (!_blockingQueue.offer(resultsBlock, _context.getDeadlineMs() -
System.currentTimeMillis(),
+ if (!_blockingQueue.offer(resultsBlock, _context.getPassiveDeadlineMs() -
System.currentTimeMillis(),
TimeUnit.MILLISECONDS)) {
throw new TimeoutException("Timed out waiting to add results block");
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
index d48ba0469c..6c223e861f 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java
@@ -159,7 +159,9 @@ public class MailboxSendOperator extends MultiStageOperator
{
distributionType);
MailboxService mailboxService = context.getMailboxService();
long requestId = context.getRequestId();
- long deadlineMs = context.getDeadlineMs();
+ // It is important to use the passive deadline here; otherwise the gRPC channel could be closed before
+ // the useful error block is sent
+ long deadlineMs = context.getPassiveDeadlineMs();
List<MailboxInfo> mailboxInfos =
context.getWorkerMetadata().getMailboxInfosMap().get(receiverStageId).getMailboxInfos();
@@ -230,6 +232,12 @@ public class MailboxSendOperator extends
MultiStageOperator {
}
}
+ @Override
+ protected void sampleAndCheckInterruption() {
+ // mailbox send operator uses passive deadline instead of the active one
+ sampleAndCheckInterruption(_context.getPassiveDeadlineMs());
+ }
+
private void sendEos(MseBlock.Eos eosBlockWithoutStats)
throws Exception {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
index edc6de4160..d9a285d342 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java
@@ -73,9 +73,30 @@ public abstract class MultiStageOperator
public abstract void registerExecution(long time, int numRows);
- // Samples resource usage of the operator. The operator should call this
function for every block of data or
- // assuming the block holds 10000 rows or more.
+ /// This method should be called periodically by the operator to check
whether the execution should be interrupted.
+ ///
+ /// This could happen when the request deadline is reached, or the thread
accountant decides to interrupt the query
+ /// due to resource constraints.
+ ///
+ /// Normally, callers should call [#sampleAndCheckInterruption(long
deadlineMs)] passing the correct deadline, but
+ /// given most operators use either the active or the passive deadline, this
method is provided as a convenience
+ /// method. By default, it uses the active deadline, which is the one that
should be used for most operators, but
+ /// if the operator does not actively process data (i.e. both mailbox operators), it should override this method to
+ /// use the passive deadline instead.
+ /// See for example [MailboxSendOperator][org.apache.pinot.query.runtime.operator.MailboxSendOperator].
protected void sampleAndCheckInterruption() {
+ sampleAndCheckInterruption(_context.getActiveDeadlineMs());
+ }
+
+ /// This method should be called periodically by the operator to check
whether the execution should be interrupted.
+ ///
+ /// This could happen when the request deadline is reached, or the thread
accountant decides to interrupt the query
+ /// due to resource constraints.
+ protected void sampleAndCheckInterruption(long deadlineMs) {
+ if (System.currentTimeMillis() >= deadlineMs) {
+ earlyTerminate();
+ throw QueryErrorCode.EXECUTION_TIMEOUT.asException("Timing out on " +
getExplainName());
+ }
Tracing.ThreadAccountantOps.sampleMSE();
if (Tracing.ThreadAccountantOps.isInterrupted()) {
earlyTerminate();
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
index 3f6fc1192d..68e5784bc1 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/utils/BlockingMultiStreamConsumer.java
@@ -344,7 +344,7 @@ public abstract class BlockingMultiStreamConsumer<E>
implements AutoCloseable {
public OfMseBlock(OpChainExecutionContext context,
List<? extends AsyncStream<ReceivingMailbox.MseBlockWithStats>>
asyncProducers) {
- super(context.getId(), context.getDeadlineMs(), asyncProducers);
+ super(context.getId(), context.getPassiveDeadlineMs(), asyncProducers);
_stageId = context.getStageId();
_stats = MultiStageQueryStats.emptyStats(context.getStageId());
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
index af627f4e28..0c95edbfdd 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -29,6 +29,7 @@ import org.apache.pinot.query.runtime.operator.OpChainId;
import org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerResult;
import org.apache.pinot.query.runtime.plan.server.ServerPlanRequestContext;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -41,7 +42,8 @@ public class OpChainExecutionContext {
private final MailboxService _mailboxService;
private final long _requestId;
- private final long _deadlineMs;
+ private final long _activeDeadlineMs;
+ private final long _passiveDeadlineMs;
private final Map<String, String> _opChainMetadata;
private final StageMetadata _stageMetadata;
private final WorkerMetadata _workerMetadata;
@@ -56,14 +58,24 @@ public class OpChainExecutionContext {
private ServerPlanRequestContext _leafStageContext;
private final boolean _sendStats;
-
+ @Deprecated
public OpChainExecutionContext(MailboxService mailboxService, long
requestId, long deadlineMs,
Map<String, String> opChainMetadata, StageMetadata stageMetadata,
WorkerMetadata workerMetadata,
@Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable
ThreadExecutionContext parentContext,
boolean sendStats) {
+ this(mailboxService, requestId, deadlineMs, deadlineMs, opChainMetadata,
stageMetadata, workerMetadata,
+ pipelineBreakerResult, parentContext, sendStats);
+ }
+
+ public OpChainExecutionContext(MailboxService mailboxService, long requestId,
+ long activeDeadlineMs, long passiveDeadlineMs,
+ Map<String, String> opChainMetadata, StageMetadata stageMetadata,
WorkerMetadata workerMetadata,
+ @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable
ThreadExecutionContext parentContext,
+ boolean sendStats) {
_mailboxService = mailboxService;
_requestId = requestId;
- _deadlineMs = deadlineMs;
+ _activeDeadlineMs = activeDeadlineMs;
+ _passiveDeadlineMs = passiveDeadlineMs;
_opChainMetadata = Collections.unmodifiableMap(opChainMetadata);
_stageMetadata = stageMetadata;
_workerMetadata = workerMetadata;
@@ -76,6 +88,17 @@ public class OpChainExecutionContext {
_parentContext = parentContext;
}
+ public static OpChainExecutionContext fromQueryContext(MailboxService
mailboxService,
+ Map<String, String> opChainMetadata, StageMetadata stageMetadata,
WorkerMetadata workerMetadata,
+ @Nullable PipelineBreakerResult pipelineBreakerResult, @Nullable
ThreadExecutionContext parentContext,
+ boolean sendStats) {
+ long requestId = QueryThreadContext.getRequestId();
+ long activeDeadlineMs = QueryThreadContext.getActiveDeadlineMs();
+ long passiveDeadlineMs = QueryThreadContext.getPassiveDeadlineMs();
+ return new OpChainExecutionContext(mailboxService, requestId,
activeDeadlineMs, passiveDeadlineMs,
+ opChainMetadata, stageMetadata, workerMetadata, pipelineBreakerResult,
parentContext, sendStats);
+ }
+
public MailboxService getMailboxService() {
return _mailboxService;
}
@@ -96,8 +119,37 @@ public class OpChainExecutionContext {
return _server;
}
+ /// Returns the deadline in milliseconds for the OpChain to complete when it
is actively waiting for data.
+ ///
+ /// This deadline should only be used for _active_ waits, like when a
+ ///
[HashJoinOperator][org.apache.pinot.query.runtime.operator.HashJoinOperator] is
building the hash table.
+ ///
+ /// This should not be used for _passive_ waits, like when a
+ /// [MailboxReceiveOperator][org.apache.pinot.query.runtime.operator.MailboxReceiveOperator] or a
+ /// [PipelineBreakerOperator][org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator] passively waits
+ /// for data to arrive.
+ public long getActiveDeadlineMs() {
+ return _activeDeadlineMs;
+ }
+
+ /// For backward compatibility, we return the active deadline as the default.
+ /// This should be used for active waits only.
+ /// @deprecated Use {@link #getActiveDeadlineMs()} instead.
public long getDeadlineMs() {
- return _deadlineMs;
+ return getActiveDeadlineMs();
+ }
+
+ /// Returns the deadline in milliseconds for the OpChain to complete when it
is passively waiting for data.
+ ///
+ /// This deadline should only be used for _passive_ waits, like when a
+ /// [MailboxReceiveOperator][org.apache.pinot.query.runtime.operator.MailboxReceiveOperator] or a
+ /// [PipelineBreakerOperator][org.apache.pinot.query.runtime.plan.pipeline.PipelineBreakerOperator]
+ /// passively waits for data to arrive.
+ ///
+ /// This should not be used for _active_ waits, like when a
+ /// [HashJoinOperator][org.apache.pinot.query.runtime.operator.HashJoinOperator] is building the hash table.
+ public long getPassiveDeadlineMs() {
+ return _passiveDeadlineMs;
}
public Map<String, String> getOpChainMetadata() {
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
index dffd7f6ba2..27f375e4db 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutor.java
@@ -38,6 +38,7 @@ import org.apache.pinot.query.runtime.operator.OpChain;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
import org.apache.pinot.query.runtime.plan.PlanNodeToOpChain;
import org.apache.pinot.spi.accounting.ThreadExecutionContext;
+import org.apache.pinot.spi.query.QueryThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,6 +52,30 @@ public class PipelineBreakerExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(PipelineBreakerExecutor.class);
+ /**
+ * Execute a pipeline breaker and collect the results (synchronously).
Currently, pipeline breaker executor can only
+ * execute mailbox receive pipeline breaker.
+ *
+ * @param scheduler scheduler service to run the pipeline breaker main
thread.
+ * @param mailboxService mailbox service to attach the {@link
MailboxReceiveNode} against.
+ * @param workerMetadata worker metadata for the current worker.
+ * @param stagePlan the distributed stage plan to run pipeline breaker on.
+ * @param opChainMetadata request metadata, including query options
+ * @param parentContext Parent thread metadata
+ * @return pipeline breaker result;
+ * - If exception occurs, exception block will be wrapped in {@link
MseBlock} and assigned to each PB node.
+ * - Normal stats will be attached to each PB node and downstream
execution should return with stats attached.
+ */
+ @Nullable
+ public static PipelineBreakerResult
executePipelineBreakersFromQueryContext(OpChainSchedulerService scheduler,
+ MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan
stagePlan,
+ Map<String, String> opChainMetadata,
+ @Nullable ThreadExecutionContext parentContext, boolean sendStats) {
+ return executePipelineBreakers(scheduler, mailboxService, workerMetadata,
stagePlan, opChainMetadata,
+ QueryThreadContext.getRequestId(),
QueryThreadContext.getActiveDeadlineMs(),
+ QueryThreadContext.getPassiveDeadlineMs(), parentContext, sendStats);
+ }
+
/**
* Execute a pipeline breaker and collect the results (synchronously).
Currently, pipeline breaker executor can only
* execute mailbox receive pipeline breaker.
@@ -68,7 +93,7 @@ public class PipelineBreakerExecutor {
@Nullable
public static PipelineBreakerResult
executePipelineBreakers(OpChainSchedulerService scheduler,
MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan
stagePlan,
- Map<String, String> opChainMetadata, long requestId, long deadlineMs,
+ Map<String, String> opChainMetadata, long requestId, long
activeDeadlineMs, long passiveDeadlineMs,
@Nullable ThreadExecutionContext parentContext, boolean sendStats) {
PipelineBreakerContext pipelineBreakerContext = new
PipelineBreakerContext();
PipelineBreakerVisitor.visitPlanRoot(stagePlan.getRootNode(),
pipelineBreakerContext);
@@ -78,7 +103,7 @@ public class PipelineBreakerExecutor {
// OpChain receive-mail callbacks.
// see also: MailboxIdUtils TODOs, de-couple mailbox id from query
information
OpChainExecutionContext opChainExecutionContext =
- new OpChainExecutionContext(mailboxService, requestId, deadlineMs,
opChainMetadata,
+ new OpChainExecutionContext(mailboxService, requestId,
activeDeadlineMs, passiveDeadlineMs, opChainMetadata,
stagePlan.getStageMetadata(), workerMetadata, null,
parentContext, sendStats);
return execute(scheduler, pipelineBreakerContext,
opChainExecutionContext);
} catch (Exception e) {
@@ -125,7 +150,7 @@ public class PipelineBreakerExecutor {
OpChain pipelineBreakerOpChain =
new OpChain(opChainExecutionContext, pipelineBreakerOperator, (id) ->
latch.countDown());
scheduler.register(pipelineBreakerOpChain);
- long timeoutMs = opChainExecutionContext.getDeadlineMs() -
System.currentTimeMillis();
+ long timeoutMs = opChainExecutionContext.getPassiveDeadlineMs() -
System.currentTimeMillis();
if (latch.await(timeoutMs, TimeUnit.MILLISECONDS)) {
return new PipelineBreakerResult(pipelineBreakerContext.getNodeIdMap(),
pipelineBreakerOperator.getResultMap(),
pipelineBreakerOperator.getErrorBlock(),
pipelineBreakerOperator.calculateStats());
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index 6e5d1aac01..572c4c55d7 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -294,7 +294,9 @@ public class ServerPlanRequestUtils {
pinotQuery.setQueryOptions(queryOptions);
}
queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
- Long.toString(executionContext.getDeadlineMs() -
System.currentTimeMillis()));
+ Long.toString(executionContext.getActiveDeadlineMs() -
System.currentTimeMillis()));
+
queryOptions.put(CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS,
+ Long.toString(executionContext.getPassiveDeadlineMs() -
executionContext.getActiveDeadlineMs()));
}
/**
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
index 68cac7e9ce..6486db9228 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java
@@ -168,7 +168,7 @@ public class QueryDispatcher {
boolean cancelled = false;
try {
submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
- QueryResult result = runReducer(requestId, dispatchableSubPlan,
timeoutMs, queryOptions, _mailboxService);
+ QueryResult result = runReducer(dispatchableSubPlan, queryOptions,
_mailboxService);
if (result.getProcessingException() != null) {
MultiStageQueryStats statsFromCancel = cancelWithStats(requestId,
servers);
cancelled = true;
@@ -452,6 +452,8 @@ public class QueryDispatcher {
requestMetadata.put(CommonConstants.Query.Request.MetadataKeys.CORRELATION_ID,
cid);
requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS,
Long.toString(deadline.timeRemaining(TimeUnit.MILLISECONDS)));
+
requestMetadata.put(CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS,
+ Long.toString(QueryThreadContext.getPassiveDeadlineMs()));
requestMetadata.putAll(queryOptions);
return requestMetadata;
}
@@ -577,15 +579,38 @@ public class QueryDispatcher {
return _timeSeriesDispatchClientMap.computeIfAbsent(key, k -> new
TimeSeriesDispatchClient(hostname, port));
}
- // There is no reduction happening here, results are simply concatenated.
+ /// Concatenates the results of the sub-plan and returns a [QueryResult]
with the concatenated result.
+ ///
+ /// This method assumes the caller thread is a query thread and therefore
[QueryThreadContext] has been initialized.
+ private static QueryResult runReducer(
+ DispatchableSubPlan subPlan,
+ Map<String, String> queryOptions,
+ MailboxService mailboxService
+ ) {
+ return runReducer(
+ QueryThreadContext.getRequestId(),
+ subPlan,
+ QueryThreadContext.getActiveDeadlineMs(),
+ QueryThreadContext.getPassiveDeadlineMs(),
+ queryOptions,
+ mailboxService
+ );
+ }
+
+ /// Concatenates the results of the sub-plan and returns a [QueryResult]
with the concatenated result.
+ ///
+ /// This method should be called from a query thread and therefore using
+ /// [#runReducer(DispatchableSubPlan, Map, MailboxService)] is preferred.
+ ///
+ /// Remember that in MSE there is no actual reduce but rather a single stage
that concatenates the results.
@VisibleForTesting
public static QueryResult runReducer(long requestId,
DispatchableSubPlan subPlan,
- long timeoutMs,
+ long activeDeadlineMs,
+ long passiveDeadlineMs,
Map<String, String> queryOptions,
MailboxService mailboxService) {
long startTimeMs = System.currentTimeMillis();
- long deadlineMs = startTimeMs + timeoutMs;
// NOTE: Reduce stage is always stage 0
DispatchablePlanFragment stagePlan = subPlan.getQueryStageMap().get(0);
PlanFragment planFragment = stagePlan.getPlanFragment();
@@ -597,8 +622,8 @@ public class QueryDispatcher {
StageMetadata stageMetadata = new StageMetadata(0, workerMetadata,
stagePlan.getCustomProperties());
ThreadExecutionContext parentContext =
Tracing.getThreadAccountant().getThreadExecutionContext();
OpChainExecutionContext executionContext =
- new OpChainExecutionContext(mailboxService, requestId, deadlineMs,
queryOptions, stageMetadata,
- workerMetadata.get(0), null, parentContext, true);
+ new OpChainExecutionContext(mailboxService, requestId,
activeDeadlineMs, passiveDeadlineMs,
+ queryOptions, stageMetadata, workerMetadata.get(0), null,
parentContext, true);
PairList<Integer, String> resultFields = subPlan.getQueryResultFields();
DataSchema sourceSchema = rootNode.getDataSchema();
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
index fa4689d26a..4f25431751 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/plan/pipeline/PipelineBreakerExecutorTest.java
@@ -108,7 +108,7 @@ public class PipelineBreakerExecutorTest {
MailboxService mailboxService, WorkerMetadata workerMetadata, StagePlan
stagePlan,
Map<String, String> opChainMetadata, long requestId, long deadlineMs) {
return PipelineBreakerExecutor.executePipelineBreakers(scheduler,
mailboxService, workerMetadata, stagePlan,
- opChainMetadata, requestId, deadlineMs, null, true);
+ opChainMetadata, requestId, deadlineMs, deadlineMs, null, true);
}
@AfterClass
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
index 89371ec0a3..ff34cc3ef8 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/queries/QueryRunnerTestBase.java
@@ -160,7 +160,12 @@ public abstract class QueryRunnerTestBase extends
QueryTestSet {
}
}
// exception will be propagated through for assert purpose on runtime error
- return QueryDispatcher.runReducer(requestId, dispatchableSubPlan,
timeoutMs, Collections.emptyMap(),
+ long now = System.currentTimeMillis();
+ long extraPassiveTimeout = 1000;
+ return QueryDispatcher.runReducer(requestId, dispatchableSubPlan,
+ timeoutMs + now,
+ timeoutMs + now + extraPassiveTimeout,
+ Collections.emptyMap(),
_mailboxService);
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
index a4d5174efe..dbfc6ece96 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/query/QueryThreadContext.java
@@ -181,9 +181,12 @@ public class QueryThreadContext {
}
QueryThreadContext.setIds(requestId, cid);
long timeoutMs =
Long.parseLong(requestMetadata.get(CommonConstants.Broker.Request.QueryOptionKey.TIMEOUT_MS));
+ long extraPassiveTimeoutMs = Long.parseLong(requestMetadata.getOrDefault(
+
CommonConstants.Broker.Request.QueryOptionKey.EXTRA_PASSIVE_TIMEOUT_MS, "0"));
long startTimeMs = System.currentTimeMillis();
QueryThreadContext.setStartTimeMs(startTimeMs);
- QueryThreadContext.setDeadlineMs(startTimeMs + timeoutMs);
+ QueryThreadContext.setActiveDeadlineMs(startTimeMs + timeoutMs);
+ QueryThreadContext.setPassiveDeadlineMs(startTimeMs + timeoutMs +
extraPassiveTimeoutMs);
return open;
}
@@ -217,7 +220,8 @@ public class QueryThreadContext {
Instance context = new Instance();
if (memento != null) {
context.setStartTimeMs(memento._startTimeMs);
- context.setDeadlineMs(memento._deadlineMs);
+ context.setActiveDeadlineMs(memento._activeDeadlineMs);
+ context.setPassiveDeadlineMs(memento._passiveDeadlineMs);
context.setBrokerId(memento._brokerId);
context.setRequestId(memento._requestId);
context.setCid(memento._cid);
@@ -295,23 +299,60 @@ public class QueryThreadContext {
}
/**
- * Returns the deadline time of the query in milliseconds since epoch.
+ * @deprecated Use {@link #getActiveDeadlineMs()} instead.
+ */
+ @Deprecated
+ public static long getDeadlineMs() {
+ return get().getActiveDeadlineMs();
+ }
+
+ /**
+ * @deprecated Use {@link #setActiveDeadlineMs(long)} instead.
+ * @throws IllegalStateException if deadline is already set or if the {@link
QueryThreadContext} is not initialized
+ */
+ @Deprecated
+ public static void setDeadlineMs(long deadlineMs) {
+ get().setActiveDeadlineMs(deadlineMs);
+ }
+
+ /**
+ * Returns the active deadline time of the query in milliseconds since epoch.
*
* The default value of 0 means the deadline is not set.
* @throws IllegalStateException if the {@link QueryThreadContext} is not
initialized
*/
- public static long getDeadlineMs() {
- return get().getDeadlineMs();
+ public static long getActiveDeadlineMs() {
+ return get().getActiveDeadlineMs();
}
/**
- * Sets the deadline time of the query in milliseconds since epoch.
+ * Sets the active deadline time of the query in milliseconds since epoch.
*
* The deadline can only be set once.
* @throws IllegalStateException if deadline is already set or if the {@link
QueryThreadContext} is not initialized
*/
- public static void setDeadlineMs(long deadlineMs) {
- get().setDeadlineMs(deadlineMs);
+ public static void setActiveDeadlineMs(long activeDeadlineMs) {
+ get().setActiveDeadlineMs(activeDeadlineMs);
+ }
+
+ /**
+ * Returns the passive deadline time of the query in milliseconds since
epoch.
+ *
+ * The default value of 0 means the deadline is not set.
+ * @throws IllegalStateException if the {@link QueryThreadContext} is not
initialized
+ */
+ public static long getPassiveDeadlineMs() {
+ return get().getPassiveDeadlineMs();
+ }
+
+ /**
+ * Sets the passive deadline time of the query in milliseconds since epoch.
+ *
+ * The deadline can only be set once.
+ * @throws IllegalStateException if deadline is already set or if the {@link
QueryThreadContext} is not initialized
+ */
+ public static void setPassiveDeadlineMs(long passiveDeadlineMs) {
+ get().setPassiveDeadlineMs(passiveDeadlineMs);
}
/**
@@ -456,7 +497,8 @@ public class QueryThreadContext {
*/
private static class Instance implements CloseableContext {
private long _startTimeMs;
- private long _deadlineMs;
+ private long _activeDeadlineMs;
+ private long _passiveDeadlineMs;
private String _brokerId;
private long _requestId;
private String _cid;
@@ -474,14 +516,34 @@ public class QueryThreadContext {
_startTimeMs = startTimeMs;
}
+ @Deprecated
public long getDeadlineMs() {
- return _deadlineMs;
+ return getActiveDeadlineMs();
+ }
+
+ public long getActiveDeadlineMs() {
+ return _activeDeadlineMs;
}
+ @Deprecated
public void setDeadlineMs(long deadlineMs) {
- Preconditions.checkState(getDeadlineMs() == 0, "Deadline already set to
%s, cannot set again",
- getDeadlineMs());
- _deadlineMs = deadlineMs;
+ setActiveDeadlineMs(deadlineMs);
+ }
+
+ public void setActiveDeadlineMs(long activeDeadlineMs) {
+ Preconditions.checkState(getActiveDeadlineMs() == 0, "Deadline already
set to %s, cannot set again",
+ getActiveDeadlineMs());
+ _activeDeadlineMs = activeDeadlineMs;
+ }
+
+ public long getPassiveDeadlineMs() {
+ return _passiveDeadlineMs;
+ }
+
+ public void setPassiveDeadlineMs(long passiveDeadlineMs) {
+ Preconditions.checkState(getPassiveDeadlineMs() == 0, "Passive deadline
already set to %s, cannot set again",
+ getPassiveDeadlineMs());
+ _passiveDeadlineMs = passiveDeadlineMs;
}
public String getBrokerId() {
@@ -576,8 +638,13 @@ public class QueryThreadContext {
}
@Override
- public void setDeadlineMs(long deadlineMs) {
- LOGGER.debug("Setting deadline to {} in a fake context", deadlineMs);
+ public void setActiveDeadlineMs(long activeDeadlineMs) {
+ LOGGER.debug("Setting active deadline to {} in a fake context",
activeDeadlineMs);
+ }
+
+ @Override
+ public void setPassiveDeadlineMs(long passiveDeadlineMs) {
+ LOGGER.debug("Setting passive deadline to {} in a fake context",
passiveDeadlineMs);
}
@Override
@@ -632,7 +699,8 @@ public class QueryThreadContext {
*/
public static class Memento {
private final long _startTimeMs;
- private final long _deadlineMs;
+ private final long _activeDeadlineMs;
+ private final long _passiveDeadlineMs;
private final String _brokerId;
private final long _requestId;
private final String _cid;
@@ -642,7 +710,8 @@ public class QueryThreadContext {
private Memento(Instance instance) {
_startTimeMs = instance.getStartTimeMs();
- _deadlineMs = instance.getDeadlineMs();
+ _activeDeadlineMs = instance.getActiveDeadlineMs();
+ _passiveDeadlineMs = instance.getPassiveDeadlineMs();
_brokerId = instance.getBrokerId();
_requestId = instance.getRequestId();
_cid = instance.getCid();
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 2d60123329..3a2b703edc 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -344,6 +344,8 @@ public class CommonConstants {
public static final String CONFIG_OF_BROKER_ENABLE_ROW_COLUMN_LEVEL_AUTH =
"pinot.broker.enable.row.column.level.auth";
public static final boolean DEFAULT_BROKER_ENABLE_ROW_COLUMN_LEVEL_AUTH =
false;
+ public static final String CONFIG_OF_EXTRA_PASSIVE_TIMEOUT_MS =
"pinot.broker.extraPassiveTimeoutMs";
+ public static final long DEFAULT_EXTRA_PASSIVE_TIMEOUT_MS = 100L;
public static final String CONFIG_OF_BROKER_ID =
"pinot.broker.instance.id";
public static final String CONFIG_OF_BROKER_INSTANCE_TAGS =
"pinot.broker.instance.tags";
public static final String CONFIG_OF_BROKER_HOSTNAME =
"pinot.broker.hostname";
@@ -544,6 +546,7 @@ public class CommonConstants {
public static class QueryOptionKey {
public static final String TIMEOUT_MS = "timeoutMs";
+ public static final String EXTRA_PASSIVE_TIMEOUT_MS =
"extraPassiveTimeoutMs";
public static final String SKIP_UPSERT = "skipUpsert";
public static final String SKIP_UPSERT_VIEW = "skipUpsertView";
public static final String UPSERT_VIEW_FRESHNESS_MS =
"upsertViewFreshnessMs";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]