This is an automated email from the ASF dual-hosted git repository.

gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new f9ae483effc feat: Replace Task.supportsQueries() with finer-grained 
config. (#19444)
f9ae483effc is described below

commit f9ae483effc7856e1072ab9a19cdbc19ea305104
Author: Gian Merlino <[email protected]>
AuthorDate: Tue May 12 09:58:28 2026 -0700

    feat: Replace Task.supportsQueries() with finer-grained config. (#19444)
    
    Tasks can now declare exactly which resources they need. MSQWorkerTask
    takes advantage of this to declare that it does not need merge buffers,
    which saves some memory.
---
 .../rabbitstream/RabbitStreamIndexTask.java        |   6 -
 .../druid/indexing/kafka/KafkaIndexTask.java       |   6 -
 .../druid/indexing/kafka/KafkaIndexTaskTest.java   |   6 -
 .../druid/indexing/kinesis/KinesisIndexTask.java   |   6 -
 .../kinesis/KinesisIndexTaskSerdeTest.java         |   1 +
 .../indexing/kinesis/KinesisIndexTaskTest.java     |   1 -
 .../k8s/overlord/taskadapter/K8sTaskAdapter.java   |   2 +-
 .../taskadapter/PodTemplateTaskAdapter.java        |   2 +-
 .../taskadapter/PodTemplateTaskAdapterTest.java    |   2 -
 .../src/test/resources/expectedNoopJobLongIds.yaml |   2 +-
 .../test/resources/expectedNoopJobNoTaskJson.yaml  |   2 +-
 .../resources/expectedNoopJobTlsEnabledBase.yaml   |   2 +-
 .../apache/druid/guice/PeonProcessingModule.java   |  55 ++++++++-
 .../druid/indexing/common/task/AbstractTask.java   |   6 -
 .../druid/indexing/common/task/NoopTask.java       |  14 +++
 .../apache/druid/indexing/common/task/Task.java    |  16 ++-
 .../druid/indexing/overlord/ForkingTaskRunner.java |   2 +-
 .../seekablestream/SeekableStreamIndexTask.java    |  10 ++
 .../druid/guice/PeonProcessingModuleTest.java      | 125 +++++++++++++++++++++
 .../druid/indexing/common/task/IndexTaskTest.java  |   6 -
 .../druid/indexing/common/task/TaskTest.java       |   6 -
 .../apache/druid/msq/indexing/MSQWorkerTask.java   |  10 +-
 .../druid/msq/indexing/MSQWorkerTaskTest.java      |  10 ++
 .../BroadcastDatasourceLoadingSpec.java            |  14 ++-
 .../BroadcastDatasourceLoadingSpecTest.java        |   8 ++
 25 files changed, 249 insertions(+), 71 deletions(-)

diff --git 
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
 
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
index dc9dee8ad43..53e1588501c 100644
--- 
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
+++ 
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
@@ -145,10 +145,4 @@ public class RabbitStreamIndexTask extends 
SeekableStreamIndexTask<String, Long,
   {
     return TYPE;
   }
-
-  @Override
-  public boolean supportsQueries()
-  {
-    return true;
-  }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 936bd32718e..f16a9a35cf2 100644
--- 
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++ 
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -163,10 +163,4 @@ public class KafkaIndexTask extends 
SeekableStreamIndexTask<KafkaTopicPartition,
   {
     return INPUT_SOURCE_RESOURCES;
   }
-
-  @Override
-  public boolean supportsQueries()
-  {
-    return true;
-  }
 }
diff --git 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 1374cc4abf7..e3d98588d0a 100644
--- 
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++ 
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -373,7 +373,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
             Duration.standardHours(2).getStandardMinutes()
         )
     );
-    Assert.assertTrue(task.supportsQueries());
 
     final ListenableFuture<TaskStatus> future = runTask(task);
 
@@ -1296,7 +1295,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
             Duration.standardHours(2).getStandardMinutes()
         )
     );
-    Assert.assertTrue(task.supportsQueries());
 
     final ListenableFuture<TaskStatus> future = runTask(task);
 
@@ -1369,7 +1367,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
             Duration.standardHours(2).getStandardMinutes()
         )
     );
-    Assert.assertTrue(task.supportsQueries());
 
     final ListenableFuture<TaskStatus> future = runTask(task);
 
