This is an automated email from the ASF dual-hosted git repository.

tuglu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 78a1daea3f1 Implement per-segment query timeout on data nodes (#18148)
78a1daea3f1 is described below

commit 78a1daea3f1e0283c03c5e7abcb34ce1975ac76d
Author: jtuglu1 <[email protected]>
AuthorDate: Tue Oct 7 17:56:53 2025 -0700

    Implement per-segment query timeout on data nodes (#18148)
    
    Adds a settable per-segment processing query timeout on data nodes.
    
    Enable this feature by setting the `perSegmentTimeout` query context 
    parameter and by setting `druid.processing.numTimeoutThreads` to a small 
    number (e.g. 1-3) on every data node where the feature should be active.
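    
    A minimal sketch of opting in from the Java side (values are illustrative, 
    not recommendations; assumes `druid.processing.numTimeoutThreads` > 0 on 
    the data node):
    
        // Both keys are defined in QueryContexts; values are in milliseconds.
        Map<String, Object> context = ImmutableMap.of(
            QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 5_000L, // "perSegmentTimeout"
            QueryContexts.TIMEOUT_KEY, 30_000L             // overall query timeout
        );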
---
 docs/configuration/index.md                        |   3 +
 docs/querying/query-context-reference.md           |   1 +
 .../SeekableStreamAppenderatorConfigTest.java      |   1 +
 .../druid/query/ChainedExecutionQueryRunner.java   | 109 ++++----
 .../druid/query/DirectQueryProcessingPool.java     |  14 +
 .../apache/druid/query/DruidProcessingConfig.java  |  15 +-
 .../druid/query/ForwardingQueryProcessingPool.java |  46 +++-
 .../query/MetricsEmittingQueryProcessingPool.java  |   6 +-
 .../druid/query/NoopQueryProcessingPool.java       |  10 +
 .../java/org/apache/druid/query/QueryContext.java  |  27 +-
 .../java/org/apache/druid/query/QueryContexts.java |   1 +
 .../apache/druid/query/QueryProcessingPool.java    |  15 ++
 .../epinephelinae/GroupByMergingQueryRunner.java   |  83 +++---
 .../SegmentMetadataQueryRunnerFactory.java         |   4 -
 .../query/ChainedExecutionQueryRunnerTest.java     | 168 +++++++++++-
 .../MetricsEmittingQueryProcessingPoolTest.java    |  24 +-
 .../groupby/GroupByQueryRunnerFailureTest.java     | 287 ++++++++++++++++++++-
 .../apache/druid/guice/BrokerProcessingModule.java |   2 +-
 .../apache/druid/guice/DruidProcessingModule.java  |   2 +
 19 files changed, 721 insertions(+), 97 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index c8e1d200cfb..3198f6529f4 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -1371,6 +1371,7 @@ Processing properties set on the Middle Manager are 
passed through to Peons.
 |`druid.processing.formatString`|Realtime and Historical processes use this 
format string to name their processing threads.|processing-%s|
 |`druid.processing.numMergeBuffers`|The number of direct memory buffers 
available for merging query results. The buffers are sized by 
`druid.processing.buffer.sizeBytes`. This property is effectively a concurrency 
limit for queries that require merging buffers. If you are using any queries 
that require merge buffers (currently, just groupBy) then you should have at 
least two of these.|`max(2, druid.processing.numThreads / 4)`|
 |`druid.processing.numThreads`|The number of processing threads to have 
available for parallel processing of segments. Our rule of thumb is `num_cores 
- 1`, which means that even under heavy load there will still be one core 
available to do background tasks like talking with ZooKeeper and pulling down 
segments. If only one core is available, this property defaults to the value 
`1`.|Number of cores - 1 (or 1)|
+|`druid.processing.numTimeoutThreads`|The number of threads available for 
handling per-segment query timeouts. Setting this value to `0` disables 
per-segment timeouts, irrespective of the `perSegmentTimeout` query context 
parameter. Because these threads only service timers, a small percentage (e.g. 
5%) of the total query processing cores available to the peon is 
recommended.|0|
 |`druid.processing.fifo`|Enables the processing queue to treat tasks of equal 
priority in a FIFO manner.|`true`|
 |`druid.processing.tmpDir`|Path where temporary files created while processing 
a query should be stored. If specified, this configuration takes priority over 
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
 |`druid.processing.intermediaryData.storage.type`|Storage type for 
intermediary segments of data shuffle between native parallel index tasks. <br 
/>Set to `local` to store segment files in the local storage of the Middle 
Manager or Indexer. <br />Set to `deepstore` to use configured deep storage for 
better fault tolerance during rolling updates. When the storage type is 
`deepstore`, Druid stores the data in the `shuffle-data` directory under the 
configured deep storage path. Druid does n [...]
@@ -1514,6 +1515,7 @@ Druid uses Jetty to serve HTTP requests.
 |`druid.processing.formatString`|Indexer processes use this format string to 
name their processing threads.|processing-%s|
 |`druid.processing.numMergeBuffers`|The number of direct memory buffers 
available for merging query results. The buffers are sized by 
`druid.processing.buffer.sizeBytes`. This property is effectively a concurrency 
limit for queries that require merging buffers. If you are using any queries 
that require merge buffers (currently, just groupBy) then you should have at 
least two of these.|`max(2, druid.processing.numThreads / 4)`|
 |`druid.processing.numThreads`|The number of processing threads to have 
available for parallel processing of segments. Our rule of thumb is `num_cores 
- 1`, which means that even under heavy load there will still be one core 
available to do background tasks like talking with ZooKeeper and pulling down 
segments. If only one core is available, this property defaults to the value 
`1`.|Number of cores - 1 (or 1)|
+|`druid.processing.numTimeoutThreads`|The number of threads available for 
handling per-segment query timeouts. Setting this value to `0` disables 
per-segment timeouts, irrespective of the `perSegmentTimeout` query context 
parameter. Because these threads only service timers, a small percentage (e.g. 
5%) of the total query processing cores available to the indexer is 
recommended.|0|
 |`druid.processing.fifo`|If the processing queue should treat tasks of equal 
priority in a FIFO manner|`true`|
 |`druid.processing.tmpDir`|Path where temporary files created while processing 
a query should be stored. If specified, this configuration takes priority over 
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
 
@@ -1622,6 +1624,7 @@ Druid uses Jetty to serve HTTP requests.
 |`druid.processing.formatString`|Realtime and Historical processes use this 
format string to name their processing threads.|processing-%s|
 |`druid.processing.numMergeBuffers`|The number of direct memory buffers 
available for merging query results. The buffers are sized by 
`druid.processing.buffer.sizeBytes`. This property is effectively a concurrency 
limit for queries that require merging buffers. If you are using any queries 
that require merge buffers (currently, just groupBy) then you should have at 
least two of these.|`max(2, druid.processing.numThreads / 4)`|
 |`druid.processing.numThreads`|The number of processing threads to have 
available for parallel processing of segments. Our rule of thumb is `num_cores 
- 1`, which means that even under heavy load there will still be one core 
available to do background tasks like talking with ZooKeeper and pulling down 
segments. If only one core is available, this property defaults to the value 
`1`.|Number of cores - 1 (or 1)|
+|`druid.processing.numTimeoutThreads`|The number of threads available for 
handling per-segment query timeouts. Setting this value to `0` disables 
per-segment timeouts, irrespective of the `perSegmentTimeout` query context 
parameter. Because these threads only service timers, a small percentage (e.g. 
5%) of the total query processing cores available to the historical is 
recommended.|0|
 |`druid.processing.fifo`|If the processing queue should treat tasks of equal 
priority in a FIFO manner|`true`|
 |`druid.processing.tmpDir`|Path where temporary files created while processing 
a query should be stored. If specified, this configuration takes priority over 
the default `java.io.tmpdir` path.|path represented by `java.io.tmpdir`|
 
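(A worked example of the 5% guideline above, with illustrative numbers: a data 
node running `druid.processing.numThreads=31` has roughly 32 cores; 5% of 32 
is 1.6, so rounding up gives `druid.processing.numTimeoutThreads=2` as a 
reasonable starting point.)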
diff --git a/docs/querying/query-context-reference.md 
b/docs/querying/query-context-reference.md
index 52515236620..a7532b265d0 100644
--- a/docs/querying/query-context-reference.md
+++ b/docs/querying/query-context-reference.md
@@ -44,6 +44,7 @@ Unless otherwise noted, the following parameters apply to all 
query types, and t
 |Parameter          |Default                                 | Description     
     |
 
|-------------------|----------------------------------------|----------------------|
 |`timeout`          | `druid.server.http.defaultQueryTimeout`| Query timeout 
in millis, beyond which unfinished queries will be cancelled. 0 timeout means 
`no timeout` (up to the server-side maximum query timeout, 
`druid.server.http.maxQueryTimeout`). To set the default timeout and maximum 
timeout, see [Broker configuration](../configuration/index.md#broker) |
+|`perSegmentTimeout`| `null`                                 | Per-segment 
processing timeout in millis; if processing of any single segment exceeds it, 
the query will be cancelled. Should be ≤ `timeout`. A value of `0` means `no 
per-segment timeout`. A reasonable value is on the order of a few seconds. A 
cluster-wide default for this query context parameter can be set via 
`druid.query.default.context.perSegmentTimeout`.|
 |`priority`         | The default priority is one of the following: 
<ul><li>Value of `priority` in the query context, if set</li><li>The value of 
the runtime property `druid.query.default.context.priority`, if set and not 
null</li><li>`0` if the priority is not set in the query context or runtime 
properties</li></ul>| Query priority. Queries with higher priority get 
precedence for computational resources.|
 |`lane`             | `null`                                 | Query lane, 
used to control usage limits on classes of queries. See [Broker 
configuration](../configuration/index.md#broker) for more details.|
 |`queryId`          | auto-generated                         | Unique 
identifier given to this query. If a query ID is set or known, this can be used 
to cancel the query |
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
index 2cc2a1b6c25..85c4f2ea5eb 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamAppenderatorConfigTest.java
@@ -192,6 +192,7 @@ public class SeekableStreamAppenderatorConfigTest
       super(
           null,
           numThreads,
+          null,
           numMergeBuffers,
           null,
           null,
diff --git 
a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
 
b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
index 4cc5a9fbd2d..74f2ffc634a 100644
--- 
a/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
+++ 
b/processing/src/main/java/org/apache/druid/query/ChainedExecutionQueryRunner.java
@@ -81,6 +81,10 @@ public class ChainedExecutionQueryRunner<T> implements 
QueryRunner<T>
     final int priority = query.context().getPriority();
     final Ordering ordering = query.getResultOrdering();
     final QueryPlus<T> threadSafeQueryPlus = 
queryPlus.withoutThreadUnsafeState();
+
+    final QueryContext context = query.context();
+    final boolean usePerSegmentTimeout = context.usePerSegmentTimeout();
+    final long perSegmentTimeout = context.getPerSegmentTimeout();
     return new BaseSequence<>(
         new BaseSequence.IteratorMaker<>()
         {
@@ -97,42 +101,54 @@ public class ChainedExecutionQueryRunner<T> implements 
QueryRunner<T>
                             throw new ISE("Null queryRunner! Looks to be some 
segment unmapping action happening");
                           }
 
-                          return queryProcessingPool.submitRunnerTask(
-                              new 
AbstractPrioritizedQueryRunnerCallable<>(priority, input)
-                              {
-                                @Override
-                                public Iterable<T> call()
-                                {
-                                  try {
-                                    Sequence<T> result = 
input.run(threadSafeQueryPlus, responseContext);
-                                    if (result == null) {
-                                      throw new ISE("Got a null result! 
Segments are missing!");
-                                    }
-
-                                    List<T> retVal = result.toList();
-                                    if (retVal == null) {
-                                      throw new ISE("Got a null list of 
results");
-                                    }
-
-                                    return retVal;
-                                  }
-                                  catch (QueryInterruptedException e) {
-                                    throw new RuntimeException(e);
-                                  }
-                                  catch (QueryTimeoutException e) {
-                                    throw e;
-                                  }
-                                  catch (Exception e) {
-                                    if (query.context().isDebug()) {
-                                      log.error(e, "Exception with one of the 
sequences!");
-                                    } else {
-                                      log.noStackTrace().error(e, "Exception 
with one of the sequences!");
-                                    }
-                                    Throwables.propagateIfPossible(e);
-                                    throw new RuntimeException(e);
-                                  }
+                          final 
AbstractPrioritizedQueryRunnerCallable<Iterable<T>, T> callable = new 
AbstractPrioritizedQueryRunnerCallable<>(
+                              priority,
+                              input
+                          )
+                          {
+                            @Override
+                            public Iterable<T> call()
+                            {
+                              try {
+                                Sequence<T> result = 
input.run(threadSafeQueryPlus, responseContext);
+                                if (result == null) {
+                                  throw new ISE("Got a null result! Segments 
are missing!");
+                                }
+
+                                List<T> retVal = result.toList();
+                                if (retVal == null) {
+                                  throw new ISE("Got a null list of results");
+                                }
+
+                                return retVal;
+                              }
+                              catch (QueryInterruptedException e) {
+                                throw new RuntimeException(e);
+                              }
+                              catch (QueryTimeoutException e) {
+                                throw e;
+                              }
+                              catch (Exception e) {
+                                if (query.context().isDebug()) {
+                                  log.error(e, "Exception with one of the 
sequences!");
+                                } else {
+                                  log.noStackTrace().error(e, "Exception with 
one of the sequences!");
                                 }
-                              });
+                                Throwables.throwIfUnchecked(e);
+                                throw new RuntimeException(e);
+                              }
+                            }
+                          };
+
+                          if (usePerSegmentTimeout) {
+                            return queryProcessingPool.submitRunnerTask(
+                                callable,
+                                perSegmentTimeout,
+                                TimeUnit.MILLISECONDS
+                            );
+                          } else {
+                            return 
queryProcessingPool.submitRunnerTask(callable);
+                          }
                         }
                     )
                 );
@@ -141,7 +157,6 @@ public class ChainedExecutionQueryRunner<T> implements 
QueryRunner<T>
             queryWatcher.registerQueryFuture(query, future);
 
             try {
-              final QueryContext context = query.context();
               return new MergeIterable<>(
                   context.hasTimeout() ?
                       future.get(context.getTimeout(), TimeUnit.MILLISECONDS) :
@@ -149,24 +164,26 @@ public class ChainedExecutionQueryRunner<T> implements 
QueryRunner<T>
                   ordering.nullsFirst()
               ).iterator();
             }
-            catch (InterruptedException e) {
-              log.noStackTrace().warn(e, "Query interrupted, cancelling 
pending results, query id [%s]", query.getId());
-              //Note: canceling combinedFuture first so that it can complete 
with INTERRUPTED as its final state. See 
ChainedExecutionQueryRunnerTest.testQueryTimeout()
+            catch (CancellationException | InterruptedException e) {
+              log.noStackTrace().warn(e, "Query interrupted, cancelling 
pending results for query [%s]", query.getId());
               GuavaUtils.cancelAll(true, future, futures);
               throw new QueryInterruptedException(e);
             }
-            catch (CancellationException e) {
-              throw new QueryInterruptedException(e);
-            }
-            catch (TimeoutException e) {
-              log.warn("Query timeout, cancelling pending results for query id 
[%s]", query.getId());
+            catch (TimeoutException | QueryTimeoutException e) {
+              log.noStackTrace().warn(e, "Query timeout, cancelling pending 
results for query [%s]", query.getId());
               GuavaUtils.cancelAll(true, future, futures);
               throw new 
QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] timed out", 
query.getId()));
             }
             catch (ExecutionException e) {
+              log.noStackTrace().warn(e, "Query error, cancelling pending 
results for query [%s]", query.getId());
               GuavaUtils.cancelAll(true, future, futures);
-              Throwables.propagateIfPossible(e.getCause());
-              throw new RuntimeException(e.getCause());
+              Throwable cause = e.getCause();
+              // Nested per-segment future timeout
+              if (cause instanceof TimeoutException) {
+                throw new 
QueryTimeoutException(StringUtils.nonStrictFormat("Query timeout, cancelling 
pending results for query [%s]. Per-segment timeout exceeded.", query.getId()));
+              }
+              Throwables.throwIfUnchecked(cause);
+              throw new RuntimeException(cause);
             }
           }
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java
 
b/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java
index c24b0157af6..054ba1bd616 100644
--- 
a/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java
+++ 
b/processing/src/main/java/org/apache/druid/query/DirectQueryProcessingPool.java
@@ -24,6 +24,8 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.druid.java.util.common.concurrent.Execs;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * {@link QueryProcessingPool} wrapper over {@link Execs#directExecutor()}
  */
@@ -42,6 +44,18 @@ public class DirectQueryProcessingPool extends 
ForwardingListeningExecutorServic
     return delegate().submit(task);
   }
 
+  @Override
+  public <T, V> ListenableFuture<T> submitRunnerTask(
+      PrioritizedQueryRunnerCallable<T, V> task,
+      long timeout,
+      TimeUnit unit
+  )
+  {
+    // The thread that calls .get() on this future is the same thread that 
executes the task, so a
+    // timeout here cannot be honored: the executing thread would never 
receive the timeout interrupt.
+    return submitRunnerTask(task);
+  }
+
   @Override
   public ListeningExecutorService delegate()
   {
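
(Why a timeout cannot be honored here, as a standalone sketch: with a direct 
executor, the submitting thread runs the task to completion inside submit() 
itself, so any timeout wrapper applied afterwards only ever sees an 
already-completed future.)

    // Sketch using plain Guava, outside Druid:
    ListeningExecutorService direct = MoreExecutors.newDirectExecutorService();
    ListenableFuture<String> f = direct.submit(() -> {
      Thread.sleep(200); // runs on the caller's thread, blocking submit()
      return "done";
    });
    // By the time submit() returns, f.isDone() is already true; wrapping it
    // with Futures.withTimeout(...) at this point can never time it out.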
diff --git 
a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java 
b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
index c469226a62e..7a538910fe5 100644
--- a/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
+++ b/processing/src/main/java/org/apache/druid/query/DruidProcessingConfig.java
@@ -52,6 +52,9 @@ public class DruidProcessingConfig implements ColumnConfig
   private final DruidProcessingBufferConfig buffer;
   @JsonProperty
   private final DruidProcessingIndexesConfig indexes;
+  @JsonProperty
+  private final int numTimeoutThreads;
+
   private final AtomicReference<Integer> computedBufferSizeBytes = new 
AtomicReference<>();
   private final boolean numThreadsConfigured;
   private final boolean numMergeBuffersConfigured;
@@ -60,6 +63,7 @@ public class DruidProcessingConfig implements ColumnConfig
   public DruidProcessingConfig(
       @JsonProperty("formatString") @Nullable String formatString,
       @JsonProperty("numThreads") @Nullable Integer numThreads,
+      @JsonProperty("numTimeoutThreads") @Nullable Integer numTimeoutThreads,
       @JsonProperty("numMergeBuffers") @Nullable Integer numMergeBuffers,
       @JsonProperty("fifo") @Nullable Boolean fifo,
       @JsonProperty("tmpDir") @Nullable String tmpDir,
@@ -73,6 +77,10 @@ public class DruidProcessingConfig implements ColumnConfig
         numThreads,
         Math.max(runtimeInfo.getAvailableProcessors() - 1, 1)
     );
+    this.numTimeoutThreads = Configs.valueOrDefault(
+        numTimeoutThreads,
+        0
+    );
     this.numMergeBuffers = Configs.valueOrDefault(numMergeBuffers, Math.max(2, 
this.numThreads / 4));
     this.fifo = fifo == null || fifo;
     this.tmpDir = Configs.valueOrDefault(tmpDir, 
System.getProperty("java.io.tmpdir"));
@@ -87,7 +95,7 @@ public class DruidProcessingConfig implements ColumnConfig
   @VisibleForTesting
   public DruidProcessingConfig()
   {
-    this(null, null, null, null, null, null, null, JvmUtils.getRuntimeInfo());
+    this(null, null, null, null, null, null, null, null, 
JvmUtils.getRuntimeInfo());
   }
 
   private void initializeBufferSize(RuntimeInfo runtimeInfo)
@@ -143,6 +151,11 @@ public class DruidProcessingConfig implements ColumnConfig
     return numThreads;
   }
 
+  public int getNumTimeoutThreads()
+  {
+    return numTimeoutThreads;
+  }
+
   public int getNumMergeBuffers()
   {
     return numMergeBuffers;
diff --git 
a/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java
 
b/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java
index fdb98756c53..532cdd85f4d 100644
--- 
a/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java
+++ 
b/processing/src/main/java/org/apache/druid/query/ForwardingQueryProcessingPool.java
@@ -19,24 +19,42 @@
 
 package org.apache.druid.query;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 
+import javax.annotation.Nullable;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * Default implementation of {@link QueryProcessingPool} that just forwards 
operations, including query execution tasks,
  * to an underlying {@link ExecutorService}
+ * Exposes a method {@link #submitRunnerTask(PrioritizedQueryRunnerCallable, 
long, TimeUnit)} which allows execution tasks to be serviced with a custom 
timeout.
  */
 public class ForwardingQueryProcessingPool extends 
ForwardingListeningExecutorService implements QueryProcessingPool
 {
   private final ListeningExecutorService delegate;
+  @Nullable private final ScheduledExecutorService timeoutService;
 
-  public ForwardingQueryProcessingPool(ExecutorService executorService)
+  public ForwardingQueryProcessingPool(ExecutorService executorService, 
@Nullable ScheduledExecutorService timeoutService)
   {
     this.delegate = MoreExecutors.listeningDecorator(executorService);
+    if (timeoutService != null) {
+      this.timeoutService = MoreExecutors.listeningDecorator(timeoutService);
+    } else {
+      this.timeoutService = null;
+    }
+  }
+
+  @VisibleForTesting
+  public ForwardingQueryProcessingPool(ExecutorService executorService)
+  {
+    this(executorService, null);
   }
 
   @Override
@@ -45,10 +63,36 @@ public class ForwardingQueryProcessingPool extends 
ForwardingListeningExecutorSe
     return delegate().submit(task);
   }
 
+  @Override
+  public <T, V> ListenableFuture<T> submitRunnerTask(
+      PrioritizedQueryRunnerCallable<T, V> task,
+      long timeout,
+      TimeUnit unit
+  )
+  {
+    if (timeoutService != null) {
+      return Futures.withTimeout(
+          delegate().submit(task),
+          timeout,
+          unit,
+          timeoutService
+      );
+    }
+    return submitRunnerTask(task);
+  }
+
   @Override
   protected ListeningExecutorService delegate()
   {
     return delegate;
   }
 
+  @Override
+  public void shutdown()
+  {
+    super.shutdown(); // shutdown delegate()
+    if (timeoutService != null) {
+      timeoutService.shutdown();
+    }
+  }
 }
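
(A self-contained sketch of the Futures.withTimeout contract relied on above, 
per Guava's documented behavior: if the input future does not complete within 
the timeout, the returned future fails with a TimeoutException and the input 
future is cancelled.)

    import com.google.common.util.concurrent.Futures;
    import com.google.common.util.concurrent.ListenableFuture;
    import com.google.common.util.concurrent.ListeningExecutorService;
    import com.google.common.util.concurrent.MoreExecutors;

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    class WithTimeoutSketch
    {
      public static void main(String[] args) throws Exception
      {
        ListeningExecutorService work =
            MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

        // A task that overruns its 100 ms budget.
        ListenableFuture<String> slow = work.submit(() -> {
          Thread.sleep(1_000);
          return "done";
        });
        ListenableFuture<String> bounded =
            Futures.withTimeout(slow, 100, TimeUnit.MILLISECONDS, timer);

        try {
          bounded.get();
        }
        catch (ExecutionException e) {
          // The timeout surfaces as an ExecutionException whose cause is a
          // TimeoutException, matching the instanceof checks in the runners.
          System.out.println(e.getCause() instanceof TimeoutException); // true
        }
        work.shutdownNow();
        timer.shutdownNow();
      }
    }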
diff --git 
a/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java
 
b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java
index 1f831d7953e..e8d1b36815b 100644
--- 
a/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java
+++ 
b/processing/src/main/java/org/apache/druid/query/MetricsEmittingQueryProcessingPool.java
@@ -23,16 +23,20 @@ import 
com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
 
+import javax.annotation.Nullable;
+import java.util.concurrent.ScheduledExecutorService;
+
 public class MetricsEmittingQueryProcessingPool extends 
ForwardingQueryProcessingPool
     implements ExecutorServiceMonitor.MetricEmitter
 {
 
   public MetricsEmittingQueryProcessingPool(
       ListeningExecutorService delegate,
+      @Nullable ScheduledExecutorService timeoutService,
       ExecutorServiceMonitor executorServiceMonitor
   )
   {
-    super(delegate);
+    super(delegate, timeoutService);
     executorServiceMonitor.add(this);
   }
 
diff --git 
a/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java 
b/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
index efb9a53776a..abf646f9f3e 100644
--- 
a/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
+++ 
b/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
@@ -49,6 +49,16 @@ public class NoopQueryProcessingPool implements 
QueryProcessingPool
     throw unsupportedException();
   }
 
+  @Override
+  public <T, V> ListenableFuture<T> submitRunnerTask(
+      PrioritizedQueryRunnerCallable<T, V> task,
+      long timeout,
+      TimeUnit unit
+  )
+  {
+    throw unsupportedException();
+  }
+
   @Override
   public <T> ListenableFuture<T> submit(Callable<T> callable)
   {
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContext.java 
b/processing/src/main/java/org/apache/druid/query/QueryContext.java
index 096fd4afe9b..42ed66978ff 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContext.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContext.java
@@ -32,7 +32,6 @@ import org.apache.druid.query.filter.InDimFilter;
 import org.apache.druid.query.filter.TypedInFilter;
 
 import javax.annotation.Nullable;
-
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
@@ -545,6 +544,32 @@ public class QueryContext
     }
   }
 
+  public long getPerSegmentTimeout()
+  {
+    return getPerSegmentTimeout(QueryContexts.NO_TIMEOUT);
+  }
+
+  public long getPerSegmentTimeout(long defaultPerSegmentTimeout)
+  {
+    final long timeout = getLong(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 
defaultPerSegmentTimeout);
+    if (timeout >= 0) {
+      return timeout;
+    }
+
+    throw new BadQueryContextException(
+        StringUtils.format(
+            "Per-segment timeout [%s] must be a non negative value, but was 
[%d]",
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+            timeout
+        )
+    );
+  }
+
+  public boolean usePerSegmentTimeout()
+  {
+    return getPerSegmentTimeout() != QueryContexts.NO_TIMEOUT;
+  }
+
   public void verifyMaxScatterGatherBytes(long maxScatterGatherBytesLimit)
   {
     long curr = getLong(QueryContexts.MAX_SCATTER_GATHER_BYTES_KEY, 0);
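
(A brief illustration of the accessor semantics added above; a sketch that 
assumes the existing QueryContext.of()/QueryContext.empty() factory methods:)

    QueryContext ctx = QueryContext.of(ImmutableMap.of("perSegmentTimeout", 500L));
    ctx.getPerSegmentTimeout();   // 500
    ctx.usePerSegmentTimeout();   // true: a nonzero timeout is set

    QueryContext none = QueryContext.empty();
    none.getPerSegmentTimeout();  // QueryContexts.NO_TIMEOUT (0, i.e. disabled)
    none.usePerSegmentTimeout();  // false

    // Negative values fail fast with BadQueryContextException:
    // QueryContext.of(ImmutableMap.of("perSegmentTimeout", -1L)).getPerSegmentTimeout();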
diff --git a/processing/src/main/java/org/apache/druid/query/QueryContexts.java 
b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
index 0619e521b7a..6df392d9571 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryContexts.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryContexts.java
@@ -44,6 +44,7 @@ public class QueryContexts
   public static final String PRIORITY_KEY = "priority";
   public static final String LANE_KEY = "lane";
   public static final String TIMEOUT_KEY = "timeout";
+  public static final String PER_SEGMENT_TIMEOUT_KEY = "perSegmentTimeout";
   public static final String MAX_SCATTER_GATHER_BYTES_KEY = 
"maxScatterGatherBytes";
   public static final String MAX_QUEUED_BYTES_KEY = "maxQueuedBytes";
   public static final String DEFAULT_TIMEOUT_KEY = "defaultTimeout";
diff --git 
a/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java 
b/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java
index da423c6cddd..9091164a8ee 100644
--- a/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java
+++ b/processing/src/main/java/org/apache/druid/query/QueryProcessingPool.java
@@ -23,6 +23,8 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.druid.guice.annotations.ExtensionPoint;
 
+import java.util.concurrent.TimeUnit;
+
 /**
  * This class implements the logic of how units of query execution run 
concurrently. It is used in {@link 
QueryRunnerFactory#mergeRunners(QueryProcessingPool, Iterable)}.
  * In a most straightforward implementation, each unit will be submitted to an 
{@link PrioritizedExecutorService}. Extensions,
@@ -48,4 +50,17 @@ public interface QueryProcessingPool extends 
ListeningExecutorService
    * @return - Future object for tracking the task completion.
    */
   <T, V> ListenableFuture<T> 
submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task);
+
+  /**
+   * Submits the query execution task for asynchronous execution, with a 
provided timeout.
+   *
+   * @param task    - Task to be submitted.
+   * @param timeout - Timeout value
+   * @param unit    - Timeout unit
+   * @param <T>     - Task result type
+   * @param <V>     - Query runner sequence type
+   * @return - Future object for tracking the task completion.
+   */
+  <T, V> ListenableFuture<T> 
submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task, long timeout, 
TimeUnit unit);
 }
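
(Since QueryProcessingPool is an @ExtensionPoint and the new method has no 
default implementation, third-party pools must now implement it. A minimal 
pass-through sketch for a hypothetical extension pool that cannot enforce 
timeouts:)

    @Override
    public <T, V> ListenableFuture<T> submitRunnerTask(
        PrioritizedQueryRunnerCallable<T, V> task,
        long timeout,
        TimeUnit unit
    )
    {
      // This hypothetical pool has no timer threads; fall back to the
      // untimed submission path, mirroring DirectQueryProcessingPool.
      return submitRunnerTask(task);
    }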
diff --git 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java
 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java
index 7ff0258b949..43bec66de7c 100644
--- 
a/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java
+++ 
b/processing/src/main/java/org/apache/druid/query/groupby/epinephelinae/GroupByMergingQueryRunner.java
@@ -176,6 +176,8 @@ public class GroupByMergingQueryRunner implements 
QueryRunner<ResultRow>
     // query processing together.
     final long queryTimeout = queryContext.getTimeout();
     final boolean hasTimeout = queryContext.hasTimeout();
+    final boolean hasPerSegmentTimeout = queryContext.usePerSegmentTimeout();
+    final long perSegmentTimeout = queryContext.getPerSegmentTimeout();
     final long timeoutAt = System.currentTimeMillis() + queryTimeout;
 
     return new BaseSequence<>(
@@ -247,34 +249,43 @@ public class GroupByMergingQueryRunner implements 
QueryRunner<ResultRow>
                                 throw new ISE("Null queryRunner! Looks to be 
some segment unmapping action happening");
                               }
 
-                              ListenableFuture<AggregateResult> future = 
queryProcessingPool.submitRunnerTask(
-                                  new 
AbstractPrioritizedQueryRunnerCallable<>(priority, input)
-                                  {
-                                    @Override
-                                    public AggregateResult call()
-                                    {
-                                      try (
-                                          // These variables are used to close 
releasers automatically.
-                                          @SuppressWarnings("unused")
-                                          Closeable bufferReleaser = 
mergeBufferHolder.increment();
-                                          @SuppressWarnings("unused")
-                                          Closeable grouperReleaser = 
grouperHolder.increment()
-                                      ) {
-                                        // Return true if OK, false if 
resources were exhausted.
-                                        return input.run(queryPlusForRunners, 
responseContext)
-                                            .accumulate(AggregateResult.ok(), 
accumulator);
-                                      }
-                                      catch (QueryInterruptedException | 
QueryTimeoutException e) {
-                                        throw e;
-                                      }
-                                      catch (Exception e) {
-                                        log.error(e, "Exception with one of 
the sequences!");
-                                        Throwables.propagateIfPossible(e);
-                                        throw new RuntimeException(e);
-                                      }
-                                    }
+                              final 
AbstractPrioritizedQueryRunnerCallable<AggregateResult, ResultRow> callable = 
new AbstractPrioritizedQueryRunnerCallable<>(priority, input)
+                              {
+                                @Override
+                                public AggregateResult call()
+                                {
+                                  try (
+                                      // These variables are used to close 
releasers automatically.
+                                      @SuppressWarnings("unused")
+                                      Closeable bufferReleaser = 
mergeBufferHolder.increment();
+                                      @SuppressWarnings("unused")
+                                      Closeable grouperReleaser = 
grouperHolder.increment()
+                                  ) {
+                                    // Return true if OK, false if resources 
were exhausted.
+                                    return input.run(queryPlusForRunners, 
responseContext)
+                                                
.accumulate(AggregateResult.ok(), accumulator);
                                   }
-                              );
+                                  catch (QueryInterruptedException | 
QueryTimeoutException e) {
+                                    throw e;
+                                  }
+                                  catch (Exception e) {
+                                    log.error(e, "Exception with one of the 
sequences!");
+                                    Throwables.throwIfUnchecked(e);
+                                    throw new RuntimeException(e);
+                                  }
+                                }
+                              };
+
+                              final ListenableFuture<AggregateResult> future;
+                              if (hasPerSegmentTimeout) {
+                                future = queryProcessingPool.submitRunnerTask(
+                                    callable,
+                                    perSegmentTimeout,
+                                    TimeUnit.MILLISECONDS
+                                );
+                              } else {
+                                future = 
queryProcessingPool.submitRunnerTask(callable);
+                              }
 
                               if (isSingleThreaded) {
                                 waitForFutureCompletion(
@@ -376,22 +387,24 @@ public class GroupByMergingQueryRunner implements 
QueryRunner<ResultRow>
         }
       }
     }
-    catch (InterruptedException e) {
-      log.warn(e, "Query interrupted, cancelling pending results, query id 
[%s]", query.getId());
-      GuavaUtils.cancelAll(true, future, futures);
-      throw new QueryInterruptedException(e);
-    }
-    catch (CancellationException e) {
+    catch (InterruptedException | CancellationException e) {
+      log.noStackTrace().warn(e, "Query interrupted, cancelling pending 
results for query [%s]", query.getId());
       GuavaUtils.cancelAll(true, future, futures);
       throw new QueryInterruptedException(e);
     }
     catch (QueryTimeoutException | TimeoutException e) {
-      log.info("Query timeout, cancelling pending results for query id [%s]", 
query.getId());
+      log.noStackTrace().warn(e, "Query timeout, cancelling pending results 
for query [%s]", query.getId());
       GuavaUtils.cancelAll(true, future, futures);
-      throw new QueryTimeoutException();
+      throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query [%s] 
timed out", query.getId()));
     }
     catch (ExecutionException e) {
+      log.noStackTrace().warn(e, "Query error, cancelling pending results for 
query [%s]", query.getId());
       GuavaUtils.cancelAll(true, future, futures);
+      Throwable cause = e.getCause();
+      // Nested per-segment future timeout
+      if (cause instanceof TimeoutException) {
+        throw new QueryTimeoutException(StringUtils.nonStrictFormat("Query 
timeout, cancelling pending results for query [%s]. Per-segment timeout 
exceeded.", query.getId()));
+      }
       throw new RuntimeException(e);
     }
   }
diff --git 
a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
 
b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
index bf7b3877e51..2047e8ee159 100644
--- 
a/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
+++ 
b/processing/src/main/java/org/apache/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
@@ -24,7 +24,6 @@ import org.apache.druid.data.input.impl.TimestampSpec;
 import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.java.util.common.guava.Sequences;
-import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.ChainedExecutionQueryRunner;
 import org.apache.druid.query.QueryPlus;
 import org.apache.druid.query.QueryProcessingPool;
@@ -55,9 +54,6 @@ import java.util.stream.Collectors;
 
 public class SegmentMetadataQueryRunnerFactory implements 
QueryRunnerFactory<SegmentAnalysis, SegmentMetadataQuery>
 {
-  private static final Logger log = new 
Logger(SegmentMetadataQueryRunnerFactory.class);
-
-
   private final SegmentMetadataQueryQueryToolChest toolChest;
   private final QueryWatcher queryWatcher;
 
diff --git 
a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
 
b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
index f7401a0dd99..726300d8ae8 100644
--- 
a/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/ChainedExecutionQueryRunnerTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.query;
 
+import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -35,6 +36,7 @@ import 
org.apache.druid.query.timeseries.TimeseriesResultValue;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -52,6 +54,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
@@ -60,13 +63,24 @@ import java.util.stream.Collectors;
 public class ChainedExecutionQueryRunnerTest
 {
   private final Lock neverRelease = new ReentrantLock();
+  private QueryProcessingPool processingPool;
 
   @Before
   public void setup()
   {
     neverRelease.lock();
+    processingPool = new ForwardingQueryProcessingPool(
+        Execs.multiThreaded(2, "ChainedExecutionQueryRunnerTestExecutor-%d"),
+        
Execs.scheduledSingleThreaded("ChainedExecutionQueryRunnerTestExecutor-Timeout-%d")
+    );
+  }
+
+  @After
+  public void tearDown()
+  {
+    processingPool.shutdown();
   }
-  
+
   @Test(timeout = 60_000L)
   public void testQueryCancellation() throws Exception
   {
@@ -121,7 +135,7 @@ public class ChainedExecutionQueryRunnerTest
     );
 
     ChainedExecutionQueryRunner chainedRunner = new 
ChainedExecutionQueryRunner<>(
-        new ForwardingQueryProcessingPool(exec),
+        processingPool,
         watcher,
         Lists.newArrayList(
             runners
@@ -245,7 +259,7 @@ public class ChainedExecutionQueryRunnerTest
     );
 
     ChainedExecutionQueryRunner chainedRunner = new 
ChainedExecutionQueryRunner<>(
-        new ForwardingQueryProcessingPool(exec),
+        processingPool,
         watcher,
         Lists.newArrayList(
             runners
@@ -344,6 +358,154 @@ public class ChainedExecutionQueryRunnerTest
     Assert.assertEquals(runners, actual);
   }
 
+  @Test(timeout = 10_000L)
+  public void testPerSegmentTimeout()
+  {
+    QueryRunner<Integer> slowRunner = (queryPlus, responseContext) -> {
+      try {
+        Thread.sleep(500);
+        return Sequences.of(2);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+    };
+    QueryRunner<Integer> fastRunner = (queryPlus, responseContext) -> 
Sequences.of(1);
+
+    QueryWatcher watcher = EasyMock.createStrictMock(QueryWatcher.class);
+    watcher.registerQueryFuture(
+        EasyMock.anyObject(),
+        EasyMock.anyObject()
+    );
+    EasyMock.expectLastCall().anyTimes();
+    EasyMock.replay(watcher);
+
+    ChainedExecutionQueryRunner chainedRunner = new 
ChainedExecutionQueryRunner<>(
+        processingPool,
+        watcher,
+        Arrays.asList(slowRunner, fastRunner)
+    );
+    TimeseriesQuery query = Druids.newTimeseriesQueryBuilder()
+                                  .dataSource("test")
+                                  .intervals("2014/2015")
+                                  .aggregators(Collections.singletonList(new 
CountAggregatorFactory("count")))
+                                  .context(
+                                      ImmutableMap.of(
+                                          
QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 100L,
+                                          QueryContexts.TIMEOUT_KEY, 5_000L
+                                      )
+                                  )
+                                  .queryId("test")
+                                  .build();
+    Sequence seq = chainedRunner.run(QueryPlus.wrap(query));
+
+    List<Integer> results = null;
+    Exception thrown = null;
+    try {
+      results = seq.toList();
+    }
+    catch (Exception e) {
+      thrown = e;
+    }
+
+    Assert.assertNull("No results expected due to timeout", results);
+    Assert.assertNotNull("Exception should be thrown", thrown);
+    Assert.assertTrue(
+        "Should be QueryTimeoutException or caused by it",
+        Throwables.getRootCause(thrown) instanceof QueryTimeoutException
+    );
+    Assert.assertEquals("Query timeout, cancelling pending results for query 
[test]. Per-segment timeout exceeded.", thrown.getMessage());
+
+    EasyMock.verify(watcher);
+  }
+
+  @Test(timeout = 5_000L)
+  public void test_perSegmentTimeout_crossQuery() throws Exception
+  {
+    final CountDownLatch slowStarted = new CountDownLatch(2);
+    final CountDownLatch fastStarted = new CountDownLatch(1);
+
+    QueryRunner<Result<TimeseriesResultValue>> slowRunner = (queryPlus, 
responseContext) -> {
+      slowStarted.countDown();
+      try {
+        Thread.sleep(60_000L);
+      }
+      catch (InterruptedException e) {
+        throw new QueryInterruptedException(e);
+      }
+      return Sequences.empty();
+    };
+
+    QueryRunner<Result<TimeseriesResultValue>> fastRunner = (queryPlus, 
responseContext) -> {
+      fastStarted.countDown();
+      return Sequences.simple(Collections.singletonList(
+          new Result<>(null, new 
TimeseriesResultValue(ImmutableMap.of("count", 1)))
+      ));
+    };
+
+    TimeseriesQuery slowQuery = Druids.newTimeseriesQueryBuilder()
+                                      .dataSource("test")
+                                      .intervals("2014/2015")
+                                      
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
+                                      .context(ImmutableMap.of(
+                                          QueryContexts.TIMEOUT_KEY, 300_000L,
+                                          
QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 1_000L
+                                      ))
+                                      .queryId("slow")
+                                      .build();
+
+    TimeseriesQuery fastQuery = Druids.newTimeseriesQueryBuilder()
+                                      .dataSource("test")
+                                      .intervals("2014/2015")
+                                      
.aggregators(Collections.singletonList(new CountAggregatorFactory("count")))
+                                      .context(ImmutableMap.of(
+                                          QueryContexts.TIMEOUT_KEY, 5_000L,
+                                          
QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 3_000L
+                                      ))
+                                      .queryId("fast")
+                                      .build();
+
+    ChainedExecutionQueryRunner<Result<TimeseriesResultValue>> 
slowChainedRunner = new ChainedExecutionQueryRunner<>(
+        processingPool,
+        QueryRunnerTestHelper.NOOP_QUERYWATCHER,
+        Arrays.asList(slowRunner, slowRunner)
+    );
+    ChainedExecutionQueryRunner<Result<TimeseriesResultValue>> 
fastChainedRunner = new ChainedExecutionQueryRunner<>(
+        processingPool,
+        QueryRunnerTestHelper.NOOP_QUERYWATCHER,
+        Collections.singletonList(fastRunner)
+    );
+
+    ExecutorService exec = Execs.multiThreaded(2, "QueryExecutor-%d");
+    try {
+      Future<List<Result<TimeseriesResultValue>>> slowFuture = exec.submit(() 
-> slowChainedRunner.run(QueryPlus.wrap(
+          slowQuery)).toList());
+
+      slowStarted.await();
+
+      Future<List<Result<TimeseriesResultValue>>> fastFuture = exec.submit(() 
-> fastChainedRunner.run(QueryPlus.wrap(
+          fastQuery)).toList());
+
+      boolean fastStartedEarly = fastStarted.await(500, TimeUnit.MILLISECONDS);
+      Assert.assertFalse(
+          "Fast query should be blocked and not started while slow queries are 
running",
+          fastStartedEarly
+      );
+
+      ExecutionException ex = Assert.assertThrows(ExecutionException.class, 
slowFuture::get);
+      Assert.assertTrue(Throwables.getRootCause(ex) instanceof 
QueryTimeoutException);
+      Assert.assertEquals("Query timeout, cancelling pending results for query 
[slow]. Per-segment timeout exceeded.", ex.getCause().getMessage());
+      Assert.assertEquals(
+          Collections.singletonList(
+              new Result<>(null, new 
TimeseriesResultValue(ImmutableMap.of("count", 1)))
+          ), fastFuture.get()
+      );
+    }
+    finally {
+      exec.shutdownNow();
+    }
+  }
+
   private class DyingQueryRunner implements QueryRunner<Integer>
   {
     private final CountDownLatch start;
diff --git 
a/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
 
b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
index 3a8f235ab0f..8e3172812cb 100644
--- 
a/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/MetricsEmittingQueryProcessingPoolTest.java
@@ -23,20 +23,36 @@ import 
com.google.common.util.concurrent.ListeningExecutorService;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
 import org.apache.druid.java.util.metrics.StubServiceEmitter;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.concurrent.ScheduledExecutorService;
+
 @SuppressWarnings("DoNotMock")
 public class MetricsEmittingQueryProcessingPoolTest
 {
+  private ScheduledExecutorService timeoutService;
+
+  @Before
+  public void setUp()
+  {
+    timeoutService = Mockito.mock(ScheduledExecutorService.class);
+  }
+
   @Test
   public void testPrioritizedExecutorDelegate()
   {
     PrioritizedExecutorService service = 
Mockito.mock(PrioritizedExecutorService.class);
     Mockito.when(service.getQueueSize()).thenReturn(10);
     Mockito.when(service.getActiveTasks()).thenReturn(2);
     ExecutorServiceMonitor monitor = new ExecutorServiceMonitor();
-    MetricsEmittingQueryProcessingPool processingPool = new 
MetricsEmittingQueryProcessingPool(service, monitor);
+    MetricsEmittingQueryProcessingPool processingPool = new 
MetricsEmittingQueryProcessingPool(
+        service,
+        timeoutService,
+        monitor
+    );
     Assert.assertSame(service, processingPool.delegate());
 
     final StubServiceEmitter serviceEmitter = new 
StubServiceEmitter("service", "host");
@@ -51,7 +67,11 @@ public class MetricsEmittingQueryProcessingPoolTest
   {
     ListeningExecutorService service = 
Mockito.mock(ListeningExecutorService.class);
     ExecutorServiceMonitor monitor = new ExecutorServiceMonitor();
-    MetricsEmittingQueryProcessingPool processingPool = new 
MetricsEmittingQueryProcessingPool(service, monitor);
+    MetricsEmittingQueryProcessingPool processingPool = new 
MetricsEmittingQueryProcessingPool(
+        service,
+        timeoutService,
+        monitor
+    );
     Assert.assertSame(service, processingPool.delegate());
 
     ServiceEmitter serviceEmitter = Mockito.mock(ServiceEmitter.class);
diff --git 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
index 2453aee882a..2e52da567b6 100644
--- 
a/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
+++ 
b/processing/src/test/java/org/apache/druid/query/groupby/GroupByQueryRunnerFailureTest.java
@@ -31,8 +31,11 @@ import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequences;
 import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.QueryContexts;
 import org.apache.druid.query.QueryDataSource;
+import org.apache.druid.query.QueryInterruptedException;
+import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.QueryRunner;
 import org.apache.druid.query.QueryRunnerTestHelper;
 import org.apache.druid.query.QueryTimeoutException;
@@ -56,10 +59,15 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 @RunWith(Parameterized.class)
 public class GroupByQueryRunnerFailureTest
 {
+  private QueryProcessingPool processingPool;
   private static final DruidProcessingConfig DEFAULT_PROCESSING_CONFIG = new 
DruidProcessingConfig()
   {
 
@@ -140,6 +148,10 @@ public class GroupByQueryRunnerFailureTest
         MERGE_BUFFER_POOL.maxSize(),
         MERGE_BUFFER_POOL.getPoolSize()
     );
+    processingPool = new ForwardingQueryProcessingPool(
+        Execs.multiThreaded(2, "GroupByQueryRunnerFailureTestExecutor-%d"),
+        
Execs.scheduledSingleThreaded("GroupByQueryRunnerFailureTestExecutor-Timeout-%d")
+    );
   }
 
   @After
@@ -150,6 +162,7 @@ public class GroupByQueryRunnerFailureTest
         MERGE_BUFFER_POOL.maxSize(),
         MERGE_BUFFER_POOL.getPoolSize()
     );
+    processingPool.shutdown();
   }
 
   @AfterClass
@@ -277,7 +290,7 @@ public class GroupByQueryRunnerFailureTest
   }
 
   @Test(timeout = 60_000L)
-  public void testTimeoutExceptionOnQueryable()
+  public void testTimeoutExceptionOnQueryable_singleThreaded()
   {
     final GroupByQuery query = GroupByQuery
         .builder()
@@ -287,6 +300,7 @@ public class GroupByQueryRunnerFailureTest
         .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
         .setGranularity(Granularities.ALL)
         .overrideContext(ImmutableMap.of(QueryContexts.TIMEOUT_KEY, 1))
+        .queryId("test")
         .build();
 
     GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
@@ -313,9 +327,278 @@ public class GroupByQueryRunnerFailureTest
 
     QueryRunner<ResultRow> mergeRunners = 
factory.mergeRunners(Execs.directExecutor(), ImmutableList.of(runner, 
mockRunner));
 
-    Assert.assertThrows(
+    QueryTimeoutException ex = Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, 
query)
+    );
+    // Assert overall timeout is triggered
+    Assert.assertEquals("Query [test] timed out", ex.getMessage());
+  }
+
+  @Test(timeout = 60_000L)
+  public void testTimeoutExceptionOnQueryable_multiThreaded()
+  {
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(QueryContexts.TIMEOUT_KEY, 1))
+        .queryId("test")
+        .build();
+
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return false;
+          }
+        }
+    );
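+    // Runner that sleeps past the 1 ms query timeout to simulate a slow segment.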
+    QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+      try {
+        Thread.sleep(100);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return Sequences.empty();
+    };
+
+    QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+        Execs.directExecutor(),
+        List.of(runner, mockRunner)
+    );
+
+    QueryTimeoutException ex = Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+    );
+    // Assert overall timeout is triggered
+    Assert.assertEquals("Query [test] timed out", ex.getMessage());
+  }
+
+  @Test(timeout = 20_000L)
+  public void test_multiThreaded_perSegmentTimeout_causes_queryTimeout()
+  {
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(
+            QueryContexts.TIMEOUT_KEY,
+            300_000,
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+            100
+        ))
+        .queryId("test")
+        .build();
+
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return false;
+          }
+        }
+    );
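+    // Runner that sleeps well past the 100 ms per-segment timeout.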
+    QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+      try {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return Sequences.empty();
+    };
+
+    QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+        processingPool,
+        List.of(runner, mockRunner)
+    );
+
+    QueryTimeoutException ex = Assert.assertThrows(
+        QueryTimeoutException.class,
+        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
+    );
+    // Assert per-segment timeout is triggered
+    Assert.assertEquals("Query timeout, cancelling pending results for query 
[test]. Per-segment timeout exceeded.", ex.getMessage());
+  }
+
+  @Test(timeout = 20_000L)
+  public void test_singleThreaded_perSegmentTimeout_causes_queryTimeout()
+  {
+    final GroupByQuery query = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(
+            QueryContexts.TIMEOUT_KEY,
+            300_000,
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
+            100
+        ))
+        .queryId("test")
+        .build();
+
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return true;
+          }
+        }
+    );
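+    // Runner that sleeps well past the 100 ms per-segment timeout.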
+    QueryRunner<ResultRow> mockRunner = (queryPlus, responseContext) -> {
+      try {
+        Thread.sleep(1000);
+      }
+      catch (InterruptedException e) {
+        throw new RuntimeException(e);
+      }
+      return Sequences.empty();
+    };
+
+    QueryRunner<ResultRow> mergeRunners = factory.mergeRunners(
+        processingPool,
+        List.of(runner, mockRunner)
+    );
+
+    QueryTimeoutException ex = Assert.assertThrows(
         QueryTimeoutException.class,
        () -> GroupByQueryRunnerTestHelper.runQuery(factory, mergeRunners, query)
     );
+    // Assert per-segment timeout is triggered
+    Assert.assertEquals("Query timeout, cancelling pending results for query 
[test]. Per-segment timeout exceeded.", ex.getMessage());
+  }
+
+  @Test(timeout = 5_000L)
+  public void test_perSegmentTimeout_crossQuery() throws Exception
+  {
+    GroupByQueryRunnerFactory factory = makeQueryRunnerFactory(
+        GroupByQueryRunnerTest.DEFAULT_MAPPER,
+        new GroupByQueryConfig()
+        {
+          @Override
+          public boolean isSingleThreaded()
+          {
+            return false;
+          }
+        }
+    );
+
+    final GroupByQuery slowQuery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(
+            QueryContexts.TIMEOUT_KEY, 300_000,
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 1_000
+        ))
+        .queryId("slow")
+        .build();
+
+    final GroupByQuery fastQuery = GroupByQuery
+        .builder()
+        .setDataSource(QueryRunnerTestHelper.DATA_SOURCE)
+        .setQuerySegmentSpec(QueryRunnerTestHelper.FIRST_TO_THIRD)
+        .setDimensions(new DefaultDimensionSpec("quality", "alias"))
+        .setAggregatorSpecs(new LongSumAggregatorFactory("rows", "rows"))
+        .setGranularity(Granularities.ALL)
+        .overrideContext(Map.of(
+            QueryContexts.TIMEOUT_KEY, 5_000,
+            QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 100
+        ))
+        .queryId("fast")
+        .build();
+
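+    // slowStart trips once both slow runners occupy the pool's two worker threads; fastStart trips when the fast runner begins.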
+    CountDownLatch slowStart = new CountDownLatch(2);
+    CountDownLatch fastStart = new CountDownLatch(1);
+
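+    // Blocks a pool worker thread until the per-segment timeout interrupts it.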
+    QueryRunner<ResultRow> signalingSlowRunner = (queryPlus, responseContext) -> {
+      slowStart.countDown();
+      try {
+        Thread.sleep(60_000L);
+      }
+      catch (InterruptedException e) {
+        throw new QueryInterruptedException(e);
+      }
+      return Sequences.empty();
+    };
+    QueryRunner<ResultRow> fastRunner = (queryPlus, responseContext) -> {
+      fastStart.countDown();
+      return Sequences.empty();
+    };
+
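+    // Run the slow query on its own thread and capture the expected timeout failure.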
+    AtomicReference<Throwable> thrown = new AtomicReference<>();
+    Thread slowQueryThread = new Thread(() -> {
+      try {
+        GroupByQueryRunnerTestHelper.runQuery(
+            factory,
+            factory.mergeRunners(
+                processingPool,
+                List.of(signalingSlowRunner, signalingSlowRunner)
+            ), slowQuery
+        );
+      }
+      catch (QueryTimeoutException e) {
+        thrown.set(e);
+        return;
+      }
+      Assert.fail("Expected QueryTimeoutException for slow query");
+    });
+
+    slowQueryThread.start();
+    slowStart.await();
+
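+    // Both worker threads are busy with slow segments, so the fast query cannot start yet.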
+    Thread fastQueryThread = new Thread(() -> {
+      try {
+        GroupByQueryRunnerTestHelper.runQuery(
+            factory,
+            factory.mergeRunners(
+                processingPool,
+                List.of(fastRunner)
+            ), fastQuery
+        );
+      }
+      catch (Exception e) {
+        Assert.fail("Expected fast query to succeed");
+      }
+    });
+
+    fastQueryThread.start();
+    boolean fastStartedEarly = fastStart.await(500, TimeUnit.MILLISECONDS);
+    Assert.assertFalse(
+        "Fast query should be blocked and not started while slow queries are 
running",
+        fastStartedEarly
+    );
+
+    fastQueryThread.join();
+    slowQueryThread.join();
+
+    Assert.assertEquals("Query timeout, cancelling pending results for query 
[slow]. Per-segment timeout exceeded.", thrown.get().getMessage());
   }
 }
diff --git a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
index e737032015e..6362f585333 100644
--- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
@@ -84,7 +84,7 @@ public class BrokerProcessingModule implements Module
       DruidProcessingConfig config
   )
   {
-    return new ForwardingQueryProcessingPool(Execs.dummy());
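+    // Brokers do not run the per-segment timeout path, so no scheduled timeout executor is supplied.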
+    return new ForwardingQueryProcessingPool(Execs.dummy(), null);
   }
 
   @Provides
diff --git a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
index 2cffbcfae4e..33bec735d5d 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
@@ -38,6 +38,7 @@ import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.guice.annotations.Merging;
 import org.apache.druid.guice.annotations.Smile;
 import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
 import org.apache.druid.java.util.common.lifecycle.Lifecycle;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.offheap.OffheapBufferGenerator;
@@ -157,6 +158,7 @@ public class DruidProcessingModule implements Module
             lifecycle,
             config
         ),
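+        // Create the timeout scheduler only when numTimeoutThreads > 0; passing null disables per-segment timeouts.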
+        config.getNumTimeoutThreads() > 0 ? ScheduledExecutors.fixed(config.getNumTimeoutThreads(), "PrioritizedExecutorService-Timeout-%d") : null,
         executorServiceMonitor
     );
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
