This is an automated email from the ASF dual-hosted git repository.

lakshsingla pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 72fbaf2e56e Non querying tasks shouldn't use processing buffers / 
merge buffers (#16887)
72fbaf2e56e is described below

commit 72fbaf2e56e7606e4ba8862dbcd70a2687bb843b
Author: Laksh Singla <[email protected]>
AuthorDate: Tue Sep 10 11:36:36 2024 +0530

    Non querying tasks shouldn't use processing buffers / merge buffers (#16887)
    
    Tasks that do not support querying or query processing (i.e. supportsQueries = false) do not require processing threads, processing buffers, or merge buffers.
---
 .../apache/druid/guice/PeonProcessingModule.java   | 143 +++++++++++++++++++++
 .../apache/druid/indexing/common/task/Task.java    |   4 +-
 .../druid/query/NoopQueryProcessingPool.java       | 134 +++++++++++++++++++
 .../apache/druid/guice/BrokerProcessingModule.java |  31 +----
 .../apache/druid/guice/DruidProcessingModule.java  |  85 ++++++++----
 .../apache/druid/guice/RouterProcessingModule.java |  12 +-
 .../main/java/org/apache/druid/cli/CliPeon.java    |   4 +-
 7 files changed, 352 insertions(+), 61 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
 
b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
new file mode 100644
index 00000000000..8961da7a555
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/guice/PeonProcessingModule.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.guice;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Provides;
+import org.apache.druid.client.cache.CacheConfig;
+import org.apache.druid.client.cache.CachePopulator;
+import org.apache.druid.client.cache.CachePopulatorStats;
+import org.apache.druid.collections.BlockingPool;
+import org.apache.druid.collections.DummyBlockingPool;
+import org.apache.druid.collections.DummyNonBlockingPool;
+import org.apache.druid.collections.NonBlockingPool;
+import org.apache.druid.guice.annotations.Global;
+import org.apache.druid.guice.annotations.Merging;
+import org.apache.druid.guice.annotations.Smile;
+import org.apache.druid.indexing.common.task.Task;
+import org.apache.druid.java.util.common.lifecycle.Lifecycle;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.query.DruidProcessingConfig;
+import org.apache.druid.query.ExecutorServiceMonitor;
+import org.apache.druid.query.NoopQueryProcessingPool;
+import org.apache.druid.query.QueryProcessingPool;
+import org.apache.druid.query.groupby.GroupByQueryConfig;
+import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
+
+import java.nio.ByteBuffer;
+
+/**
+ * This module fulfills the dependency injection of query processing and caching resources: buffer pools and
+ * thread pools on Peon selectively. Only the peons for the tasks supporting queries need to allocate direct buffers
+ * and thread pools. Thus, this is separate from the {@link DruidProcessingModule} to separate the needs of the peons
+ * and the historicals.
+ *
+ * @see DruidProcessingModule
+ */
+public class PeonProcessingModule implements Module
+{
+  private static final Logger log = new Logger(PeonProcessingModule.class);
+
+  /**
+   * Registers the processing configs and the executor service monitor by delegating to
+   * {@link DruidProcessingModule#registerConfigsAndMonitor(Binder)}.
+   */
+  @Override
+  public void configure(Binder binder)
+  {
+    DruidProcessingModule.registerConfigsAndMonitor(binder);
+  }
+
+  /**
+   * Provides the {@link CachePopulator}. This does not depend on the task, so it simply delegates to
+   * {@link DruidProcessingModule#createCachePopulator}.
+   */
+  @Provides
+  @LazySingleton
+  public CachePopulator getCachePopulator(
+      @Smile ObjectMapper smileMapper,
+      CachePopulatorStats cachePopulatorStats,
+      CacheConfig cacheConfig
+  )
+  {
+    return DruidProcessingModule.createCachePopulator(smileMapper, cachePopulatorStats, cacheConfig);
+  }
+
+  /**
+   * Provides the processing pool for the peon. Tasks that support queries get the pool created by
+   * {@link DruidProcessingModule#createProcessingExecutorPool}; all other tasks get the shared
+   * {@link NoopQueryProcessingPool}, which rejects any submitted work. A warning is logged when numThreads was
+   * explicitly configured but is being ignored.
+   */
+  @Provides
+  @ManageLifecycle
+  public QueryProcessingPool getProcessingExecutorPool(
+      Task task,
+      DruidProcessingConfig config,
+      ExecutorServiceMonitor executorServiceMonitor,
+      Lifecycle lifecycle
+  )
+  {
+    if (task.supportsQueries()) {
+      return DruidProcessingModule.createProcessingExecutorPool(config, executorServiceMonitor, lifecycle);
+    } else {
+      if (config.isNumThreadsConfigured()) {
+        log.warn(
+            "Ignoring the configured numThreads[%d] because task[%s] of type[%s] does not support queries",
+            config.getNumThreads(),
+            task.getId(),
+            task.getType()
+        );
+      }
+      return NoopQueryProcessingPool.instance();
+    }
+  }
+
+  /**
+   * Provides the {@link Global} intermediate results pool. Tasks that support queries get the pool created by
+   * {@link DruidProcessingModule#createIntermediateResultsPool}; all other tasks get the shared
+   * {@link DummyNonBlockingPool} instance instead.
+   */
+  @Provides
+  @LazySingleton
+  @Global
+  public NonBlockingPool<ByteBuffer> getIntermediateResultsPool(Task task, DruidProcessingConfig config)
+  {
+    if (task.supportsQueries()) {
+      return DruidProcessingModule.createIntermediateResultsPool(config);
+    } else {
+      return DummyNonBlockingPool.instance();
+    }
+  }
+
+  /**
+   * Provides the {@link Merging} buffer pool. Tasks that support queries get the pool created by
+   * {@link DruidProcessingModule#createMergeBufferPool}; all other tasks get the shared {@link DummyBlockingPool}
+   * instance instead. A warning is logged when numMergeBuffers was explicitly configured but is being ignored.
+   */
+  @Provides
+  @LazySingleton
+  @Merging
+  public BlockingPool<ByteBuffer> getMergeBufferPool(Task task, DruidProcessingConfig config)
+  {
+    if (task.supportsQueries()) {
+      return DruidProcessingModule.createMergeBufferPool(config);
+    } else {
+      if (config.isNumMergeBuffersConfigured()) {
+        // Report the merge buffer count here, not the thread count, so the logged value matches the message.
+        log.warn(
+            "Ignoring the configured numMergeBuffers[%d] because task[%s] of type[%s] does not support queries",
+            config.getNumMergeBuffers(),
+            task.getId(),
+            task.getType()
+        );
+      }
+      return DummyBlockingPool.instance();
+    }
+  }
+
+  /**
+   * Provides the group-by resources reservation pool, built on top of whichever {@link Merging} buffer pool was
+   * bound for this peon.
+   */
+  @Provides
+  @LazySingleton
+  @Merging
+  public GroupByResourcesReservationPool getGroupByResourcesReservationPool(
+      @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+      GroupByQueryConfig groupByQueryConfig
+  )
+  {
+    return new GroupByResourcesReservationPool(mergeBufferPool, groupByQueryConfig);
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 9b882e2e8d2..003b39e606b 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -176,7 +176,9 @@ public interface Task
 
   /**
    * True if this task type embeds a query stack, and therefore should preload 
resources (like broadcast tables)
-   * that may be needed by queries.
+   * that may be needed by queries. Tasks supporting queries are also 
allocated processing buffers, processing threads
+   * and merge buffers. Those which do not should not assume that these 
resources are present and must explicitly allocate
+   * any direct buffers or processing pools if required.
    *
    * If true, {@link #getQueryRunner(Query)} does not necessarily return 
nonnull query runners. For example,
    * MSQWorkerTask returns true from this method (because it embeds a query 
stack for running multi-stage queries)
diff --git 
a/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java 
b/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
new file mode 100644
index 00000000000..efb9a53776a
--- /dev/null
+++ 
b/processing/src/main/java/org/apache/druid/query/NoopQueryProcessingPool.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.query;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.druid.error.DruidException;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A {@link QueryProcessingPool} for processes that are never expected to run query work. Every method that would
+ * submit or execute a unit of work throws the exception built by {@link DruidException#defensive}. The pool reports
+ * itself as shut down and terminated from the moment it is created; because shutdown is meant to be idempotent, the
+ * shutdown-related methods simply do nothing instead of throwing.
+ */
+public class NoopQueryProcessingPool implements QueryProcessingPool
+{
+  private static final NoopQueryProcessingPool INSTANCE = new NoopQueryProcessingPool();
+
+  /**
+   * Returns the shared instance of this pool.
+   */
+  public static NoopQueryProcessingPool instance()
+  {
+    return INSTANCE;
+  }
+
+  /**
+   * Builds the exception thrown by every work-accepting method of this pool.
+   */
+  private static DruidException rejectedCall()
+  {
+    return DruidException.defensive("Unexpected call made to NoopQueryProcessingPool");
+  }
+
+  // ---------------------------------------------------------------------------------------------
+  // Lifecycle methods: the pool is always in the "shut down" state, so none of these throw.
+  // ---------------------------------------------------------------------------------------------
+
+  @Override
+  public boolean isShutdown()
+  {
+    return true;
+  }
+
+  @Override
+  public boolean isTerminated()
+  {
+    return true;
+  }
+
+  @Override
+  public void shutdown()
+  {
+    // Nothing to do: the pool has been shut down since construction.
+  }
+
+  @Override
+  public List<Runnable> shutdownNow()
+  {
+    // No work is ever accepted, so there is never anything pending to hand back.
+    return Collections.emptyList();
+  }
+
+  @Override
+  public boolean awaitTermination(long l, TimeUnit timeUnit)
+  {
+    return true;
+  }
+
+  // ---------------------------------------------------------------------------------------------
+  // Work-accepting methods: all of them reject the call.
+  // ---------------------------------------------------------------------------------------------
+
+  @Override
+  public <T, V> ListenableFuture<T> submitRunnerTask(PrioritizedQueryRunnerCallable<T, V> task)
+  {
+    throw rejectedCall();
+  }
+
+  @Override
+  public <T> ListenableFuture<T> submit(Callable<T> callable)
+  {
+    throw rejectedCall();
+  }
+
+  @Override
+  public ListenableFuture<?> submit(Runnable runnable)
+  {
+    throw rejectedCall();
+  }
+
+  @Override
+  public <T> ListenableFuture<T> submit(Runnable runnable, T t)
+  {
+    throw rejectedCall();
+  }
+
+  @Override
+  public void execute(Runnable runnable)
+  {
+    throw rejectedCall();
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection)
+  {
+    throw rejectedCall();
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
+  {
+    throw rejectedCall();
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> collection)
+  {
+    throw rejectedCall();
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> collection, long l, TimeUnit timeUnit)
+  {
+    throw rejectedCall();
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java 
b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
index d70a2157e15..bc12e929219 100644
--- a/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/BrokerProcessingModule.java
@@ -20,16 +20,13 @@
 package org.apache.druid.guice;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Binder;
 import com.google.inject.Module;
 import com.google.inject.Provides;
 import com.google.inject.ProvisionException;
-import org.apache.druid.client.cache.BackgroundCachePopulator;
 import org.apache.druid.client.cache.CacheConfig;
 import org.apache.druid.client.cache.CachePopulator;
 import org.apache.druid.client.cache.CachePopulatorStats;
-import org.apache.druid.client.cache.ForegroundCachePopulator;
 import org.apache.druid.collections.BlockingPool;
 import org.apache.druid.collections.DefaultBlockingPool;
 import org.apache.druid.collections.NonBlockingPool;
@@ -43,25 +40,22 @@ import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.offheap.OffheapBufferGenerator;
 import org.apache.druid.query.BrokerParallelMergeConfig;
 import org.apache.druid.query.DruidProcessingConfig;
-import org.apache.druid.query.ExecutorServiceMonitor;
 import org.apache.druid.query.ForwardingQueryProcessingPool;
 import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
-import org.apache.druid.server.metrics.MetricsModule;
 import org.apache.druid.utils.JvmUtils;
 
 import java.nio.ByteBuffer;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 
 /**
  * This module is used to fulfill dependency injection of query processing and 
caching resources: buffer pools and
  * thread pools on Broker. Broker does not need to be allocated an 
intermediate results pool.
  * This is separated from DruidProcessingModule to separate the needs of the 
broker from the historicals
+ *
+ * @see DruidProcessingModule
  */
-
 public class BrokerProcessingModule implements Module
 {
   private static final Logger log = new Logger(BrokerProcessingModule.class);
@@ -69,9 +63,8 @@ public class BrokerProcessingModule implements Module
   @Override
   public void configure(Binder binder)
   {
-    JsonConfigProvider.bind(binder, "druid.processing.merge", 
BrokerParallelMergeConfig.class);
-    JsonConfigProvider.bind(binder, "druid.processing", 
DruidProcessingConfig.class);
-    MetricsModule.register(binder, ExecutorServiceMonitor.class);
+    JsonConfigProvider.bind(binder, 
DruidProcessingModule.PROCESSING_PROPERTY_PREFIX + ".merge", 
BrokerParallelMergeConfig.class);
+    DruidProcessingModule.registerConfigsAndMonitor(binder);
   }
 
   @Provides
@@ -82,20 +75,7 @@ public class BrokerProcessingModule implements Module
       CacheConfig cacheConfig
   )
   {
-    if (cacheConfig.getNumBackgroundThreads() > 0) {
-      final ExecutorService exec = Executors.newFixedThreadPool(
-          cacheConfig.getNumBackgroundThreads(),
-          new ThreadFactoryBuilder()
-              .setNameFormat("background-cacher-%d")
-              .setDaemon(true)
-              .setPriority(Thread.MIN_PRIORITY)
-              .build()
-      );
-
-      return new BackgroundCachePopulator(exec, smileMapper, 
cachePopulatorStats, cacheConfig.getMaxEntrySize());
-    } else {
-      return new ForegroundCachePopulator(smileMapper, cachePopulatorStats, 
cacheConfig.getMaxEntrySize());
-    }
+    return DruidProcessingModule.createCachePopulator(smileMapper, 
cachePopulatorStats, cacheConfig);
   }
 
   @Provides
@@ -113,7 +93,6 @@ public class BrokerProcessingModule implements Module
   public NonBlockingPool<ByteBuffer> 
getIntermediateResultsPool(DruidProcessingConfig config)
   {
     verifyDirectMemory(config);
-
     return new StupidPool<>(
         "intermediate processing pool",
         new OffheapBufferGenerator("intermediate processing", 
config.intermediateComputeSizeBytes()),
diff --git 
a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java 
b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
index a2daa25e214..4879b5cd3c7 100644
--- a/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/DruidProcessingModule.java
@@ -59,13 +59,14 @@ import java.util.concurrent.Executors;
  */
 public class DruidProcessingModule implements Module
 {
+  public static final String PROCESSING_PROPERTY_PREFIX = "druid.processing";
+
   private static final Logger log = new Logger(DruidProcessingModule.class);
 
   @Override
   public void configure(Binder binder)
   {
-    JsonConfigProvider.bind(binder, "druid.processing", 
DruidProcessingConfig.class);
-    MetricsModule.register(binder, ExecutorServiceMonitor.class);
+    registerConfigsAndMonitor(binder);
   }
 
   @Provides
@@ -75,6 +76,59 @@ public class DruidProcessingModule implements Module
       CachePopulatorStats cachePopulatorStats,
       CacheConfig cacheConfig
   )
+  {
+    return createCachePopulator(smileMapper, cachePopulatorStats, cacheConfig);
+  }
+
+  @Provides
+  @ManageLifecycle
+  public QueryProcessingPool getProcessingExecutorPool(
+      DruidProcessingConfig config,
+      ExecutorServiceMonitor executorServiceMonitor,
+      Lifecycle lifecycle
+  )
+  {
+    return createProcessingExecutorPool(config, executorServiceMonitor, 
lifecycle);
+  }
+
+  @Provides
+  @LazySingleton
+  @Global
+  public NonBlockingPool<ByteBuffer> 
getIntermediateResultsPool(DruidProcessingConfig config)
+  {
+    return createIntermediateResultsPool(config);
+  }
+
+  @Provides
+  @LazySingleton
+  @Merging
+  public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig 
config)
+  {
+    return createMergeBufferPool(config);
+  }
+
+  @Provides
+  @LazySingleton
+  @Merging
+  public GroupByResourcesReservationPool getGroupByResourcesReservationPool(
+      @Merging BlockingPool<ByteBuffer> mergeBufferPool,
+      GroupByQueryConfig groupByQueryConfig
+  )
+  {
+    return new GroupByResourcesReservationPool(mergeBufferPool, 
groupByQueryConfig);
+  }
+
+  public static void registerConfigsAndMonitor(Binder binder)
+  {
+    JsonConfigProvider.bind(binder, PROCESSING_PROPERTY_PREFIX, 
DruidProcessingConfig.class);
+    MetricsModule.register(binder, ExecutorServiceMonitor.class);
+  }
+
+  public static CachePopulator createCachePopulator(
+      ObjectMapper smileMapper,
+      CachePopulatorStats cachePopulatorStats,
+      CacheConfig cacheConfig
+  )
   {
     if (cacheConfig.getNumBackgroundThreads() > 0) {
       final ExecutorService exec = Executors.newFixedThreadPool(
@@ -92,9 +146,7 @@ public class DruidProcessingModule implements Module
     }
   }
 
-  @Provides
-  @ManageLifecycle
-  public QueryProcessingPool getProcessingExecutorPool(
+  public static QueryProcessingPool createProcessingExecutorPool(
       DruidProcessingConfig config,
       ExecutorServiceMonitor executorServiceMonitor,
       Lifecycle lifecycle
@@ -109,10 +161,7 @@ public class DruidProcessingModule implements Module
     );
   }
 
-  @Provides
-  @LazySingleton
-  @Global
-  public NonBlockingPool<ByteBuffer> 
getIntermediateResultsPool(DruidProcessingConfig config)
+  public static NonBlockingPool<ByteBuffer> 
createIntermediateResultsPool(final DruidProcessingConfig config)
   {
     verifyDirectMemory(config);
     return new StupidPool<>(
@@ -123,10 +172,7 @@ public class DruidProcessingModule implements Module
     );
   }
 
-  @Provides
-  @LazySingleton
-  @Merging
-  public BlockingPool<ByteBuffer> getMergeBufferPool(DruidProcessingConfig 
config)
+  public static BlockingPool<ByteBuffer> createMergeBufferPool(final 
DruidProcessingConfig config)
   {
     verifyDirectMemory(config);
     return new DefaultBlockingPool<>(
@@ -135,18 +181,7 @@ public class DruidProcessingModule implements Module
     );
   }
 
-  @Provides
-  @LazySingleton
-  @Merging
-  public GroupByResourcesReservationPool getGroupByResourcesReservationPool(
-      @Merging BlockingPool<ByteBuffer> mergeBufferPool,
-      GroupByQueryConfig groupByQueryConfig
-  )
-  {
-    return new GroupByResourcesReservationPool(mergeBufferPool, 
groupByQueryConfig);
-  }
-
-  private void verifyDirectMemory(DruidProcessingConfig config)
+  private static void verifyDirectMemory(DruidProcessingConfig config)
   {
     try {
       final long maxDirectMemory = 
JvmUtils.getRuntimeInfo().getDirectMemorySizeBytes();
diff --git 
a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java 
b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
index f76b5ed940d..85357a7fa04 100644
--- a/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
+++ b/server/src/main/java/org/apache/druid/guice/RouterProcessingModule.java
@@ -28,15 +28,12 @@ import org.apache.druid.collections.DummyNonBlockingPool;
 import org.apache.druid.collections.NonBlockingPool;
 import org.apache.druid.guice.annotations.Global;
 import org.apache.druid.guice.annotations.Merging;
-import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
 import org.apache.druid.query.DruidProcessingConfig;
-import org.apache.druid.query.ExecutorServiceMonitor;
-import org.apache.druid.query.ForwardingQueryProcessingPool;
+import org.apache.druid.query.NoopQueryProcessingPool;
 import org.apache.druid.query.QueryProcessingPool;
 import org.apache.druid.query.groupby.GroupByQueryConfig;
 import org.apache.druid.query.groupby.GroupByResourcesReservationPool;
-import org.apache.druid.server.metrics.MetricsModule;
 
 import java.nio.ByteBuffer;
 
@@ -46,6 +43,8 @@ import java.nio.ByteBuffer;
  * {@link org.apache.druid.query.QueryToolChest}s, and they couple query type 
aspects not related to processing and
  * caching, which Router uses, and related to processing and caching, which 
Router doesn't use, but they inject the
  * resources.
+ *
+ * @see DruidProcessingModule
  */
 public class RouterProcessingModule implements Module
 {
@@ -54,8 +53,7 @@ public class RouterProcessingModule implements Module
   @Override
   public void configure(Binder binder)
   {
-    JsonConfigProvider.bind(binder, "druid.processing", 
DruidProcessingConfig.class);
-    MetricsModule.register(binder, ExecutorServiceMonitor.class);
+    DruidProcessingModule.registerConfigsAndMonitor(binder);
   }
 
   @Provides
@@ -65,7 +63,7 @@ public class RouterProcessingModule implements Module
     if (config.isNumThreadsConfigured()) {
       log.warn("numThreads[%d] configured, that is ignored on Router", 
config.getNumThreads());
     }
-    return new ForwardingQueryProcessingPool(Execs.dummy());
+    return NoopQueryProcessingPool.instance();
   }
 
   @Provides
diff --git a/services/src/main/java/org/apache/druid/cli/CliPeon.java 
b/services/src/main/java/org/apache/druid/cli/CliPeon.java
index eb572850cda..61a8ab7374e 100644
--- a/services/src/main/java/org/apache/druid/cli/CliPeon.java
+++ b/services/src/main/java/org/apache/druid/cli/CliPeon.java
@@ -47,7 +47,6 @@ import org.apache.druid.curator.ZkEnablementConfig;
 import org.apache.druid.discovery.NodeRole;
 import org.apache.druid.guice.Binders;
 import org.apache.druid.guice.CacheModule;
-import org.apache.druid.guice.DruidProcessingModule;
 import org.apache.druid.guice.IndexingServiceInputSourceModule;
 import org.apache.druid.guice.IndexingServiceTaskLogsModule;
 import org.apache.druid.guice.IndexingServiceTuningConfigModule;
@@ -58,6 +57,7 @@ import org.apache.druid.guice.LazySingleton;
 import org.apache.druid.guice.LifecycleModule;
 import org.apache.druid.guice.ManageLifecycle;
 import org.apache.druid.guice.ManageLifecycleServer;
+import org.apache.druid.guice.PeonProcessingModule;
 import org.apache.druid.guice.PolyBind;
 import org.apache.druid.guice.QueryRunnerFactoryModule;
 import org.apache.druid.guice.QueryableModule;
@@ -205,7 +205,7 @@ public class CliPeon extends GuiceRunnable
   protected List<? extends Module> getModules()
   {
     return ImmutableList.of(
-        new DruidProcessingModule(),
+        new PeonProcessingModule(),
         new QueryableModule(),
         new QueryRunnerFactoryModule(),
         new SegmentWranglerModule(),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to