This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new f9ae483effc feat: Replace Task.supportsQueries() with finer-grained
config. (#19444)
f9ae483effc is described below
commit f9ae483effc7856e1072ab9a19cdbc19ea305104
Author: Gian Merlino <[email protected]>
AuthorDate: Tue May 12 09:58:28 2026 -0700
feat: Replace Task.supportsQueries() with finer-grained config. (#19444)
Tasks can now declare exactly which resources they need. MSQWorkerTask
takes advantage of this to declare that it does not need merge buffers,
which saves some memory.
---
.../rabbitstream/RabbitStreamIndexTask.java | 6 -
.../druid/indexing/kafka/KafkaIndexTask.java | 6 -
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 6 -
.../druid/indexing/kinesis/KinesisIndexTask.java | 6 -
.../kinesis/KinesisIndexTaskSerdeTest.java | 1 +
.../indexing/kinesis/KinesisIndexTaskTest.java | 1 -
.../k8s/overlord/taskadapter/K8sTaskAdapter.java | 2 +-
.../taskadapter/PodTemplateTaskAdapter.java | 2 +-
.../taskadapter/PodTemplateTaskAdapterTest.java | 2 -
.../src/test/resources/expectedNoopJobLongIds.yaml | 2 +-
.../test/resources/expectedNoopJobNoTaskJson.yaml | 2 +-
.../resources/expectedNoopJobTlsEnabledBase.yaml | 2 +-
.../apache/druid/guice/PeonProcessingModule.java | 55 ++++++++-
.../druid/indexing/common/task/AbstractTask.java | 6 -
.../druid/indexing/common/task/NoopTask.java | 14 +++
.../apache/druid/indexing/common/task/Task.java | 16 ++-
.../druid/indexing/overlord/ForkingTaskRunner.java | 2 +-
.../seekablestream/SeekableStreamIndexTask.java | 10 ++
.../druid/guice/PeonProcessingModuleTest.java | 125 +++++++++++++++++++++
.../druid/indexing/common/task/IndexTaskTest.java | 6 -
.../druid/indexing/common/task/TaskTest.java | 6 -
.../apache/druid/msq/indexing/MSQWorkerTask.java | 10 +-
.../druid/msq/indexing/MSQWorkerTaskTest.java | 10 ++
.../BroadcastDatasourceLoadingSpec.java | 14 ++-
.../BroadcastDatasourceLoadingSpecTest.java | 8 ++
25 files changed, 249 insertions(+), 71 deletions(-)
diff --git
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
index dc9dee8ad43..53e1588501c 100644
---
a/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
+++
b/extensions-contrib/rabbit-stream-indexing-service/src/main/java/org/apache/druid/indexing/rabbitstream/RabbitStreamIndexTask.java
@@ -145,10 +145,4 @@ public class RabbitStreamIndexTask extends
SeekableStreamIndexTask<String, Long,
{
return TYPE;
}
-
- @Override
- public boolean supportsQueries()
- {
- return true;
- }
}
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
index 936bd32718e..f16a9a35cf2 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTask.java
@@ -163,10 +163,4 @@ public class KafkaIndexTask extends
SeekableStreamIndexTask<KafkaTopicPartition,
{
return INPUT_SOURCE_RESOURCES;
}
-
- @Override
- public boolean supportsQueries()
- {
- return true;
- }
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 1374cc4abf7..e3d98588d0a 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -373,7 +373,6 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
Duration.standardHours(2).getStandardMinutes()
)
);
- Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
@@ -1296,7 +1295,6 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
Duration.standardHours(2).getStandardMinutes()
)
);
- Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
@@ -1369,7 +1367,6 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
Duration.standardHours(2).getStandardMinutes()
)
);
- Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
@@ -3095,7 +3092,6 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
)
);
- Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for task to exit
@@ -3168,7 +3164,6 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
)
);
- Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for task to exit
@@ -3243,7 +3238,6 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
)
);
- Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
// Wait for task to exit. Should fail and trip up with the first two bad
messages in the stream
diff --git
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
index b6c47d05bb3..81b3e38200d 100644
---
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
+++
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/KinesisIndexTask.java
@@ -179,12 +179,6 @@ public class KinesisIndexTask extends
SeekableStreamIndexTask<String, String, Ki
return INPUT_SOURCE_RESOURCES;
}
- @Override
- public boolean supportsQueries()
- {
- return true;
- }
-
@VisibleForTesting
AWSCredentialsConfig getAwsCredentialsConfig()
{
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
index 76b08bb7b85..3089a39537c 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskSerdeTest.java
@@ -138,6 +138,7 @@ public class KinesisIndexTaskSerdeTest
Assert.assertEquals(ACCESS_KEY,
awsCredentialsConfig.getAccessKey().getPassword());
Assert.assertEquals(SECRET_KEY,
awsCredentialsConfig.getSecretKey().getPassword());
Assert.assertEquals(FILE_SESSION_CREDENTIALS,
awsCredentialsConfig.getFileSessionCredentials());
+ Assert.assertNotNull(target.getPeonProcessingModuleConfig());
Assert.assertEquals(
Collections.singleton(
new ResourceAction(new Resource(
diff --git
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
index ed96e947345..99fa37734ac 100644
---
a/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
+++
b/extensions-core/kinesis-indexing-service/src/test/java/org/apache/druid/indexing/kinesis/KinesisIndexTaskTest.java
@@ -320,7 +320,6 @@ public class KinesisIndexTaskTest extends
SeekableStreamIndexTaskTestBase
ImmutableMap.of(SHARD_ID1, "2"),
ImmutableMap.of(SHARD_ID1, "4")
);
- Assert.assertTrue(task.supportsQueries());
final ListenableFuture<TaskStatus> future = runTask(task);
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
index 58a44b8cdb3..6f7d36dbe74 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/K8sTaskAdapter.java
@@ -448,7 +448,7 @@ public abstract class K8sTaskAdapter implements TaskAdapter
// If the task type is queryable, we need to load broadcast segments on
the peon, used for
// join queries. This is replaced by --loadBroadcastDatasourceMode option,
but is preserved here
// for backwards compatibility and can be removed in a future release.
- if (task.supportsQueries()) {
+ if
(task.getBroadcastDatasourceLoadingSpec().getMode().needsBroadcastSegments()) {
command.add("--loadBroadcastSegments");
command.add("true");
}
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
index 8c68d9324e8..0ad3385ca21 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapter.java
@@ -234,7 +234,7 @@ public class PodTemplateTaskAdapter implements TaskAdapter
.build(),
new EnvVarBuilder()
.withName(DruidK8sConstants.LOAD_BROADCAST_SEGMENTS_ENV)
- .withValue(Boolean.toString(task.supportsQueries()))
+
.withValue(Boolean.toString(task.getBroadcastDatasourceLoadingSpec().getMode().needsBroadcastSegments()))
.build()
);
if (!shouldUseDeepStorageForTaskPayload(task)) {
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
index 098a2eb805f..a6ef7f547ee 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/java/org/apache/druid/k8s/overlord/taskadapter/PodTemplateTaskAdapterTest.java
@@ -422,7 +422,6 @@ public class PodTemplateTaskAdapterTest
);
Task task = EasyMock.mock(Task.class);
- EasyMock.expect(task.supportsQueries()).andReturn(true);
EasyMock.expect(task.getType()).andReturn("queryable").anyTimes();
EasyMock.expect(task.getId()).andReturn("id").anyTimes();
EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
@@ -456,7 +455,6 @@ public class PodTemplateTaskAdapterTest
);
Task task = EasyMock.mock(Task.class);
- EasyMock.expect(task.supportsQueries()).andReturn(true);
EasyMock.expect(task.getType()).andReturn("queryable").anyTimes();
EasyMock.expect(task.getId()).andReturn("id").anyTimes();
EasyMock.expect(task.getGroupId()).andReturn("groupid").anyTimes();
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
index 9568386fb02..eec68115b1d 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobLongIds.yaml
@@ -49,7 +49,7 @@ spec:
- name: "TASK_ID"
value:
"api-issued_kill_wikipedia3_omjobnbc_1000-01-01T00:00:00.000Z_2023-05-14T00:00:00.000Z_2023-05-15T17:03:01.220Z"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
- value: "ALL"
+ value: "NONE"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
index 2ac2dea4759..499eff1df05 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobNoTaskJson.yaml
@@ -48,7 +48,7 @@ spec:
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
- value: "ALL"
+ value: "NONE"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
image: one
diff --git
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
index a6fda6e2cb6..ef7641ec987 100644
---
a/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
+++
b/extensions-core/kubernetes-overlord-extensions/src/test/resources/expectedNoopJobTlsEnabledBase.yaml
@@ -49,7 +49,7 @@ spec:
- name: "TASK_ID"
value: "id"
- name: "LOAD_BROADCAST_DATASOURCE_MODE"
- value: "ALL"
+ value: "NONE"
- name: "LOAD_BROADCAST_SEGMENTS"
value: "false"
- name: "TASK_JSON"
diff --git
a/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
index e3bbeb1efaf..17c4669dd86 100644
---
a/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
+++
b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
@@ -84,12 +84,12 @@ public class PeonProcessingModule implements Module
Lifecycle lifecycle
)
{
- if (task.supportsQueries()) {
+ if (task.getPeonProcessingModuleConfig().hasProcessingThreads()) {
return DruidProcessingModule.createProcessingExecutorPool(config,
executorServiceMonitor, lifecycle);
} else {
if (config.isNumThreadsConfigured()) {
log.warn(
- "Ignoring the configured numThreads[%d] because task[%s] of
type[%s] does not support queries",
+ "Ignoring the configured numThreads[%d] because task[%s] of
type[%s] does not need processing threads",
config.getNumThreads(),
task.getId(),
task.getType()
@@ -108,7 +108,7 @@ public class PeonProcessingModule implements Module
RuntimeInfo runtimeInfo
)
{
- if (task.supportsQueries()) {
+ if (task.getPeonProcessingModuleConfig().hasProcessingBuffers()) {
return DruidProcessingModule.createIntermediateResultsPool(config,
runtimeInfo);
} else {
return DummyNonBlockingPool.instance();
@@ -120,13 +120,13 @@ public class PeonProcessingModule implements Module
@Merging
public BlockingPool<ByteBuffer> getMergeBufferPool(Task task,
DruidProcessingConfig config, RuntimeInfo runtimeInfo)
{
- if (task.supportsQueries()) {
+ if (task.getPeonProcessingModuleConfig().hasMergeBuffers()) {
return DruidProcessingModule.createMergeBufferPool(config, runtimeInfo);
} else {
if (config.isNumMergeBuffersConfigured()) {
log.warn(
- "Ignoring the configured numMergeBuffers[%d] because task[%s] of
type[%s] does not support queries",
- config.getNumThreads(),
+ "Ignoring the configured numMergeBuffers[%d] because task[%s] of
type[%s] does not need merge buffers",
+ config.getNumMergeBuffers(),
task.getId(),
task.getType()
);
@@ -145,4 +145,47 @@ public class PeonProcessingModule implements Module
{
return new GroupByResourcesReservationPool(mergeBufferPool,
groupByQueryConfig);
}
+
+ /**
+ * Returned by {@link Task#getPeonProcessingModuleConfig()} to declare which
resources the task actually needs.
+ */
+ public static class Config
+ {
+ private boolean processingBuffers;
+ private boolean processingThreads;
+ private boolean mergeBuffers;
+
+ public Config withProcessingBuffers()
+ {
+ this.processingBuffers = true;
+ return this;
+ }
+
+ public Config withProcessingThreads()
+ {
+ this.processingThreads = true;
+ return this;
+ }
+
+ public Config withMergeBuffers()
+ {
+ this.mergeBuffers = true;
+ return this;
+ }
+
+ public boolean hasProcessingBuffers()
+ {
+ return processingBuffers;
+ }
+
+ public boolean hasProcessingThreads()
+ {
+ return processingThreads;
+ }
+
+ public boolean hasMergeBuffers()
+ {
+ return mergeBuffers;
+ }
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
index f073de61aae..32652487e73 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/AbstractTask.java
@@ -301,12 +301,6 @@ public abstract class AbstractTask implements Task
return null;
}
- @Override
- public boolean supportsQueries()
- {
- return false;
- }
-
@Override
public String getClasspathPrefix()
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
index e838da2da7d..99c71562e3c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/NoopTask.java
@@ -30,6 +30,8 @@ import org.apache.druid.indexing.common.config.TaskConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.server.coordination.BroadcastDatasourceLoadingSpec;
+import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.apache.druid.server.security.ResourceAction;
import javax.annotation.Nonnull;
@@ -146,4 +148,16 @@ public class NoopTask extends AbstractTask implements
PendingSegmentAllocatingTa
context.put(Tasks.PRIORITY_KEY, priority);
return new NoopTask(null, null, null, 0, 0, context);
}
+
+ @Override
+ public LookupLoadingSpec getLookupLoadingSpec()
+ {
+ return LookupLoadingSpec.NONE;
+ }
+
+ @Override
+ public BroadcastDatasourceLoadingSpec getBroadcastDatasourceLoadingSpec()
+ {
+ return BroadcastDatasourceLoadingSpec.NONE;
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 625e1fed212..a9884438d5b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.guice.PeonProcessingModule;
import org.apache.druid.indexer.TaskIdStatus;
import org.apache.druid.indexer.TaskIdentifier;
import org.apache.druid.indexer.TaskInfo;
@@ -176,16 +177,13 @@ public interface Task
<T> QueryRunner<T> getQueryRunner(Query<T> query);
/**
- * True if this task type embeds a query stack, and therefore should preload
resources (like broadcast tables)
- * that may be needed by queries. Tasks supporting queries are also
allocated processing buffers, processing threads
- * and merge buffers. Those which do not should not assume that these
resources are present and must explicitly allocate
- * any direct buffers or processing pools if required.
- *
- * If true, {@link #getQueryRunner(Query)} does not necessarily return
nonnull query runners. For example,
- * MSQWorkerTask returns true from this method (because it embeds a query
stack for running multi-stage queries)
- * even though it is not directly queryable via HTTP.
+ * Declares which resources provided by {@link PeonProcessingModule} this
task actually needs. The default
+ * implementation has all the optional items disabled.
*/
- boolean supportsQueries();
+ default PeonProcessingModule.Config getPeonProcessingModuleConfig()
+ {
+ return new PeonProcessingModule.Config();
+ }
/**
* Returns an extra classpath that should be prepended to the default
classpath when running this task. If no
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
index 1f7e4218107..fb09cb5f154 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/overlord/ForkingTaskRunner.java
@@ -389,7 +389,7 @@ public class ForkingTaskRunner
// If the task type is queryable, we need to load
broadcast segments on the peon, used for
// join queries. This is replaced by
--loadBroadcastDatasourceMode option, but is preserved here
// for backwards compatibility and can be removed in a
future release.
- if (task.supportsQueries()) {
+ if
(task.getBroadcastDatasourceLoadingSpec().getMode().needsBroadcastSegments()) {
command.add("--loadBroadcastSegments");
command.add("true");
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
index 390fe68cfee..816cfc1574d 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTask.java
@@ -26,6 +26,7 @@ import com.google.common.base.Supplier;
import com.google.common.base.Suppliers;
import org.apache.druid.common.config.Configs;
import org.apache.druid.data.input.impl.ByteEntity;
+import org.apache.druid.guice.PeonProcessingModule;
import org.apache.druid.indexer.TaskStatus;
import
org.apache.druid.indexing.appenderator.ActionBasedPublishedSegmentRetriever;
import org.apache.druid.indexing.appenderator.ActionBasedSegmentAllocator;
@@ -335,4 +336,13 @@ public abstract class
SeekableStreamIndexTask<PartitionIdType, SequenceOffsetTyp
{
return runnerSupplier.get();
}
+
+ @Override
+ public PeonProcessingModule.Config getPeonProcessingModuleConfig()
+ {
+ return new PeonProcessingModule.Config()
+ .withProcessingBuffers()
+ .withProcessingThreads()
+ .withMergeBuffers();
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/guice/PeonProcessingModuleTest.java
b/indexing-service/src/test/java/org/apache/druid/guice/PeonProcessingModuleTest.java
new file mode 100644
index 00000000000..13599ee7fab
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/guice/PeonProcessingModuleTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.collections.DummyBlockingPool;
+import org.apache.druid.collections.DummyNonBlockingPool;
+import org.apache.druid.collections.NonBlockingPool;
+import org.apache.druid.indexing.common.task.NoopTask;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.ExecutorServiceMonitor;
+import org.apache.druid.query.NoopQueryProcessingPool;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.utils.RuntimeInfo;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.nio.ByteBuffer;
+
+public class PeonProcessingModuleTest
+{
+ private final PeonProcessingModule module = new PeonProcessingModule();
+
+ @Test
+ public void testConfigBuilder()
+ {
+ final PeonProcessingModule.Config config = new
PeonProcessingModule.Config()
+ .withProcessingBuffers()
+ .withProcessingThreads()
+ .withMergeBuffers();
+ Assert.assertTrue(config.hasProcessingBuffers());
+ Assert.assertTrue(config.hasProcessingThreads());
+ Assert.assertTrue(config.hasMergeBuffers());
+ }
+
+ @Test
+ public void testGetProcessingExecutorPool_whenNotNeeded()
+ {
+ final Task task = NoopTask.create();
+ final DruidProcessingConfig config =
Mockito.mock(DruidProcessingConfig.class);
+ Mockito.when(config.isNumThreadsConfigured()).thenReturn(false);
+
+ final QueryProcessingPool pool = module.getProcessingExecutorPool(
+ task,
+ config,
+ new ExecutorServiceMonitor(),
+ new Lifecycle()
+ );
+ Assert.assertSame(NoopQueryProcessingPool.instance(), pool);
+ }
+
+ @Test
+ public void
testGetProcessingExecutorPool_whenNotNeeded_andThreadsConfigured()
+ {
+ final Task task = NoopTask.create();
+ final DruidProcessingConfig config =
Mockito.mock(DruidProcessingConfig.class);
+ Mockito.when(config.isNumThreadsConfigured()).thenReturn(true);
+ Mockito.when(config.getNumThreads()).thenReturn(2);
+
+ final QueryProcessingPool pool = module.getProcessingExecutorPool(
+ task,
+ config,
+ new ExecutorServiceMonitor(),
+ new Lifecycle()
+ );
+ Assert.assertSame(NoopQueryProcessingPool.instance(), pool);
+ }
+
+ @Test
+ public void testGetIntermediateResultsPool_whenNotNeeded()
+ {
+ final Task task = NoopTask.create();
+ final DruidProcessingConfig config =
Mockito.mock(DruidProcessingConfig.class);
+ final RuntimeInfo runtimeInfo = Mockito.mock(RuntimeInfo.class);
+
+ final NonBlockingPool<ByteBuffer> pool =
module.getIntermediateResultsPool(task, config, runtimeInfo);
+ Assert.assertSame(DummyNonBlockingPool.instance(), pool);
+ }
+
+ @Test
+ public void testGetMergeBufferPool_whenNotNeeded()
+ {
+ final Task task = NoopTask.create();
+ final DruidProcessingConfig config =
Mockito.mock(DruidProcessingConfig.class);
+ Mockito.when(config.isNumMergeBuffersConfigured()).thenReturn(false);
+ final RuntimeInfo runtimeInfo = Mockito.mock(RuntimeInfo.class);
+
+ final BlockingPool<ByteBuffer> pool = module.getMergeBufferPool(task,
config, runtimeInfo);
+ Assert.assertSame(DummyBlockingPool.instance(), pool);
+ }
+
+ @Test
+ public void testGetMergeBufferPool_whenNotNeeded_andMergeBuffersConfigured()
+ {
+ // Covers the inner "if (config.isNumMergeBuffersConfigured())" warning
path.
+ final Task task = NoopTask.create();
+ final DruidProcessingConfig config =
Mockito.mock(DruidProcessingConfig.class);
+ Mockito.when(config.isNumMergeBuffersConfigured()).thenReturn(true);
+ Mockito.when(config.getNumMergeBuffers()).thenReturn(2);
+ final RuntimeInfo runtimeInfo = Mockito.mock(RuntimeInfo.class);
+
+ final BlockingPool<ByteBuffer> pool = module.getMergeBufferPool(task,
config, runtimeInfo);
+ Assert.assertSame(DummyBlockingPool.instance(), pool);
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
index 13e016f2880..a2b0b225553 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/IndexTaskTest.java
@@ -276,8 +276,6 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- Assert.assertFalse(indexTask.supportsQueries());
-
final DataSegmentsWithSchemas segmentWithSchemas =
runSuccessfulTask(indexTask);
final List<DataSegment> segments = new
ArrayList<>(segmentWithSchemas.getSegments());
@@ -321,8 +319,6 @@ public class IndexTaskTest extends IngestionTestBase
ImmutableMap.of(Tasks.STORE_EMPTY_COLUMNS_KEY, false)
);
- Assert.assertFalse(indexTask.supportsQueries());
-
final DataSegmentsWithSchemas segmentWithSchemas =
runSuccessfulTask(indexTask);
final List<DataSegment> segments = new
ArrayList<>(segmentWithSchemas.getSegments());
Assert.assertEquals(1, segments.size());
@@ -361,8 +357,6 @@ public class IndexTaskTest extends IngestionTestBase
null
);
- Assert.assertFalse(indexTask.supportsQueries());
-
final DataSegmentsWithSchemas segmentWithSchemas =
runSuccessfulTask(indexTask);
final List<DataSegment> segments = new
ArrayList<>(segmentWithSchemas.getSegments());
Assert.assertEquals(2, segments.size());
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
index 33502ecb3fb..24998b8029b 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskTest.java
@@ -78,12 +78,6 @@ public class TaskTest
return null;
}
- @Override
- public boolean supportsQueries()
- {
- return false;
- }
-
@Override
public String getClasspathPrefix()
{
diff --git
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
index d7279e60347..a6517d88dc3 100644
---
a/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
+++
b/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTask.java
@@ -31,6 +31,7 @@ import
com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Injector;
import org.apache.druid.common.guava.FutureUtils;
+import org.apache.druid.guice.PeonProcessingModule;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
@@ -180,11 +181,12 @@ public class MSQWorkerTask extends AbstractTask
}
@Override
- public boolean supportsQueries()
+ public PeonProcessingModule.Config getPeonProcessingModuleConfig()
{
- // Even though we don't have a QueryResource, we do embed a query stack,
and so we need preloaded resources
- // such as broadcast tables.
- return true;
+ // No merge buffers needed.
+ return new PeonProcessingModule.Config()
+ .withProcessingBuffers()
+ .withProcessingThreads();
}
@Override
diff --git
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
index 3672e9d1c29..8bafecb1986 100644
---
a/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
+++
b/multi-stage-query/src/test/java/org/apache/druid/msq/indexing/MSQWorkerTaskTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.msq.indexing;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.error.DruidException;
+import org.apache.druid.guice.PeonProcessingModule;
import org.apache.druid.server.lookup.cache.LookupLoadingSpec;
import org.junit.Assert;
import org.junit.Test;
@@ -114,6 +115,15 @@ public class MSQWorkerTaskTest
Assert.assertTrue(msqWorkerTask.getInputSourceResources().isEmpty());
}
+ @Test
+ public void testGetPeonProcessingModuleConfig()
+ {
+ final PeonProcessingModule.Config config =
msqWorkerTask.getPeonProcessingModuleConfig();
+ Assert.assertTrue(config.hasProcessingThreads());
+ Assert.assertTrue(config.hasProcessingBuffers());
+ Assert.assertFalse(config.hasMergeBuffers());
+ }
+
@Test
public void testGetDefaultLookupLoadingSpec()
{
diff --git
a/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java
b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java
index 3a11027311e..06abdb902c4 100644
---
a/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java
+++
b/server/src/main/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpec.java
@@ -51,7 +51,19 @@ public class BroadcastDatasourceLoadingSpec
public enum Mode
{
- ALL, NONE, ONLY_REQUIRED
+ ALL(true), NONE(false), ONLY_REQUIRED(true);
+
+ private final boolean needsBroadcastSegments;
+
+ Mode(boolean needsBroadcastSegments)
+ {
+ this.needsBroadcastSegments = needsBroadcastSegments;
+ }
+
+ public boolean needsBroadcastSegments()
+ {
+ return needsBroadcastSegments;
+ }
}
private final Mode mode;
diff --git
a/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java
b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java
index 146862135b6..5c662b612dd 100644
---
a/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java
+++
b/server/src/test/java/org/apache/druid/server/coordination/BroadcastDatasourceLoadingSpecTest.java
@@ -43,6 +43,14 @@ public class BroadcastDatasourceLoadingSpecTest
Assert.assertNull(spec.getBroadcastDatasourcesToLoad());
}
+ @Test
+ public void testModeNeedsBroadcastSegments()
+ {
+
Assert.assertTrue(BroadcastDatasourceLoadingSpec.Mode.ALL.needsBroadcastSegments());
+
Assert.assertFalse(BroadcastDatasourceLoadingSpec.Mode.NONE.needsBroadcastSegments());
+
Assert.assertTrue(BroadcastDatasourceLoadingSpec.Mode.ONLY_REQUIRED.needsBroadcastSegments());
+ }
+
@Test
public void testLoadingNoLookups()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]