This is an automated email from the ASF dual-hosted git repository.
lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 72fbaf2e56e Non querying tasks shouldn't use processing buffers /
merge buffers (#16887)
72fbaf2e56e is described below
commit 72fbaf2e56e7606e4ba8862dbcd70a2687bb843b
Author: Laksh Singla <[email protected]>
AuthorDate: Tue Sep 10 11:36:36 2024 +0530
Non querying tasks shouldn't use processing buffers / merge buffers (#16887)
Tasks that do not support querying or query processing (i.e. supportsQueries
= false) do not require processing threads, processing buffers, or merge
buffers.
---
.../apache/druid/guice/PeonProcessingModule.java | 143 +++++++++++++++++++++
.../apache/druid/indexing/common/task/Task.java | 4 +-
.../druid/query/NoopQueryProcessingPool.java | 134 +++++++++++++++++++
.../apache/druid/guice/BrokerProcessingModule.java | 31 +----
.../apache/druid/guice/DruidProcessingModule.java | 85 ++++++++----
.../apache/druid/guice/RouterProcessingModule.java | 12 +-
.../main/java/org/apache/druid/cli/CliPeon.java | 4 +-
7 files changed, 352 insertions(+), 61 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
new file mode 100644
index 00000000000..8961da7a555
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulator;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.collections.DummyBlockingPool;
+import org.apache.druid.collections.DummyNonBlockingPool;
+import org.apache.druid.collections.NonBlockingPool;
+import org.apache.druid.guice.annotations.Global;
+import org.apache.druid.guice.annotations.Merging;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.ExecutorServiceMonitor;
+import org.apache.druid.query.NoopQueryProcessingPool;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This module fulfills the dependency injection of query processing and caching resources (buffer pools and
+ * thread pools) on Peons selectively. Only the peons running tasks that support queries need to allocate direct
+ * buffers and thread pools; every other task is handed a no-op / dummy implementation instead. This is kept
+ * separate from {@link DruidProcessingModule} to separate the needs of the peons from those of the historicals.
+ *
+ * @see DruidProcessingModule
+ */
+public class PeonProcessingModule implements Module
+{
+  private static final Logger log = new Logger(PeonProcessingModule.class);
+
+  /**
+   * Registers the processing config and the executor service monitor by delegating to
+   * {@link DruidProcessingModule#registerConfigsAndMonitor(Binder)}.
+   */
+  @Override
+  public void configure(Binder binder)
+  {
+    DruidProcessingModule.registerConfigsAndMonitor(binder);
+  }
+
+  /**
+   * Provides the {@link CachePopulator}, built by the shared factory in {@link DruidProcessingModule}.
+   */
+  @Provides
+  @LazySingleton
+  public CachePopulator getCachePopulator(
+      @Smile ObjectMapper smileMapper,
+      CachePopulatorStats cachePopulatorStats,
+      CacheConfig cacheConfig
+  )
+  {
+    return DruidProcessingModule.createCachePopulator(smileMapper, cachePopulatorStats, cacheConfig);
+  }
+
+  /**
+   * Provides the processing pool. Tasks that support queries get the pool created by
+   * {@link DruidProcessingModule}; all other tasks get the {@link NoopQueryProcessingPool}, and a warning is
+   * logged if numThreads was explicitly configured, since that setting is ignored for them.
+   */
+  @Provides
+  @ManageLifecycle
+  public QueryProcessingPool getProcessingExecutorPool(
+      Task task,
+      DruidProcessingConfig config,
+      ExecutorServiceMonitor executorServiceMonitor,
+      Lifecycle lifecycle
+  )
+  {
+    if (task.supportsQueries()) {
+      return DruidProcessingModule.createProcessingExecutorPool(config, executorServiceMonitor, lifecycle);
+    } else {
+      if (config.isNumThreadsConfigured()) {
+        log.warn(
+            "Ignoring the configured numThreads[%d] because task[%s] of type[%s] does not support queries",
+            config.getNumThreads(),
+            task.getId(),
+            task.getType()
+        );
+      }
+      return NoopQueryProcessingPool.instance();
+    }
+  }
+
+  /**
+   * Provides the {@link Global} intermediate results pool. Tasks that support queries get the pool created by
+   * {@link DruidProcessingModule}; all other tasks get the {@link DummyNonBlockingPool}.
+   */
+  @Provides
+  @LazySingleton
+  @Global
+  public NonBlockingPool<ByteBuffer> getIntermediateResultsPool(Task task, DruidProcessingConfig config)
+  {
+    if (task.supportsQueries()) {
+      return DruidProcessingModule.createIntermediateResultsPool(config);
+    } else {
+      return DummyNonBlockingPool.instance();
+    }
+  }
+
+  /**
+   * Provides the {@link Merging} buffer pool. Tasks that support queries get the pool created by
+   * {@link DruidProcessingModule}; all other tasks get the {@link DummyBlockingPool}, and a warning is logged
+   * if numMergeBuffers was explicitly configured, since that setting is ignored for them.
+   */
+  @Provides
+  @LazySingleton
+  @Merging
+  public BlockingPool<ByteBuffer> getMergeBufferPool(Task task, DruidProcessingConfig config)
+  {
+    if (task.supportsQueries()) {
+      return DruidProcessingModule.createMergeBufferPool(config);
+    } else {
+      if (config.isNumMergeBuffersConfigured()) {
+        log.warn(
+            "Ignoring the configured numMergeBuffers[%d] because task[%s] of type[%s] does not support queries",
+            // Report the merge buffer count (not the thread count) so the value matches the message.
+            config.getNumMergeBuffers(),
+            task.getId(),
+            task.getType()
+        );
+      }
+      return DummyBlockingPool.instance();
+    }
+  }
+
+  /**
+   * Provides the group-by resources reservation pool backed by the {@link Merging} buffer pool.
+   */
+  @Provides
+  @LazySingleton
+  @Merging
+  public GroupByResourcesReservationPool getGroupByResourcesReservationPool(
+      @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+      GroupByQueryConfig groupByQueryConfig
+  )
+  {
+    return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig);
+  }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 9b882e2e8d2..003b39e606b 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -176,7 +176,9 @@ public interface Task
/**
* True if this task type embeds a query stack, and therefore should preload
resources (like broadcast tables)
- * that may be needed by queries.
+ * that may be needed by queries. Tasks supporting queries are also
allocated processing buffers, processing threads
+ * and merge buffers. Those which do not should not assume that these
resources are present and must explicitly allocate
+ * any direct buffers or processing pools if required.
*
* If true, {@link #getQueryRunner(Query)} does not necessarily return
nonnull query runners. For example,
* MSQWorkerTask returns true from this method (because it embeds a query
stack for running multi-stage queries)
diff --git
a/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
b/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
new file mode 100644
index 00000000000..efb9a53776a
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.error.DruidException;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link QueryProcessingPool} that throws when any query
execution task unit is submitted to it. It is
+ * semantically shutdown from the moment it is created, and since the shutdown
methods are supposed to be idempotent,
+ * they do not throw like the execution methods
+ */
+public class NoopQueryProcessingPool implements QueryProcessingPool
+{
+ private static final NoopQueryProcessingPool INSTANCE = new
NoopQueryProcessingPool();
+
+ public static NoopQueryProcessingPool instance()
+ {
+ return INSTANCE;
+ }
+
+ @Override
+ public <T, V> ListenableFuture<T>
submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task)
+ {
+ throw unsupportedException();
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Callable<T> callable)
+ {
+ throw unsupportedException();
+ }
+
+ @Override
+ public ListenableFuture<?> submit(Runnable runnable)
+ {
+ throw unsupportedException();
+ }
+
+ @Override
+ public <T> ListenableFuture<T> submit(Runnable runnable, T t)
+ {
+ throw unsupportedException();
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>
collection)
+ {
+ throw unsupportedException();
+ }
+
+ @Override
+ public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>>
collection, long l, TimeUnit timeUnit)
+ {
+ throw unsupportedException();
+ }
+
+ @Override
+ public void shutdown()
+ {
+ // No op, since it is already shutdown
+ }
+
+ @Override
+ public List<Runnable> shutdownNow()
+ {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public boolean isShutdown()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean isTerminated()
+ {
+ return true;
+ }
+
+ @Override
+ public boolean awaitTermination(long l, TimeUnit timeUnit)
+ {
+ return true;
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> collection)
+ {
+ throw unsupportedException();
+ }
+
+ @Override
+ public <T> T invokeAny(Collection<? extends Callable<T>> collection, long l,
TimeUnit timeUnit)
+ {
+ throw unsupportedException();
+ }
+
+ @Override
+ public void execute(Runnable runnable)
+ {
+ throw unsupportedException();
+ }
+
+ private DruidException unsupportedException()
+ {
+ return DruidException.defensive("Unexpected call made to
NoopQueryProcessingPool");
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
index d70a2157e15..bc12e929219 100644
--- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
@@ -20,16 +20,13 @@
package org.apache.druid.guice;
import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.ProvisionException;
-import org.apache.druid.client.cache.BackgroundCachePopulator;
import org.apache.druid.client.cache.CacheConfig;
import org.apache.druid.client.cache.CachePopulator;
import org.apache.druid.client.cache.CachePopulatorStats;
-import org.apache.druid.client.cache.ForegroundCachePopulator;
import org.apache.druid.collections.BlockingPool;
import org.apache.druid.collections.DefaultBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
@@ -43,25 +40,22 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.offheap.OffheapBufferGenerator;
import org.apache.druid.query.BrokerParallelMergeConfig;
import org.apache.druid.query.DruidProcessingConfig;
-import org.apache.druid.query.ExecutorServiceMonitor;
import org.apache.druid.query.ForwardingQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
-import org.apache.druid.server.metrics.MetricsModule;
import org.apache.druid.utils.JvmUtils;
import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
/**
* This module is used to fulfill dependency injection of query processing and
caching resources: buffer pools and
* thread pools on Broker. Broker does not need to be allocated an
intermediate results pool.
* This is separated from DruidProcessingModule to separate the needs of the
broker from the historicals
+ *
+ * @see DruidProcessingModule
*/
-
public class BrokerProcessingModule implements Module
{
private static final Logger log = new Logger(BrokerProcessingModule.class);
@@ -69,9 +63,8 @@ public class BrokerProcessingModule implements Module
@Override
public void configure(Binder binder)
{
- JsonConfigProvider.bind(binder, "druid.processing.merge",
BrokerParallelMergeConfig.class);
- JsonConfigProvider.bind(binder, "druid.processing",
DruidProcessingConfig.class);
- MetricsModule.register(binder, ExecutorServiceMonitor.class);
+ JsonConfigProvider.bind(binder,
DruidProcessingModule.PROCESSING_PROPERTY_PREFIX + ".merge",
BrokerParallelMergeConfig.class);
+ DruidProcessingModule.registerConfigsAndMonitor(binder);
}
@Provides
@@ -82,20 +75,7 @@ public class BrokerProcessingModule implements Module
CacheConfig cacheConfig
)
{
- if (cacheConfig.getNumBackgroundThreads() > 0) {
- final ExecutorService exec = Executors.newFixedThreadPool(
- cacheConfig.getNumBackgroundThreads(),
- new ThreadFactoryBuilder()
- .setNameFormat("background-cacher-%d")
- .setDaemon(true)
- .setPriority(Thread.MIN_PRIORITY)
- .build()
- );
-
- return new BackgroundCachePopulator(exec, smileMapper,
cachePopulatorStats, cacheConfig.getMaxEntrySize());
- } else {
- return new ForegroundCachePopulator(smileMapper, cachePopulatorStats,
cacheConfig.getMaxEntrySize());
- }
+ return DruidProcessingModule.createCachePopulator(smileMapper,
cachePopulatorStats, cacheConfig);
}
@Provides
@@ -113,7 +93,6 @@ public class BrokerProcessingModule implements Module
public NonBlockingPool<ByteBuffer>
getIntermediateResultsPool(DruidProcessingConfig config)
{
verifyDirectMemory(config);
-
return new StupidPool<>(
"intermediate processing pool",
new OffheapBufferGenerator("intermediate processing",
config.intermediateComputeSizeBytes()),
diff --git
a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
index a2daa25e214..4879b5cd3c7 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
@@ -59,13 +59,14 @@ import java.util.concurrent.Executors;
*/
public class DruidProcessingModule implements Module
{
+ public static final String PROCESSING_PROPERTY_PREFIX = "druid.processing";
+
private static final Logger log = new Logger(DruidProcessingModule.class);
@Override
public void configure(Binder binder)
{
- JsonConfigProvider.bind(binder, "druid.processing",
DruidProcessingConfig.class);
- MetricsModule.register(binder, ExecutorServiceMonitor.class);
+ registerConfigsAndMonitor(binder);
}
@Provides
@@ -75,6 +76,59 @@ public class DruidProcessingModule implements Module
CachePopulatorStats cachePopulatorStats,
CacheConfig cacheConfig
)
+ {
+ return createCachePopulator(smileMapper, cachePopulatorStats, cacheConfig);
+ }
+
+ @Provides
+ @ManageLifecycle
+ public QueryProcessingPool getProcessingExecutorPool(
+ DruidProcessingConfig config,
+ ExecutorServiceMonitor executorServiceMonitor,
+ Lifecycle lifecycle
+ )
+ {
+ return createProcessingExecutorPool(config, executorServiceMonitor,
lifecycle);
+ }
+
+ @Provides
+ @LazySingleton
+ @Global
+ public NonBlockingPool<ByteBuffer>
getIntermediateResultsPool(DruidProcessingConfig config)
+ {
+ return createIntermediateResultsPool(config);
+ }
+
+ @Provides
+ @LazySingleton
+ @Merging
+ public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig
config)
+ {
+ return createMergeBufferPool(config);
+ }
+
+ @Provides
+ @LazySingleton
+ @Merging
+ public GroupByResourcesReservationPool getGroupByResourcesReservationPool(
+ @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+ GroupByQueryConfig groupByQueryConfig
+ )
+ {
+ return new GroupByResourcesReservationPool(mergeBufferPool,
groupByQueryConfig);
+ }
+
+ public static void registerConfigsAndMonitor(Binder binder)
+ {
+ JsonConfigProvider.bind(binder, PROCESSING_PROPERTY_PREFIX,
DruidProcessingConfig.class);
+ MetricsModule.register(binder, ExecutorServiceMonitor.class);
+ }
+
+ public static CachePopulator createCachePopulator(
+ ObjectMapper smileMapper,
+ CachePopulatorStats cachePopulatorStats,
+ CacheConfig cacheConfig
+ )
{
if (cacheConfig.getNumBackgroundThreads() > 0) {
final ExecutorService exec = Executors.newFixedThreadPool(
@@ -92,9 +146,7 @@ public class DruidProcessingModule implements Module
}
}
- @Provides
- @ManageLifecycle
- public QueryProcessingPool getProcessingExecutorPool(
+ public static QueryProcessingPool createProcessingExecutorPool(
DruidProcessingConfig config,
ExecutorServiceMonitor executorServiceMonitor,
Lifecycle lifecycle
@@ -109,10 +161,7 @@ public class DruidProcessingModule implements Module
);
}
- @Provides
- @LazySingleton
- @Global
- public NonBlockingPool<ByteBuffer>
getIntermediateResultsPool(DruidProcessingConfig config)
+ public static NonBlockingPool<ByteBuffer>
createIntermediateResultsPool(final DruidProcessingConfig config)
{
verifyDirectMemory(config);
return new StupidPool<>(
@@ -123,10 +172,7 @@ public class DruidProcessingModule implements Module
);
}
- @Provides
- @LazySingleton
- @Merging
- public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig
config)
+ public static BlockingPool<ByteBuffer> createMergeBufferPool(final
DruidProcessingConfig config)
{
verifyDirectMemory(config);
return new DefaultBlockingPool<>(
@@ -135,18 +181,7 @@ public class DruidProcessingModule implements Module
);
}
- @Provides
- @LazySingleton
- @Merging
- public GroupByResourcesReservationPool getGroupByResourcesReservationPool(
- @Merging BlockingPool<ByteBuffer> mergeBufferPool,
- GroupByQueryConfig groupByQueryConfig
- )
- {
- return new GroupByResourcesReservationPool(mergeBufferPool,
groupByQueryConfig);
- }
-
- private void verifyDirectMemory(DruidProcessingConfig config)
+ private static void verifyDirectMemory(DruidProcessingConfig config)
{
try {
final long maxDirectMemory =
JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
diff --git
a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
index f76b5ed940d..85357a7fa04 100644
--- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
@@ -28,15 +28,12 @@ import org.apache.druid.collections.DummyNonBlockingPool;
import org.apache.druid.collections.NonBlockingPool;
import org.apache.druid.guice.annotations.Global;
import org.apache.druid.guice.annotations.Merging;
-import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.DruidProcessingConfig;
-import org.apache.druid.query.ExecutorServiceMonitor;
-import org.apache.druid.query.ForwardingQueryProcessingPool;
+import org.apache.druid.query.NoopQueryProcessingPool;
import org.apache.druid.query.QueryProcessingPool;
import org.apache.druid.query.groupby.GroupByQueryConfig;
import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
-import org.apache.druid.server.metrics.MetricsModule;
import java.nio.ByteBuffer;
@@ -46,6 +43,8 @@ import java.nio.ByteBuffer;
* {@link org.apache.druid.query.QueryToolChest}s, and they couple query type
aspects not related to processing and
* caching, which Router uses, and related to processing and caching, which
Router doesn't use, but they inject the
* resources.
+ *
+ * @see DruidProcessingModule
*/
public class RouterProcessingModule implements Module
{
@@ -54,8 +53,7 @@ public class RouterProcessingModule implements Module
@Override
public void configure(Binder binder)
{
- JsonConfigProvider.bind(binder, "druid.processing",
DruidProcessingConfig.class);
- MetricsModule.register(binder, ExecutorServiceMonitor.class);
+ DruidProcessingModule.registerConfigsAndMonitor(binder);
}
@Provides
@@ -65,7 +63,7 @@ public class RouterProcessingModule implements Module
if (config.isNumThreadsConfigured()) {
log.warn("numThreads[%d] configured, that is ignored on Router",
config.getNumThreads());
}
- return new ForwardingQueryProcessingPool(Execs.dummy());
+ return NoopQueryProcessingPool.instance();
}
@Provides
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index eb572850cda..61a8ab7374e 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -47,7 +47,6 @@ import org.apache.druid.curator.ZkEnablementConfig;
import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.Binders;
import org.apache.druid.guice.CacheModule;
-import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceTaskLogsModule;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
@@ -58,6 +57,7 @@ import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
import org.apache.druid.guice.ManageLifecycle;
import org.apache.druid.guice.ManageLifecycleServer;
+import org.apache.druid.guice.PeonProcessingModule;
import org.apache.druid.guice.PolyBind;
import org.apache.druid.guice.QueryRunnerFactoryModule;
import org.apache.druid.guice.QueryableModule;
@@ -205,7 +205,7 @@ public class CliPeon extends GuiceRunnable
protected List<? extends Module> getModules()
{
return ImmutableList.of(
- new DruidProcessingModule(),
+ new PeonProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
new SegmentWranglerModule(),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]