This is an automated email from the ASF dual-hosted git repository.
tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 78a1daea3f1 Implement per-segment query timeout on data nodes (#18148)
78a1daea3f1 is described below
commit 78a1daea3f1e0283c03c5e7abcb34ce1975ac76d
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Oct 7 17:56:53 2025 -0700
Implement per-segment query timeout on data nodes (#18148)
Adds a configurable per-segment processing timeout for queries on data nodes.
Enable this feature by setting the `perSegmentTimeout` query context
parameter and by setting `druid.processing.numTimeoutThreads` to a small
number (e.g. 1-3) on all data nodes where you want the feature enabled.
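For example (illustrative values; the property and context parameter names are
the ones added in this patch), in runtime.properties on each data node
(Historical, Peon, or Indexer):

    druid.processing.numTimeoutThreads=2

and then per query, in the query context:

    "context" : { "perSegmentTimeout" : 5000 }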
---
docs/configuration/index.md | 3 +
docs/querying/query-context-reference.md | 1 +
.../SeekableStreamAppenderatorConfigTest.java | 1 +
.../druid/query/ChainedExecutionQueryRunner.java | 109 ++++----
.../druid/query/DirectQueryProcessingPool.java | 14 +
.../apache/druid/query/DruidProcessingConfig.java | 15 +-
.../druid/query/ForwardingQueryProcessingPool.java | 46 +++-
.../query/MetricsEmittingQueryProcessingPool.java | 6 +-
.../druid/query/NoopQueryProcessingPool.java | 10 +
.../java/org/apache/druid/query/QueryContext.java | 27 +-
.../java/org/apache/druid/query/QueryContexts.java | 1 +
.../apache/druid/query/QueryProcessingPool.java | 15 ++
.../epinephelinae/GroupByMergingQueryRunner.java | 83 +++---
.../SegmentMetadataQueryRunnerFactory.java | 4 -
.../query/ChainedExecutionQueryRunnerTest.java | 168 +++++++++++-
.../MetricsEmittingQueryProcessingPoolTest.java | 24 +-
.../groupby/GroupByQueryRunnerFailureTest.java | 287 ++++++++++++++++++++-
.../apache/druid/guice/BrokerProcessingModule.java | 2 +-
.../apache/druid/guice/DruidProcessingModule.java | 2 +
19 files changed, 721 insertions(+), 97 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index c8e1d200cfb..3198f6529f4 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1371,6 +1371,7 @@ Processing properties set on the Middle Manager are
passed through to Peons.
|`druid.processing.formatString`|Realtime and Historical processes use this
format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers
available for merging query results. The buffers are sized by
`druid.processing.buffer.sizeBytes`. This property is effectively a concurrency
limit for queries that require merging buffers. If you are using any queries
that require merge buffers (currently, just groupBy) then you should have at
least two of these.|`max(2, druid.processing.numThreads / 4)`|
|`druid.processing.numThreads`|The number of processing threads to have
available for parallel processing of segments. Our rule of thumb is `num_cores
- 1`, which means that even under heavy load there will still be one core
available to do background tasks like talking with ZooKeeper and pulling down
segments. If only one core is available, this property defaults to the value
`1`.|Number of cores - 1 (or 1)|
+|`druid.processing.numTimeoutThreads`|The number of processing threads available for enforcing per-segment query timeouts. Setting this value to `0` disables per-segment timeouts, irrespective of the `perSegmentTimeout` query context parameter. Because these threads only service timers, it is recommended to set this value to a small percentage (e.g. 5%) of the total query processing cores available to the Peon.|0|
|`druid.processing.fifo`|Enables the processing queue to treat tasks of equal
priority in a FIFO manner.|`true`|
|`druid.processing.tmpDir`|Path where temporary files created while processing
a query should be stored. If specified, this configuration takes priority over
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
|`druid.processing.intermediaryData.storage.type`|Storage type for
intermediary segments of data shuffle between native parallel index tasks. <br
/>Set to `local` to store segment files in the local storage of the Middle
Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for
better fault tolerance during rolling updates. When the storage type is
`deepstore`, Druid stores the data in the `shuffle-data` directory under the
configured deep storage path. Druid does n [...]
@@ -1514,6 +1515,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.processing.formatString`|Indexer processes use this format string to
name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers
available for merging query results. The buffers are sized by
`druid.processing.buffer.sizeBytes`. This property is effectively a concurrency
limit for queries that require merging buffers. If you are using any queries
that require merge buffers (currently, just groupBy) then you should have at
least two of these.|`max(2, druid.processing.numThreads / 4)`|
|`druid.processing.numThreads`|The number of processing threads to have
available for parallel processing of segments. Our rule of thumb is `num_cores
- 1`, which means that even under heavy load there will still be one core
available to do background tasks like talking with ZooKeeper and pulling down
segments. If only one core is available, this property defaults to the value
`1`.|Number of cores - 1 (or 1)|
+|`druid.processing.numTimeoutThreads`|The number of processing threads available for enforcing per-segment query timeouts. Setting this value to `0` disables per-segment timeouts, irrespective of the `perSegmentTimeout` query context parameter. Because these threads only service timers, it is recommended to set this value to a small percentage (e.g. 5%) of the total query processing cores available to the Indexer.|0|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal
priority in a FIFO manner|`true`|
|`druid.processing.tmpDir`|Path where temporary files created while processing
a query should be stored. If specified, this configuration takes priority over
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
@@ -1622,6 +1624,7 @@ Druid uses Jetty to serve HTTP requests.
|`druid.processing.formatString`|Realtime and Historical processes use this
format string to name their processing threads.|processing-%s|
|`druid.processing.numMergeBuffers`|The number of direct memory buffers
available for merging query results. The buffers are sized by
`druid.processing.buffer.sizeBytes`. This property is effectively a concurrency
limit for queries that require merging buffers. If you are using any queries
that require merge buffers (currently, just groupBy) then you should have at
least two of these.|`max(2, druid.processing.numThreads / 4)`|
|`druid.processing.numThreads`|The number of processing threads to have
available for parallel processing of segments. Our rule of thumb is `num_cores
- 1`, which means that even under heavy load there will still be one core
available to do background tasks like talking with ZooKeeper and pulling down
segments. If only one core is available, this property defaults to the value
`1`.|Number of cores - 1 (or 1)|
+|`druid.processing.numTimeoutThreads`|The number of processing threads available for enforcing per-segment query timeouts. Setting this value to `0` disables per-segment timeouts, irrespective of the `perSegmentTimeout` query context parameter. Because these threads only service timers, it is recommended to set this value to a small percentage (e.g. 5%) of the total query processing cores available to the Historical.|0|
|`druid.processing.fifo`|If the processing queue should treat tasks of equal
priority in a FIFO manner|`true`|
|`druid.processing.tmpDir`|Path where temporary files created while processing
a query should be stored. If specified, this configuration takes priority over
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
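As a worked example of the ~5% sizing guidance above (illustrative values, not
part of this patch): a Historical configured with 31 processing threads needs
only one or two timeout threads, since they merely arm and fire timers:

    druid.processing.numThreads=31
    druid.processing.numTimeoutThreads=2   # roughly 5% of 31, rounded up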
diff --git a/docs/querying/query-context-reference.md
b/docs/querying/query-context-reference.md
index 52515236620..a7532b265d0 100644
--- a/docs/querying/query-context-reference.md
+++ b/docs/querying/query-context-reference.md
@@ -44,6 +44,7 @@ Unless otherwise noted, the following parameters apply to all
query types, and t
|Parameter |Default | Description
|
|-------------------|----------------------------------------|----------------------|
|`timeout` | `druid.server.http.defaultQueryTimeout`| Query timeout
in millis, beyond which unfinished queries will be cancelled. 0 timeout means
`no timeout` (up to the server-side maximum query timeout,
`druid.server.http.maxQueryTimeout`). To set the default timeout and maximum
timeout, see [Broker configuration](../configuration/index.md#broker) |
+|`perSegmentTimeout`| `null` | Per-segment processing timeout in millis, beyond which unfinished per-segment processing will be cancelled. Should be ≤ `timeout`. A `perSegmentTimeout` of 0 means no per-segment timeout. In practice, a sensible value is on the order of a few seconds. A cluster-wide default for this query context parameter can be specified via `druid.query.default.context.perSegmentTimeout`.|
|`priority` | The default priority is one of the following:
<ul><li>Value of `priority` in the query context, if set</li><li>The value of
the runtime property `druid.query.default.context.priority`, if set and not
null</li><li>`0` if the priority is not set in the query context or runtime
properties</li></ul>| Query priority. Queries with higher priority get
precedence for computational resources.|
|`lane` | `null` | Query lane,
used to control usage limits on classes of queries. See [Broker
configuration](../configuration/index.md#broker) for more details.|
|`queryId` | auto-generated | Unique
identifier given to this query. If a query ID is set or known, this can be used
to cancel the query |
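A minimal native query using both timeouts together might look like the
following sketch (datasource and values are illustrative; `perSegmentTimeout`
bounds each segment's processing while `timeout` bounds the query overall):

    {
      "queryType": "timeseries",
      "dataSource": "wikipedia",
      "intervals": ["2014-01-01/2015-01-01"],
      "granularity": "all",
      "aggregations": [{ "type": "count", "name": "count" }],
      "context": { "timeout": 300000, "perSegmentTimeout": 5000 }
    }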
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
index 2cc2a1b6c25..85c4f2ea5eb 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
@@ -192,6 +192,7 @@ public class SeekableStreamAppenderatorConfigTest
super(
null,
numThreads,
+ null,
numMergeBuffers,
null,
null,
diff --git
a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
index 4cc5a9fbd2d..74f2ffc634a 100644
---
a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
+++
b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
@@ -81,6 +81,10 @@ public class ChainedExecutionQueryRunner<T> implements
QueryRunner<T>
final int priority = query.context().getPriority();
final Ordering ordering = query.getResultOrdering();
final QueryPlus<T> threadSafeQueryPlus =
queryPlus.withoutThreadUnsafeState();
+
+ final QueryContext context = query.context();
+ final boolean usePerSegmentTimeout = context.usePerSegmentTimeout();
+ final long perSegmentTimeout = context.getPerSegmentTimeout();
return new BaseSequence<>(
new BaseSequence.IteratorMaker<>()
{
@@ -97,42 +101,54 @@ public class ChainedExecutionQueryRunner<T> implements
QueryRunner<T>
throw new ISE("Null queryRunner! Looks to be some
segment unmapping action happening");
}
- return queryProcessingPool.submitRunnerTask(
- new
AbstractPrioritizedQueryRunnerCallable<>(priority, input)
- {
- @Override
- public Iterable<T> call()
- {
- try {
- Sequence<T> result =
input.run(threadSafeQueryPlus, responseContext);
- if (result == null) {
- throw new ISE("Got a null result!
Segments are missing!");
- }
-
- List<T> retVal = result.toList();
- if (retVal == null) {
- throw new ISE("Got a null list of
results");
- }
-
- return retVal;
- }
- catch (QueryInterruptedException e) {
- throw new RuntimeException(e);
- }
- catch (QueryTimeoutException e) {
- throw e;
- }
- catch (Exception e) {
- if (query.context().isDebug()) {
- log.error(e, "Exception with one of the
sequences!");
- } else {
- log.noStackTrace().error(e, "Exception
with one of the sequences!");
- }
- Throwables.propagateIfPossible(e);
- throw new RuntimeException(e);
- }
+                        final AbstractPrioritizedQueryRunnerCallable<Iterable<T>, T> callable = new AbstractPrioritizedQueryRunnerCallable<>(
+ priority,
+ input
+ )
+ {
+ @Override
+ public Iterable<T> call()
+ {
+ try {
+ Sequence<T> result =
input.run(threadSafeQueryPlus, responseContext);
+ if (result == null) {
+ throw new ISE("Got a null result! Segments
are missing!");
+ }
+
+ List<T> retVal = result.toList();
+ if (retVal == null) {
+ throw new ISE("Got a null list of results");
+ }
+
+ return retVal;
+ }
+ catch (QueryInterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ catch (QueryTimeoutException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ if (query.context().isDebug()) {
+ log.error(e, "Exception with one of the
sequences!");
+ } else {
+ log.noStackTrace().error(e, "Exception with
one of the sequences!");
}
- });
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ if (usePerSegmentTimeout) {
+ return queryProcessingPool.submitRunnerTask(
+ callable,
+ perSegmentTimeout,
+ TimeUnit.MILLISECONDS
+ );
+ } else {
+                            return queryProcessingPool.submitRunnerTask(callable);
+ }
}
)
);
@@ -141,7 +157,6 @@ public class ChainedExecutionQueryRunner<T> implements
QueryRunner<T>
queryWatcher.registerQueryFuture(query, future);
try {
- final QueryContext context = query.context();
return new MergeIterable<>(
context.hasTimeout() ?
future.get(context.getTimeout(), TimeUnit.MILLISECONDS) :
@@ -149,24 +164,26 @@ public class ChainedExecutionQueryRunner<T> implements
QueryRunner<T>
ordering.nullsFirst()
).iterator();
}
- catch (InterruptedException e) {
- log.noStackTrace().warn(e, "Query interrupted, cancelling
pending results, query id [%s]", query.getId());
- //Note: canceling combinedFuture first so that it can complete
with INTERRUPTED as its final state. See
ChainedExecutionQueryRunnerTest.testQueryTimeout()
+ catch (CancellationException | InterruptedException e) {
+            log.noStackTrace().warn(e, "Query interrupted, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
- catch (CancellationException e) {
- throw new QueryInterruptedException(e);
- }
- catch (TimeoutException e) {
- log.warn("Query timeout, cancelling pending results for query id
[%s]", query.getId());
+ catch (TimeoutException | QueryTimeoutException e) {
+            log.noStackTrace().warn(e, "Query timeout, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
            throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
+            log.noStackTrace().warn(e, "Query error, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
- Throwables.propagateIfPossible(e.getCause());
- throw new RuntimeException(e.getCause());
+ Throwable cause = e.getCause();
+ // Nested per-segment future timeout
+ if (cause instanceof TimeoutException) {
+                throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query timeout, cancelling pending results for query [%s]. Per-segment timeout exceeded.", query.getId()));
+ }
+ Throwables.throwIfUnchecked(cause);
+ throw new RuntimeException(cause);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java
b/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java
index c24b0157af6..054ba1bd616 100644
---
a/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java
+++
b/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java
@@ -24,6 +24,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.java.util.common.concurrent.Execs;
+import java.util.concurrent.TimeUnit;
+
/**
* {@link QueryProcessingPool} wrapper over {@link Execs#directExecutor()}
*/
@@ -42,6 +44,18 @@ public class DirectQueryProcessingPool extends
ForwardingListeningExecutorServic
return delegate().submit(task);
}
+ @Override
+ public <T, V> ListenableFuture<T> submitRunnerTask(
+ PrioritizedQueryRunnerCallable<T, V> task,
+ long timeout,
+ TimeUnit unit
+ )
+ {
+    // The thread that calls .get() on this future is the same thread doing the execution,
+    // so honoring a timeout here cannot work: the executing thread would never receive the timeout interrupt.
+ return submitRunnerTask(task);
+ }
+
@Override
public ListeningExecutorService delegate()
{
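The comment above can be seen concretely with Guava's direct executor, which
this pool wraps: a submitted task runs synchronously on the calling thread, so
it is already complete before any timeout watcher could interrupt it. A
minimal standalone sketch (not part of the patch):

    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;

    public class DirectExecutorSketch
    {
      public static void main(String[] args) throws Exception
      {
        ListeningExecutorService direct = MoreExecutors.newDirectExecutorService();
        // submit() executes the task synchronously on *this* thread...
        ListenableFuture<String> f = direct.submit(() -> {
          Thread.sleep(250); // simulated slow segment work
          return "done";
        });
        // ...so the work has already finished before a timeout could be attached.
        System.out.println(f.isDone() + " " + f.get()); // prints "true done"
      }
    }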
diff --git
a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
index c469226a62e..7a538910fe5 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
@@ -52,6 +52,9 @@ public class DruidProcessingConfig implements ColumnConfig
private final DruidProcessingBufferConfig buffer;
@JsonProperty
private final DruidProcessingIndexesConfig indexes;
+ @JsonProperty
+ private final int numTimeoutThreads;
+
private final AtomicReference<Integer> computedBufferSizeBytes = new
AtomicReference<>();
private final boolean numThreadsConfigured;
private final boolean numMergeBuffersConfigured;
@@ -60,6 +63,7 @@ public class DruidProcessingConfig implements ColumnConfig
public DruidProcessingConfig(
@JsonProperty("formatString") @Nullable String formatString,
@JsonProperty("numThreads") @Nullable Integer numThreads,
+ @JsonProperty("numTimeoutThreads") @Nullable Integer numTimeoutThreads,
@JsonProperty("numMergeBuffers") @Nullable Integer numMergeBuffers,
@JsonProperty("fifo") @Nullable Boolean fifo,
@JsonProperty("tmpDir") @Nullable String tmpDir,
@@ -73,6 +77,10 @@ public class DruidProcessingConfig implements ColumnConfig
numThreads,
Math.max(runtimeInfo.getAvailableProcessors() - 1, 1)
);
+ this.numTimeoutThreads = Configs.valueOrDefault(
+ numTimeoutThreads,
+ 0
+ );
this.numMergeBuffers = Configs.valueOrDefault(numMergeBuffers, Math.max(2,
this.numThreads / 4));
this.fifo = fifo == null || fifo;
this.tmpDir = Configs.valueOrDefault(tmpDir,
System.getProperty("java.io.tmpdir"));
@@ -87,7 +95,7 @@ public class DruidProcessingConfig implements ColumnConfig
@VisibleForTesting
public DruidProcessingConfig()
{
- this(null, null, null, null, null, null, null, JvmUtils.getRuntimeInfo());
+ this(null, null, null, null, null, null, null, null,
JvmUtils.getRuntimeInfo());
}
private void initializeBufferSize(RuntimeInfo runtimeInfo)
@@ -143,6 +151,11 @@ public class DruidProcessingConfig implements ColumnConfig
return numThreads;
}
+ public int getNumTimeoutThreads()
+ {
+ return numTimeoutThreads;
+ }
+
public int getNumMergeBuffers()
{
return numMergeBuffers;
diff --git
a/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java
b/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java
index fdb98756c53..532cdd85f4d 100644
---
a/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java
+++
b/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java
@@ -19,24 +19,42 @@
package org.apache.druid.query;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
+import javax.annotation.Nullable;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* Default implementation of {@link QueryProcessingPool} that just forwards
operations, including query execution tasks,
* to an underlying {@link ExecutorService}
 * Exposes a method {@link #submitRunnerTask(PrioritizedQueryRunnerCallable, long, TimeUnit)} that allows execution tasks to be serviced with a custom timeout.
*/
public class ForwardingQueryProcessingPool extends
ForwardingListeningExecutorService implements QueryProcessingPool
{
private final ListeningExecutorService delegate;
+ @Nullable private final ScheduledExecutorService timeoutService;
- public ForwardingQueryProcessingPool(ExecutorService executorService)
+ public ForwardingQueryProcessingPool(ExecutorService executorService,
@Nullable ScheduledExecutorService timeoutService)
{
this.delegate = MoreExecutors.listeningDecorator(executorService);
+ if (timeoutService != null) {
+ this.timeoutService = MoreExecutors.listeningDecorator(timeoutService);
+ } else {
+ this.timeoutService = null;
+ }
+ }
+
+ @VisibleForTesting
+ public ForwardingQueryProcessingPool(ExecutorService executorService)
+ {
+ this(executorService, null);
}
@Override
@@ -45,10 +63,36 @@ public class ForwardingQueryProcessingPool extends
ForwardingListeningExecutorSe
return delegate().submit(task);
}
+ @Override
+ public <T, V> ListenableFuture<T> submitRunnerTask(
+ PrioritizedQueryRunnerCallable<T, V> task,
+ long timeout,
+ TimeUnit unit
+ )
+ {
+ if (timeoutService != null) {
+ return Futures.withTimeout(
+ delegate().submit(task),
+ timeout,
+ unit,
+ timeoutService
+ );
+ }
+ return submitRunnerTask(task);
+ }
+
@Override
protected ListeningExecutorService delegate()
{
return delegate;
}
+ @Override
+ public void shutdown()
+ {
+ super.shutdown(); // shutdown delegate()
+ if (timeoutService != null) {
+ timeoutService.shutdown();
+ }
+ }
}
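The timed path above delegates to Guava's `Futures.withTimeout`, which fails
the returned future with a `TimeoutException` (surfaced as the cause of an
`ExecutionException` on `get()`) and cancels the delegate. A self-contained
sketch of that behavior (class and executor names are illustrative):

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class WithTimeoutSketch
    {
      public static void main(String[] args) throws Exception
      {
        ListeningExecutorService workers =
            MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        ListenableFuture<String> slow = workers.submit(() -> {
          Thread.sleep(5_000); // simulated slow per-segment work
          return "done";
        });
        // Wrap with a 100 ms timeout; on expiry the wrapper fails and cancels `slow`.
        ListenableFuture<String> timed =
            Futures.withTimeout(slow, 100, TimeUnit.MILLISECONDS, timer);
        try {
          timed.get();
        }
        catch (ExecutionException e) {
          System.out.println(e.getCause() instanceof TimeoutException); // true
        }
        finally {
          workers.shutdownNow();
          timer.shutdownNow();
        }
      }
    }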
diff --git
a/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java
b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java
index 1f831d7953e..e8d1b36815b 100644
---
a/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java
+++
b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java
@@ -23,16 +23,20 @@ import
com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
+import javax.annotation.Nullable;
+import java.util.concurrent.ScheduledExecutorService;
+
public class MetricsEmittingQueryProcessingPool extends
ForwardingQueryProcessingPool
implements ExecutorServiceMonitor.MetricEmitter
{
public MetricsEmittingQueryProcessingPool(
ListeningExecutorService delegate,
+ @Nullable ScheduledExecutorService timeoutService,
ExecutorServiceMonitor executorServiceMonitor
)
{
- super(delegate);
+ super(delegate, timeoutService);
executorServiceMonitor.add(this);
}
diff --git
a/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
b/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
index efb9a53776a..abf646f9f3e 100644
---
a/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
+++
b/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
@@ -49,6 +49,16 @@ public class NoopQueryProcessingPool implements
QueryProcessingPool
throw unsupportedException();
}
+ @Override
+ public <T, V> ListenableFuture<T> submitRunnerTask(
+ PrioritizedQueryRunnerCallable<T, V> task,
+ long timeout,
+ TimeUnit unit
+ )
+ {
+ throw unsupportedException();
+ }
+
@Override
public <T> ListenableFuture<T> submit(Callable<T> callable)
{
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java
b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 096fd4afe9b..42ed66978ff 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -32,7 +32,6 @@ import org.apache.druid.query.filter.InDimFilter;
import org.apache.druid.query.filter.TypedInFilter;
import javax.annotation.Nullable;
-
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
@@ -545,6 +544,32 @@ public class QueryContext
}
}
+ public long getPerSegmentTimeout()
+ {
+ return getPerSegmentTimeout(QueryContexts.NO_TIMEOUT);
+ }
+
+ public long getPerSegmentTimeout(long defaultPerSegmentTimeout)
+ {
+ final long timeout = getLong(QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
defaultPerSegmentTimeout);
+ if (timeout >= 0) {
+ return timeout;
+ }
+
+ throw new BadQueryContextException(
+ StringUtils.format(
+            "Per-segment timeout [%s] must be a non-negative value, but was [%d]",
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+ timeout
+ )
+ );
+ }
+
+ public boolean usePerSegmentTimeout()
+ {
+ return getPerSegmentTimeout() != QueryContexts.NO_TIMEOUT;
+ }
+
public void verifyMaxScatterGatherBytes(long maxScatterGatherBytesLimit)
{
long curr = getLong(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 0);
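Hedged usage sketch of the new accessors (assumes the existing
`QueryContext.of(Map)` factory; values are illustrative):

    QueryContext ctx = QueryContext.of(ImmutableMap.of("perSegmentTimeout", 100L));
    ctx.usePerSegmentTimeout();  // true: 100 differs from NO_TIMEOUT (0)
    ctx.getPerSegmentTimeout();  // 100
    QueryContext.of(ImmutableMap.of()).usePerSegmentTimeout();  // false: defaults to 0
    // A negative value throws BadQueryContextException per the validation above.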
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 0619e521b7a..6df392d9571 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -44,6 +44,7 @@ public class QueryContexts
public static final String PRIORITY_KEY = "priority";
public static final String LANE_KEY = "lane";
public static final String TIMEOUT_KEY = "timeout";
+ public static final String PER_SEGMENT_TIMEOUT_KEY = "perSegmentTimeout";
public static final String MAX_SCATTER_GATHER_BYTES_KEY =
"maxScatterGatherBytes";
public static final String MAX_QUEUED_BYTES_KEY = "maxQueuedBytes";
public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout";
diff --git
a/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java
b/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java
index da423c6cddd..9091164a8ee 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java
@@ -23,6 +23,8 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.guice.annotations.ExtensionPoint;
+import java.util.concurrent.TimeUnit;
+
/**
* This class implements the logic of how units of query execution run
concurrently. It is used in {@link
QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}.
* In a most straightforward implementation, each unit will be submitted to an
{@link PrioritizedExecutorService}. Extensions,
@@ -48,4 +50,17 @@ public interface QueryProcessingPool extends
ListeningExecutorService
* @return - Future object for tracking the task completion.
*/
<T, V> ListenableFuture<T>
submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task);
+
+
+ /**
+ * Submits the query execution task for asynchronous execution, with a
provided timeout.
+ *
+ * @param task - Task to be submitted.
+ * @param <T> - Task result type
+ * @param <V> - Query runner sequence type
+ * @param timeout - Timeout value
+ * @param unit - Timeout unit
+ * @return - Future object for tracking the task completion.
+ */
+  <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task, long timeout, TimeUnit unit);
}
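Call sites choose between the two overloads roughly as the runners in this
patch do; a sketch (`pool`, `callable`, and the flag/timeout names are
illustrative):

    final ListenableFuture<Iterable<T>> future = usePerSegmentTimeout
        ? pool.submitRunnerTask(callable, perSegmentTimeout, TimeUnit.MILLISECONDS)
        : pool.submitRunnerTask(callable);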
diff --git
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java
index 7ff0258b949..43bec66de7c 100644
---
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java
+++
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java
@@ -176,6 +176,8 @@ public class GroupByMergingQueryRunner implements
QueryRunner<ResultRow>
// query processing together.
final long queryTimeout = queryContext.getTimeout();
final boolean hasTimeout = queryContext.hasTimeout();
+ final boolean hasPerSegmentTimeout = queryContext.usePerSegmentTimeout();
+ final long perSegmentTimeout = queryContext.getPerSegmentTimeout();
final long timeoutAt = System.currentTimeMillis() + queryTimeout;
return new BaseSequence<>(
@@ -247,34 +249,43 @@ public class GroupByMergingQueryRunner implements
QueryRunner<ResultRow>
throw new ISE("Null queryRunner! Looks to be
some segment unmapping action happening");
}
- ListenableFuture<AggregateResult> future =
queryProcessingPool.submitRunnerTask(
- new
AbstractPrioritizedQueryRunnerCallable<>(priority, input)
- {
- @Override
- public AggregateResult call()
- {
- try (
- // These variables are used to close
releasers automatically.
- @SuppressWarnings("unused")
- Closeable bufferReleaser =
mergeBufferHolder.increment();
- @SuppressWarnings("unused")
- Closeable grouperReleaser =
grouperHolder.increment()
- ) {
- // Return true if OK, false if
resources were exhausted.
- return input.run(queryPlusForRunners,
responseContext)
- .accumulate(AggregateResult.ok(),
accumulator);
- }
- catch (QueryInterruptedException |
QueryTimeoutException e) {
- throw e;
- }
- catch (Exception e) {
- log.error(e, "Exception with one of
the sequences!");
- Throwables.propagateIfPossible(e);
- throw new RuntimeException(e);
- }
- }
+                        final AbstractPrioritizedQueryRunnerCallable<AggregateResult, ResultRow> callable = new AbstractPrioritizedQueryRunnerCallable<>(priority, input)
+ {
+ @Override
+ public AggregateResult call()
+ {
+ try (
+ // These variables are used to close
releasers automatically.
+ @SuppressWarnings("unused")
+ Closeable bufferReleaser =
mergeBufferHolder.increment();
+ @SuppressWarnings("unused")
+ Closeable grouperReleaser =
grouperHolder.increment()
+ ) {
+ // Return true if OK, false if resources
were exhausted.
+ return input.run(queryPlusForRunners,
responseContext)
+
.accumulate(AggregateResult.ok(), accumulator);
}
- );
+ catch (QueryInterruptedException |
QueryTimeoutException e) {
+ throw e;
+ }
+ catch (Exception e) {
+ log.error(e, "Exception with one of the
sequences!");
+ Throwables.throwIfUnchecked(e);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ final ListenableFuture<AggregateResult> future;
+ if (hasPerSegmentTimeout) {
+ future = queryProcessingPool.submitRunnerTask(
+ callable,
+ perSegmentTimeout,
+ TimeUnit.MILLISECONDS
+ );
+ } else {
+                            future = queryProcessingPool.submitRunnerTask(callable);
+ }
if (isSingleThreaded) {
waitForFutureCompletion(
@@ -376,22 +387,24 @@ public class GroupByMergingQueryRunner implements
QueryRunner<ResultRow>
}
}
}
- catch (InterruptedException e) {
- log.warn(e, "Query interrupted, cancelling pending results, query id
[%s]", query.getId());
- GuavaUtils.cancelAll(true, future, futures);
- throw new QueryInterruptedException(e);
- }
- catch (CancellationException e) {
+ catch (InterruptedException | CancellationException e) {
+      log.noStackTrace().warn(e, "Query interrupted, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
throw new QueryInterruptedException(e);
}
catch (QueryTimeoutException | TimeoutException e) {
- log.info("Query timeout, cancelling pending results for query id [%s]",
query.getId());
+      log.noStackTrace().warn(e, "Query timeout, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
- throw new QueryTimeoutException();
+      throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", query.getId()));
}
catch (ExecutionException e) {
+      log.noStackTrace().warn(e, "Query error, cancelling pending results for query [%s]", query.getId());
GuavaUtils.cancelAll(true, future, futures);
+ Throwable cause = e.getCause();
+ // Nested per-segment future timeout
+ if (cause instanceof TimeoutException) {
+        throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query timeout, cancelling pending results for query [%s]. Per-segment timeout exceeded.", query.getId()));
+ }
throw new RuntimeException(e);
}
}
diff --git
a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
index bf7b3877e51..2047e8ee159 100644
---
a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
+++
b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
@@ -24,7 +24,6 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.ChainedExecutionQueryRunner;
import org.apache.druid.query.QueryPlus;
import org.apache.druid.query.QueryProcessingPool;
@@ -55,9 +54,6 @@ import java.util.stream.Collectors;
public class SegmentMetadataQueryRunnerFactory implements
QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery>
{
- private static final Logger log = new
Logger(SegmentMetadataQueryRunnerFactory.class);
-
-
private final SegmentMetadataQueryQueryToolChest toolChest;
private final QueryWatcher queryWatcher;
diff --git
a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
index f7401a0dd99..726300d8ae8 100644
---
a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
+++
b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
@@ -19,6 +19,7 @@
package org.apache.druid.query;
+import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -35,6 +36,7 @@ import
org.apache.druid.query.timeseries.TimeseriesResultValue;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -52,6 +54,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
@@ -60,13 +63,24 @@ import java.util.stream.Collectors;
public class ChainedExecutionQueryRunnerTest
{
private final Lock neverRelease = new ReentrantLock();
+ private QueryProcessingPool processingPool;
@Before
public void setup()
{
neverRelease.lock();
+ processingPool = new ForwardingQueryProcessingPool(
+ Execs.multiThreaded(2, "ChainedExecutionQueryRunnerTestExecutor-%d"),
+        Execs.scheduledSingleThreaded("ChainedExecutionQueryRunnerTestExecutor-Timeout-%d")
+ );
+ }
+
+ @After
+ public void tearDown()
+ {
+ processingPool.shutdown();
}
-
+
@Test(timeout = 60_000L)
public void testQueryCancellation() throws Exception
{
@@ -121,7 +135,7 @@ public class ChainedExecutionQueryRunnerTest
);
ChainedExecutionQueryRunner chainedRunner = new
ChainedExecutionQueryRunner<>(
- new ForwardingQueryProcessingPool(exec),
+ processingPool,
watcher,
Lists.newArrayList(
runners
@@ -245,7 +259,7 @@ public class ChainedExecutionQueryRunnerTest
);
ChainedExecutionQueryRunner chainedRunner = new
ChainedExecutionQueryRunner<>(
- new ForwardingQueryProcessingPool(exec),
+ processingPool,
watcher,
Lists.newArrayList(
runners
@@ -344,6 +358,154 @@ public class ChainedExecutionQueryRunnerTest
Assert.assertEquals(runners, actual);
}
+ @Test(timeout = 10_000L)
+ public void testPerSegmentTimeout()
+ {
+ QueryRunner<Integer> slowRunner = (queryPlus, responseContext) -> {
+ try {
+ Thread.sleep(500);
+ return Sequences.of(2);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ };
+    QueryRunner<Integer> fastRunner = (queryPlus, responseContext) -> Sequences.of(1);
+
+ QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
+ watcher.registerQueryFuture(
+ EasyMock.anyObject(),
+ EasyMock.anyObject()
+ );
+ EasyMock.expectLastCall().anyTimes();
+ EasyMock.replay(watcher);
+
+    ChainedExecutionQueryRunner chainedRunner = new ChainedExecutionQueryRunner<>(
+ processingPool,
+ watcher,
+ Arrays.asList(slowRunner, fastRunner)
+ );
+ TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+ .dataSource("test")
+ .intervals("2014/2015")
+                                          .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
+ .context(
+ ImmutableMap.of(
+                                              QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 100L,
+ QueryContexts.TIMEOUT_KEY, 5_000L
+ )
+ )
+ .queryId("test")
+ .build();
+ Sequence seq = chainedRunner.run(QueryPlus.wrap(query));
+
+ List<Integer> results = null;
+ Exception thrown = null;
+ try {
+ results = seq.toList();
+ }
+ catch (Exception e) {
+ thrown = e;
+ }
+
+ Assert.assertNull("No results expected due to timeout", results);
+ Assert.assertNotNull("Exception should be thrown", thrown);
+ Assert.assertTrue(
+ "Should be QueryTimeoutException or caused by it",
+ Throwables.getRootCause(thrown) instanceof QueryTimeoutException
+ );
+    Assert.assertEquals("Query timeout, cancelling pending results for query [test]. Per-segment timeout exceeded.", thrown.getMessage());
+
+ EasyMock.verify(watcher);
+ }
+
+ @Test(timeout = 5_000L)
+ public void test_perSegmentTimeout_crossQuery() throws Exception
+ {
+ final CountDownLatch slowStarted = new CountDownLatch(2);
+ final CountDownLatch fastStarted = new CountDownLatch(1);
+
+    QueryRunner<Result<TimeseriesResultValue>> slowRunner = (queryPlus, responseContext) -> {
+ slowStarted.countDown();
+ try {
+ Thread.sleep(60_000L);
+ }
+ catch (InterruptedException e) {
+ throw new QueryInterruptedException(e);
+ }
+ return Sequences.empty();
+ };
+
+    QueryRunner<Result<TimeseriesResultValue>> fastRunner = (queryPlus, responseContext) -> {
+ fastStarted.countDown();
+ return Sequences.simple(Collections.singletonList(
+ new Result<>(null, new
TimeseriesResultValue(ImmutableMap.of("count", 1)))
+ ));
+ };
+
+ TimeseriesQuery slowQuery = Druids.newTimeseriesQueryBuilder()
+ .dataSource("test")
+ .intervals("2014/2015")
+                                          .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
+ .context(ImmutableMap.of(
+ QueryContexts.TIMEOUT_KEY, 300_000L,
+                                              QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 1_000L
+ ))
+ .queryId("slow")
+ .build();
+
+ TimeseriesQuery fastQuery = Druids.newTimeseriesQueryBuilder()
+ .dataSource("test")
+ .intervals("2014/2015")
+                                          .aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
+ .context(ImmutableMap.of(
+ QueryContexts.TIMEOUT_KEY, 5_000L,
+                                              QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 3_000L
+ ))
+ .queryId("fast")
+ .build();
+
+    ChainedExecutionQueryRunner<Result<TimeseriesResultValue>> slowChainedRunner = new ChainedExecutionQueryRunner<>(
+ processingPool,
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER,
+ Arrays.asList(slowRunner, slowRunner)
+ );
+    ChainedExecutionQueryRunner<Result<TimeseriesResultValue>> fastChainedRunner = new ChainedExecutionQueryRunner<>(
+ processingPool,
+ QueryRunnerTestHelper.NOOP_QUERYWATCHER,
+ Collections.singletonList(fastRunner)
+ );
+
+ ExecutorService exec = Execs.multiThreaded(2, "QueryExecutor-%d");
+ try {
+      Future<List<Result<TimeseriesResultValue>>> slowFuture = exec.submit(() -> slowChainedRunner.run(QueryPlus.wrap(
+ slowQuery)).toList());
+
+ slowStarted.await();
+
+      Future<List<Result<TimeseriesResultValue>>> fastFuture = exec.submit(() -> fastChainedRunner.run(QueryPlus.wrap(
+ fastQuery)).toList());
+
+ boolean fastStartedEarly = fastStarted.await(500, TimeUnit.MILLISECONDS);
+ Assert.assertFalse(
+          "Fast query should be blocked and not started while slow queries are running",
+ fastStartedEarly
+ );
+
+      ExecutionException ex = Assert.assertThrows(ExecutionException.class, slowFuture::get);
+      Assert.assertTrue(Throwables.getRootCause(ex) instanceof QueryTimeoutException);
+      Assert.assertEquals("Query timeout, cancelling pending results for query [slow]. Per-segment timeout exceeded.", ex.getCause().getMessage());
+ Assert.assertEquals(
+ Collections.singletonList(
+ new Result<>(null, new
TimeseriesResultValue(ImmutableMap.of("count", 1)))
+ ), fastFuture.get()
+ );
+ }
+ finally {
+ exec.shutdownNow();
+ }
+ }
+
private class DyingQueryRunner implements QueryRunner<Integer>
{
private final CountDownLatch start;
diff --git
a/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
index 3a8f235ab0f..8e3172812cb 100644
---
a/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
+++
b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
@@ -23,20 +23,36 @@ import
com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.StubServiceEmitter;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import java.util.concurrent.ScheduledExecutorService;
+
@SuppressWarnings("DoNotMock")
public class MetricsEmittingQueryProcessingPoolTest
{
+ private ScheduledExecutorService timeoutService;
+
+ @Before
+ public void setUp()
+ {
+ timeoutService = Mockito.mock(ScheduledExecutorService.class);
+ }
+
@Test
public void testPrioritizedExecutorDelegate()
{
PrioritizedExecutorService service =
Mockito.mock(PrioritizedExecutorService.class);
+    ScheduledExecutorService timeoutService = Mockito.mock(ScheduledExecutorService.class);
Mockito.when(service.getQueueSize()).thenReturn(10);
Mockito.when(service.getActiveTasks()).thenReturn(2);
ExecutorServiceMonitor monitor = new ExecutorServiceMonitor();
- MetricsEmittingQueryProcessingPool processingPool = new
MetricsEmittingQueryProcessingPool(service, monitor);
+    MetricsEmittingQueryProcessingPool processingPool = new MetricsEmittingQueryProcessingPool(
+ service,
+ timeoutService,
+ monitor
+ );
Assert.assertSame(service, processingPool.delegate());
final StubServiceEmitter serviceEmitter = new
StubServiceEmitter("service", "host");
@@ -51,7 +67,11 @@ public class MetricsEmittingQueryProcessingPoolTest
{
ListeningExecutorService service =
Mockito.mock(ListeningExecutorService.class);
ExecutorServiceMonitor monitor = new ExecutorServiceMonitor();
- MetricsEmittingQueryProcessingPool processingPool = new
MetricsEmittingQueryProcessingPool(service, monitor);
+    MetricsEmittingQueryProcessingPool processingPool = new MetricsEmittingQueryProcessingPool(
+ service,
+ timeoutService,
+ monitor
+ );
Assert.assertSame(service, processingPool.delegate());
ServiceEmitter serviceEmitter = Mockito.mock(ServiceEmitter.class);
diff --git
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
index 2453aee882a..2e52da567b6 100644
---
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
+++
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
@@ -31,8 +31,11 @@ import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequences;
import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryContexts;
import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.QueryRunner;
import org.apache.druid.query.QueryRunnerTestHelper;
import org.apache.druid.query.QueryTimeoutException;
@@ -56,10 +59,15 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
@RunWith(Parameterized.class)
public class GroupByQueryRunnerFailureTest
{
+ private QueryProcessingPool processingPool;
private static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new
DruidProcessingConfig()
{
@@ -140,6 +148,10 @@ public class GroupByQueryRunnerFailureTest
MERGE_BUFFER_POOL.maxSize(),
MERGE_BUFFER_POOL.getPoolSize()
);
+ processingPool = new ForwardingQueryProcessingPool(
+ Execs.multiThreaded(2, "GroupByQueryRunnerFailureTestExecutor-%d"),
+        Execs.scheduledSingleThreaded("GroupByQueryRunnerFailureTestExecutor-Timeout-%d")
+ );
}
@After
@@ -150,6 +162,7 @@ public class GroupByQueryRunnerFailureTest
MERGE_BUFFER_POOL.maxSize(),
MERGE_BUFFER_POOL.getPoolSize()
);
+ processingPool.shutdown();
}
@AfterClass
@@ -277,7 +290,7 @@ public class GroupByQueryRunnerFailureTest
}
@Test(timeout = 60_000L)
- public void testTimeoutExceptionOnQueryable()
+ public void testTimeoutExceptionOnQueryable_singleThreaded()
{
final GroupByQuery query = GroupByQuery
.builder()
@@ -287,6 +300,7 @@ public class GroupByQueryRunnerFailureTest
.setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
.setGranularity(Granularities.ALL)
.overrideContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1))
+ .queryId("test")
.build();
GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
@@ -313,9 +327,278 @@ public class GroupByQueryRunnerFailureTest
QueryRunner<ResultRow> mergeRunners =
factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner,
mockRunner));
- Assert.assertThrows(
+ QueryTimeoutException ex = Assert.assertThrows(
+ QueryTimeoutException.class,
+        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+ );
+ // Assert overall timeout is triggered
+ Assert.assertEquals("Query [test] timed out", ex.getMessage());
+ }
+
+ @Test(timeout = 60_000L)
+ public void testTimeoutExceptionOnQueryable_multiThreaded()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(QueryContexts.TIMEOUT_KEY, 1))
+ .queryId("test")
+ .build();
+
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return true;
+ }
+ }
+ );
+ QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+ try {
+ Thread.sleep(100);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Sequences.empty();
+ };
+
+ QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+ Execs.directExecutor(),
+ List.of(runner, mockRunner)
+ );
+
+ QueryTimeoutException ex = Assert.assertThrows(
+ QueryTimeoutException.class,
+        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+ );
+ // Assert overall timeout is triggered
+ Assert.assertEquals("Query [test] timed out", ex.getMessage());
+ }
+
+ @Test(timeout = 20_000L)
+ public void test_multiThreaded_perSegmentTimeout_causes_queryTimeout()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(
+ QueryContexts.TIMEOUT_KEY,
+ 300_000,
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+ 100
+ ))
+ .queryId("test")
+ .build();
+
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return false;
+ }
+ }
+ );
+ QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Sequences.empty();
+ };
+
+ QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+ processingPool,
+ List.of(runner, mockRunner)
+ );
+
+ QueryTimeoutException ex = Assert.assertThrows(
+ QueryTimeoutException.class,
+        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+ );
+ // Assert per-segment timeout is triggered
+    Assert.assertEquals("Query timeout, cancelling pending results for query [test]. Per-segment timeout exceeded.", ex.getMessage());
+ }
+
+ @Test(timeout = 20_000L)
+ public void test_singleThreaded_perSegmentTimeout_causes_queryTimeout()
+ {
+ final GroupByQuery query = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(
+ QueryContexts.TIMEOUT_KEY,
+ 300_000,
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+ 100
+ ))
+ .queryId("test")
+ .build();
+
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return true;
+ }
+ }
+ );
+ QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+ try {
+ Thread.sleep(1000);
+ }
+ catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ return Sequences.empty();
+ };
+
+ QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+ processingPool,
+ List.of(runner, mockRunner)
+ );
+
+ QueryTimeoutException ex = Assert.assertThrows(
QueryTimeoutException.class,
        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
);
+ // Assert per-segment timeout is triggered
+    Assert.assertEquals("Query timeout, cancelling pending results for query [test]. Per-segment timeout exceeded.", ex.getMessage());
+ }
+
+ @Test(timeout = 5_000L)
+ public void test_perSegmentTimeout_crossQuery() throws Exception
+ {
+ GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+ GroupByQueryRunnerTest.DEFAULT_MAPPER,
+ new GroupByQueryConfig()
+ {
+ @Override
+ public boolean isSingleThreaded()
+ {
+ return false;
+ }
+ }
+ );
+
+ final GroupByQuery slowQuery = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(
+ QueryContexts.TIMEOUT_KEY, 300_000,
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 1_000
+ ))
+ .queryId("slow")
+ .build();
+
+ final GroupByQuery fastQuery = GroupByQuery
+ .builder()
+ .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+ .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+ .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+ .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+ .setGranularity(Granularities.ALL)
+ .overrideContext(Map.of(
+ QueryContexts.TIMEOUT_KEY, 5_000,
+ QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 100
+ ))
+ .queryId("fast")
+ .build();
+
+ CountDownLatch slowStart = new CountDownLatch(2);
+ CountDownLatch fastStart = new CountDownLatch(1);
+
+    QueryRunner<ResultRow> signalingSlowRunner = (queryPlus, responseContext) -> {
+ slowStart.countDown();
+ try {
+ Thread.sleep(60_000L);
+ }
+ catch (InterruptedException e) {
+ throw new QueryInterruptedException(e);
+ }
+ return Sequences.empty();
+ };
+ QueryRunner<ResultRow> fastRunner = (queryPlus, responseContext) -> {
+ fastStart.countDown();
+ return Sequences.empty();
+ };
+
+ AtomicReference<Throwable> thrown = new AtomicReference<>();
+ Thread slowQueryThread = new Thread(() -> {
+ try {
+ GroupByQueryRunnerTestHelper.runQuery(
+ factory,
+ factory.mergeRunners(
+ processingPool,
+ List.of(signalingSlowRunner, signalingSlowRunner)
+ ), slowQuery
+ );
+ }
+ catch (QueryTimeoutException e) {
+ thrown.set(e);
+ return;
+ }
+ Assert.fail("Expected QueryTimeoutException for slow query");
+ });
+
+ slowQueryThread.start();
+ slowStart.await();
+
+ Thread fastQueryThread = new Thread(() -> {
+ try {
+ GroupByQueryRunnerTestHelper.runQuery(
+ factory,
+ factory.mergeRunners(
+ processingPool,
+ List.of(fastRunner)
+ ), fastQuery
+ );
+ }
+ catch (Exception e) {
+ Assert.fail("Expected fast query to succeed");
+ }
+ });
+
+ fastQueryThread.start();
+ boolean fastStartedEarly = fastStart.await(500, TimeUnit.MILLISECONDS);
+ Assert.assertFalse(
+        "Fast query should be blocked and not started while slow queries are running",
+ fastStartedEarly
+ );
+
+ fastQueryThread.join();
+ slowQueryThread.join();
+
+    Assert.assertEquals("Query timeout, cancelling pending results for query [slow]. Per-segment timeout exceeded.", thrown.get().getMessage());
}
}
diff --git
a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
index e737032015e..6362f585333 100644
--- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
@@ -84,7 +84,7 @@ public class BrokerProcessingModule implements Module
DruidProcessingConfig config
)
{
- return new ForwardingQueryProcessingPool(Execs.dummy());
+ return new ForwardingQueryProcessingPool(Execs.dummy(), null);
}
@Provides
diff --git
a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
index 2cffbcfae4e..33bec735d5d 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
@@ -38,6 +38,7 @@ import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Merging;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.Lifecycle;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.offheap.OffheapBufferGenerator;
@@ -157,6 +158,7 @@ public class DruidProcessingModule implements Module
lifecycle,
config
),
+            config.getNumTimeoutThreads() > 0 ? ScheduledExecutors.fixed(config.getNumTimeoutThreads(), "PrioritizedExecutorService-Timeout-%d") : null,
executorServiceMonitor
);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]