@@ -3095,7 +3092,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         )
     );
 
-    Assert.assertTrue(task.supportsQueries());
     final ListenableFuture<TaskStatus> future = runTask(task);
 
     // Wait for task to exit
@@ -3168,7 +3164,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         )
     );
 
-    Assert.assertTrue(task.supportsQueries());
     final ListenableFuture<TaskStatus> future = runTask(task);
 
     // Wait for task to exit
@@ -3243,7 +3238,6 @@ public class KafkaIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         )
     );
 
-    Assert.assertTrue(task.supportsQueries());
     final ListenableFuture<TaskStatus> future = runTask(task);
 
     // Wait for task to exit. Should fail and trip up with the first two bad 
messages in the stream
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index b6c47d05bb3..81b3e38200d 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -179,12 +179,6 @@ public class KinesisIndexTask extends 
SeekableStreamIndexTask<String, String, Ki
     return INPUT_SOURCE_RESOURCES;
   }
 
-  @Override
-  public boolean supportsQueries()
-  {
-    return true;
-  }
-
   @VisibleForTesting
   AWSCredentialsConfig getAwsCredentialsConfig()
   {
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index 76b08bb7b85..3089a39537c 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -138,6 +138,7 @@ public class KinesisIndexTaskSerdeTest
     Assert.assertEquals(ACCESS_KEY, 
awsCredentialsConfig.getAccessKey().getPassword());
     Assert.assertEquals(SECRET_KEY, 
awsCredentialsConfig.getSecretKey().getPassword());
     Assert.assertEquals(FILE_SESSION_CREDENTIALS, 
awsCredentialsConfig.getFileSessionCredentials());
+    Assert.assertNotNull(target.getPeonProcessingModuleConfig());
     Assert.assertEquals(
         Collections.singleton(
             new ResourceAction(new Resource(
diff --git 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index ed96e947345..99fa37734ac 100644
--- 
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++ 
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -320,7 +320,6 @@ public class KinesisIndexTaskTest extends 
SeekableStreamIndexTaskTestBase
         ImmutableMap.of(SHARD_ID1, "2"),
         ImmutableMap.of(SHARD_ID1, "4")
     );
-    Assert.assertTrue(task.supportsQueries());
 
     final ListenableFuture<TaskStatus> future = runTask(task);
 
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
index 58a44b8cdb3..6f7d36dbe74 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
@@ -448,7 +448,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
     // If the task type is queryable, we need to load broadcast segments on 
the peon, used for
     // join queries. This is replaced by --loadBroadcastDatasourceMode option, 
but is preserved here
     // for backwards compatibility and can be removed in a future release.
-    if (task.supportsQueries()) {
+    if 
(task.getBroadcastDatasourceLoadingSpec().getMode().needsBroadcastSegments()) {
       command.add("--loadBroadcastSegments");
       command.add("true");
     }
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
index 8c68d9324e8..0ad3385ca21 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
@@ -234,7 +234,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
             .build(),
         new EnvVarBuilder()
             .withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV)
-            .withValue(Boolean.toString(task.supportsQueries()))
+            
.withValue(Boolean.toString(task.getBroadcastDatasourceLoadingSpec().getMode().needsBroadcastSegments()))
             .build()
     );
     if (!shouldUseDeepStorageForTaskPayload(task)) {
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
index 098a2eb805f..a6ef7f547ee 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
@@ -422,7 +422,6 @@ public class PodTemplateTaskAdapterTest
     );
 
     Task task = EasyMock.mock(Task.class);
-    EasyMock.expect(task.supportsQueries()).andReturn(true);
     EasyMock.expect(task.getType()).andReturn("queryable").anyTimes();
     EasyMock.expect(task.getId()).andReturn("id").anyTimes();
     EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
@@ -456,7 +455,6 @@ public class PodTemplateTaskAdapterTest
     );
 
     Task task = EasyMock.mock(Task.class);
-    EasyMock.expect(task.supportsQueries()).andReturn(true);
     EasyMock.expect(task.getType()).andReturn("queryable").anyTimes();
     EasyMock.expect(task.getId()).andReturn("id").anyTimes();
     EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
 
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
index 9568386fb02..eec68115b1d 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
@@ -49,7 +49,7 @@ spec:
             - name: "TASK_ID"
               value: 
"api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z"
             - name: "LOAD_BROADCAST_DATASOURCE_MODE"
-              value: "ALL"
+              value: "NONE"
             - name: "LOAD_BROADCAST_SEGMENTS"
               value: "false"
             - name: "TASK_JSON"
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
 
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
index 2ac2dea4759..499eff1df05 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
@@ -48,7 +48,7 @@ spec:
             - name: "TASK_ID"
               value: "id"
             - name: "LOAD_BROADCAST_DATASOURCE_MODE"
-              value: "ALL"
+              value: "NONE"
             - name: "LOAD_BROADCAST_SEGMENTS"
               value: "false"
           image: one
diff --git 
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
 
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
index a6fda6e2cb6..ef7641ec987 100644
--- 
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
+++ 
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
@@ -49,7 +49,7 @@ spec:
             - name: "TASK_ID"
               value: "id"
             - name: "LOAD_BROADCAST_DATASOURCE_MODE"
-              value: "ALL"
+              value: "NONE"
             - name: "LOAD_BROADCAST_SEGMENTS"
               value: "false"
             - name: "TASK_JSON"
diff --git 
a/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
 
b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
index e3bbeb1efaf..17c4669dd86 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
@@ -84,12 +84,12 @@ public class PeonProcessingModule implements Module
       Lifecycle lifecycle
   )
   {
-    if (task.supportsQueries()) {
+    if (task.getPeonProcessingModuleConfig().hasProcessingThreads()) {
       return DruidProcessingModule.createProcessingExecutorPool(config, 
executorServiceMonitor, lifecycle);
     } else {
       if (config.isNumThreadsConfigured()) {
         log.warn(
-            "Ignoring the configured numThreads[%d] because task[%s] of 
type[%s] does not support queries",
+            "Ignoring the configured numThreads[%d] because task[%s] of 
type[%s] does not need processing threads",
             config.getNumThreads(),
             task.getId(),
             task.getType()
@@ -108,7 +108,7 @@ public class PeonProcessingModule implements Module
       RuntimeInfo runtimeInfo
   )
   {
-    if (task.supportsQueries()) {
+    if (task.getPeonProcessingModuleConfig().hasProcessingBuffers()) {
       return DruidProcessingModule.createIntermediateResultsPool(config, 
runtimeInfo);
     } else {
       return DummyNonBlockingPool.instance();
@@ -120,13 +120,13 @@ public class PeonProcessingModule implements Module
   @Merging
   public BlockingPool<ByteBuffer> getMergeBufferPool(Task task, 
DruidProcessingConfig config, RuntimeInfo runtimeInfo)
   {
-    if (task.supportsQueries()) {
+    if (task.getPeonProcessingModuleConfig().hasMergeBuffers()) {
       return DruidProcessingModule.createMergeBufferPool(config, runtimeInfo);
     } else {
       if (config.isNumMergeBuffersConfigured()) {
         log.warn(
-            "Ignoring the configured numMergeBuffers[%d] because task[%s] of 
type[%s] does not support queries",
-            config.getNumThreads(),
+            "Ignoring the configured numMergeBuffers[%d] because task[%s] of 
type[%s] does not need merge buffers",
+            config.getNumMergeBuffers(),
             task.getId(),
             task.getType()
         );
@@ -145,4 +145,47 @@ public class PeonProcessingModule implements Module
   {
     return new GroupByResourcesReservationPool(mergeBufferPool, 
groupByQueryConfig);
   }
+
+  /**
+   * Returned by {@link Task#getPeonProcessingModuleConfig()} to declare which 
resources the task actually needs.
+   */
+  public static class Config
+  {
+    private boolean processingBuffers;
+    private boolean processingThreads;
+    private boolean mergeBuffers;
+
+    public Config withProcessingBuffers()
+    {
+      this.processingBuffers = true;
+      return this;
+    }
+
+    public Config withProcessingThreads()
+    {
+      this.processingThreads = true;
+      return this;
+    }
+
+    public Config withMergeBuffers()
+    {
+      this.mergeBuffers = true;
+      return this;
+    }
+
+    public boolean hasProcessingBuffers()
+    {
+      return processingBuffers;
+    }
+
+    public boolean hasProcessingThreads()
+    {
+      return processingThreads;
+    }
+
+    public boolean hasMergeBuffers()
+    {
+      return mergeBuffers;
+    }
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index f073de61aae..32652487e73 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -301,12 +301,6 @@ public abstract class AbstractTask implements Task
     return null;
   }
 
-  @Override
-  public boolean supportsQueries()
-  {
-    return false;
-  }
-
   @Override
   public String getClasspathPrefix()
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index e838da2da7d..99c71562e3c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -30,6 +30,8 @@ import org.apache.druid.indexing.common.config.TaskConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.apache.druid.server.security.ResourceAction;
 
 import javax.annotation.Nonnull;
@@ -146,4 +148,16 @@ public class NoopTask extends AbstractTask implements 
PendingSegmentAllocatingTa
     context.put(Tasks.PRIORITY_KEY, priority);
     return new NoopTask(null, null, null, 0, 0, context);
   }
+
+  @Override
+  public LookupLoadingSpec getLookupLoadingSpec()
+  {
+    return LookupLoadingSpec.NONE;
+  }
+
+  @Override
+  public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
+  {
+    return BroadcastDatasourceLoadingSpec.NONE;
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 625e1fed212..a9884438d5b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.guice.PeonProcessingModule;
 import org.apache.druid.indexer.TaskIdStatus;
 import org.apache.druid.indexer.TaskIdentifier;
 import org.apache.druid.indexer.TaskInfo;
@@ -176,16 +177,13 @@ public interface Task
   <T> QueryRunner<T> getQueryRunner(Query<T> query);
 
   /**
-   * True if this task type embeds a query stack, and therefore should preload 
resources (like broadcast tables)
-   * that may be needed by queries. Tasks supporting queries are also 
allocated processing buffers, processing threads
-   * and merge buffers. Those which do not should not assume that these 
resources are present and must explicitly allocate
-   * any direct buffers or processing pools if required.
-   *
-   * If true, {@link #getQueryRunner(Query)} does not necessarily return 
nonnull query runners. For example,
-   * MSQWorkerTask returns true from this method (because it embeds a query 
stack for running multi-stage queries)
-   * even though it is not directly queryable via HTTP.
+   * Declares which resources provided by {@link PeonProcessingModule} this 
task actually needs. The default
+   * implementation has all the optional items disabled.
    */
-  boolean supportsQueries();
+  default PeonProcessingModule.Config getPeonProcessingModuleConfig()
+  {
+    return new PeonProcessingModule.Config();
+  }
 
   /**
    * Returns an extra classpath that should be prepended to the default 
classpath when running this task. If no
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 1f7e4218107..fb09cb5f154 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -389,7 +389,7 @@ public class ForkingTaskRunner
                         // If the task type is queryable, we need to load 
broadcast segments on the peon, used for
                         // join queries. This is replaced by 
--loadBroadcastDatasourceMode option, but is preserved here
                         // for backwards compatibility and can be removed in a 
future release.
-                        if (task.supportsQueries()) {
+                        if 
(task.getBroadcastDatasourceLoadingSpec().getMode().needsBroadcastSegments()) {
                           command.add("--loadBroadcastSegments");
                           command.add("true");
                         }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 390fe68cfee..816cfc1574d 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -26,6 +26,7 @@ import com.google.common.base.Supplier;
 import com.google.common.base.Suppliers;
 import org.apache.druid.common.config.Configs;
 import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.guice.PeonProcessingModule;
 import org.apache.druid.indexer.TaskStatus;
 import 
org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever;
 import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
@@ -335,4 +336,13 @@ public abstract class 
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
   {
     return runnerSupplier.get();
   }
+
+  @Override
+  public PeonProcessingModule.Config getPeonProcessingModuleConfig()
+  {
+    return new PeonProcessingModule.Config()
+        .withProcessingBuffers()
+        .withProcessingThreads()
+        .withMergeBuffers();
+  }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/guice/PeonProcessingModuleTest.java
 
b/indexing-service/src/test/java/org/apache/druid/guice/PeonProcessingModuleTest.java
new file mode 100644
index 00000000000..13599ee7fab
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/guice/PeonProcessingModuleTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.collections.DummyBlockingPool;
+import org.apache.druid.collections.DummyNonBlockingPool;
+import org.apache.druid.collections.NonBlockingPool;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.ExecutorServiceMonitor;
+import org.apache.druid.query.NoopQueryProcessingPool;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.utils.RuntimeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.nio.ByteBuffer;
+
+public class PeonProcessingModuleTest
+{
+  private final PeonProcessingModule module = new PeonProcessingModule();
+
+  @Test
+  public void testConfigBuilder()
+  {
+    final PeonProcessingModule.Config config = new 
PeonProcessingModule.Config()
+        .withProcessingBuffers()
+        .withProcessingThreads()
+        .withMergeBuffers();
+    Assert.assertTrue(config.hasProcessingBuffers());
+    Assert.assertTrue(config.hasProcessingThreads());
+    Assert.assertTrue(config.hasMergeBuffers());
+  }
+
+  @Test
+  public void testGetProcessingExecutorPool_whenNotNeeded()
+  {
+    final Task task = NoopTask.create();
+    final DruidProcessingConfig config = 
Mockito.mock(DruidProcessingConfig.class);
+    Mockito.when(config.isNumThreadsConfigured()).thenReturn(false);
+
+    final QueryProcessingPool pool = module.getProcessingExecutorPool(
+        task,
+        config,
+        new ExecutorServiceMonitor(),
+        new Lifecycle()
+    );
+    Assert.assertSame(NoopQueryProcessingPool.instance(), pool);
+  }
+
+  @Test
+  public void 
testGetProcessingExecutorPool_whenNotNeeded_andThreadsConfigured()
+  {
+    final Task task = NoopTask.create();
+    final DruidProcessingConfig config = 
Mockito.mock(DruidProcessingConfig.class);
+    Mockito.when(config.isNumThreadsConfigured()).thenReturn(true);
+    Mockito.when(config.getNumThreads()).thenReturn(2);
+
+    final QueryProcessingPool pool = module.getProcessingExecutorPool(
+        task,
+        config,
+        new ExecutorServiceMonitor(),
+        new Lifecycle()
+    );
+    Assert.assertSame(NoopQueryProcessingPool.instance(), pool);
+  }
+
+  @Test
+  public void testGetIntermediateResultsPool_whenNotNeeded()
+  {
+    final Task task = NoopTask.create();
+    final DruidProcessingConfig config = 
Mockito.mock(DruidProcessingConfig.class);
+    final RuntimeInfo runtimeInfo = Mockito.mock(RuntimeInfo.class);
+
+    final NonBlockingPool<ByteBuffer> pool = 
module.getIntermediateResultsPool(task, config, runtimeInfo);
+    Assert.assertSame(DummyNonBlockingPool.instance(), pool);
+  }
+
+  @Test
+  public void testGetMergeBufferPool_whenNotNeeded()
+  {
+    final Task task = NoopTask.create();
+    final DruidProcessingConfig config = 
Mockito.mock(DruidProcessingConfig.class);
+    Mockito.when(config.isNumMergeBuffersConfigured()).thenReturn(false);
+    final RuntimeInfo runtimeInfo = Mockito.mock(RuntimeInfo.class);
+
+    final BlockingPool<ByteBuffer> pool = module.getMergeBufferPool(task, 
config, runtimeInfo);
+    Assert.assertSame(DummyBlockingPool.instance(), pool);
+  }
+
+  @Test
+  public void testGetMergeBufferPool_whenNotNeeded_andMergeBuffersConfigured()
+  {
+    // Covers the inner "if (config.isNumMergeBuffersConfigured())" warning 
path.
+    final Task task = NoopTask.create();
+    final DruidProcessingConfig config = 
Mockito.mock(DruidProcessingConfig.class);
+    Mockito.when(config.isNumMergeBuffersConfigured()).thenReturn(true);
+    Mockito.when(config.getNumMergeBuffers()).thenReturn(2);
+    final RuntimeInfo runtimeInfo = Mockito.mock(RuntimeInfo.class);
+
+    final BlockingPool<ByteBuffer> pool = module.getMergeBufferPool(task, 
config, runtimeInfo);
+    Assert.assertSame(DummyBlockingPool.instance(), pool);
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 13e016f2880..a2b0b225553 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -276,8 +276,6 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    Assert.assertFalse(indexTask.supportsQueries());
-
     final DataSegmentsWithSchemas segmentWithSchemas = 
runSuccessfulTask(indexTask);
     final List<DataSegment> segments = new 
ArrayList<>(segmentWithSchemas.getSegments());
 
@@ -321,8 +319,6 @@ public class IndexTaskTest extends IngestionTestBase
         ImmutableMap.of(Tasks.STORE_EMPTY_COLUMNS_KEY, false)
     );
 
-    Assert.assertFalse(indexTask.supportsQueries());
-
     final DataSegmentsWithSchemas segmentWithSchemas = 
runSuccessfulTask(indexTask);
     final List<DataSegment> segments = new 
ArrayList<>(segmentWithSchemas.getSegments());
     Assert.assertEquals(1, segments.size());
@@ -361,8 +357,6 @@ public class IndexTaskTest extends IngestionTestBase
         null
     );
 
-    Assert.assertFalse(indexTask.supportsQueries());
-
     final DataSegmentsWithSchemas segmentWithSchemas = 
runSuccessfulTask(indexTask);
     final List<DataSegment> segments = new 
ArrayList<>(segmentWithSchemas.getSegments());
     Assert.assertEquals(2, segments.size());
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
index 33502ecb3fb..24998b8029b 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
@@ -78,12 +78,6 @@ public class TaskTest
       return null;
     }
 
-    @Override
-    public boolean supportsQueries()
-    {
-      return false;
-    }
-
     @Override
     public String getClasspathPrefix()
     {
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
index d7279e60347..a6517d88dc3 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
@@ -31,6 +31,7 @@ import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.inject.Injector;
 import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.guice.PeonProcessingModule;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -180,11 +181,12 @@ public class MSQWorkerTask extends AbstractTask
   }
 
   @Override
-  public boolean supportsQueries()
+  public PeonProcessingModule.Config getPeonProcessingModuleConfig()
   {
-    // Even though we don't have a QueryResource, we do embed a query stack, and so we need preloaded resources
-    // such as broadcast tables.
-    return true;
+    // No merge buffers needed.
+    return new PeonProcessingModule.Config()
+        .withProcessingBuffers()
+        .withProcessingThreads();
   }
 
   @Override
diff --git a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
index 3672e9d1c29..8bafecb1986 100644
--- a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
+++ b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.indexing;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.PeonProcessingModule;
 import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
 import org.junit.Assert;
 import org.junit.Test;
@@ -114,6 +115,15 @@ public class MSQWorkerTaskTest
     Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
   }
 
+  @Test
+  public void testGetPeonProcessingModuleConfig()
+  {
+    final PeonProcessingModule.Config config = msqWorkerTask.getPeonProcessingModuleConfig();
+    Assert.assertTrue(config.hasProcessingThreads());
+    Assert.assertTrue(config.hasProcessingBuffers());
+    Assert.assertFalse(config.hasMergeBuffers());
+  }
+
   @Test
   public void testGetDefaultLookupLoadingSpec()
   {
diff --git a/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java
index 3a11027311e..06abdb902c4 100644
--- a/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java
+++ b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java
@@ -51,7 +51,19 @@ public class BroadcastDatasourceLoadingSpec
 
   public enum Mode
   {
-    ALL, NONE, ONLY_REQUIRED
+    ALL(true), NONE(false), ONLY_REQUIRED(true);
+
+    private final boolean needsBroadcastSegments;
+
+    Mode(boolean needsBroadcastSegments)
+    {
+      this.needsBroadcastSegments = needsBroadcastSegments;
+    }
+
+    public boolean needsBroadcastSegments()
+    {
+      return needsBroadcastSegments;
+    }
   }
 
   private final Mode mode;
diff --git a/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java
index 146862135b6..5c662b612dd 100644
--- a/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java
+++ b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java
@@ -43,6 +43,14 @@ public class BroadcastDatasourceLoadingSpecTest
     Assert.assertNull(spec.getBroadcastDatasourcesToLoad());
   }
 
+  @Test
+  public void testModeNeedsBroadcastSegments()
+  {
+    Assert.assertTrue(BroadcastDatasourceLoadingSpec.Mode.ALL.needsBroadcastSegments());
+    Assert.assertFalse(BroadcastDatasourceLoadingSpec.Mode.NONE.needsBroadcastSegments());
+    Assert.assertTrue(BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED.needsBroadcastSegments());
+  }
+
   @Test
   public void testLoadingNoLookups()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to