This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 514ff19cb95 Reduce default pool size of worker threads in 
SeekableStreamSupervisor (#18163)
514ff19cb95 is described below

commit 514ff19cb95edd43bbd0397975d1edd962a56508
Author: Uddeshya Singh <[email protected]>
AuthorDate: Tue Jun 24 15:40:54 2025 +0530

    Reduce default pool size of worker threads in SeekableStreamSupervisor 
(#18163)
    
    The core pool size of worker threads is calculated as follows:
    - If `tuningConfig.workerThreads` is set, use that.
    - If auto scaler is enabled, use `Math.max(2, taskCountMax/4)`
    - Otherwise use `Math.max(2, taskCount/4)`
---
 .../supervisor/SeekableStreamSupervisor.java       |  55 ++++++++---
 .../SeekableStreamSupervisorStateTest.java         | 108 ++++++++++++++++++++-
 .../util/common/concurrent/ScheduledExecutors.java |  27 ++++++
 3 files changed, 170 insertions(+), 20 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 01acc9de306..9c70314d5ad 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -126,6 +126,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
@@ -162,6 +163,9 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
 
   private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
   private static final int MAX_INITIALIZATION_RETRIES = 20;
+  private static final int MIN_WORKER_CORE_THREADS = 2;
+  private static final int DEFAULT_TASKS_PER_WORKER_THREAD = 4;
+  private static final int WORKER_THREAD_KEEPALIVE_TIME_MILLIS = 2000;
 
   private static final EmittingLogger log = new 
EmittingLogger(SeekableStreamSupervisor.class);
 
@@ -940,17 +944,8 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
         spec.isSuspended()
     );
 
-    int workerThreads;
     if (autoScalerConfig != null && 
autoScalerConfig.getEnableTaskAutoScaler()) {
       log.info("Running Task autoscaler for supervisor[%s] for 
datasource[%s]", supervisorId, dataSource);
-
-      workerThreads = (this.tuningConfig.getWorkerThreads() != null
-                       ? this.tuningConfig.getWorkerThreads()
-                       : Math.min(10, autoScalerConfig.getTaskCountMax()));
-    } else {
-      workerThreads = (this.tuningConfig.getWorkerThreads() != null
-                       ? this.tuningConfig.getWorkerThreads()
-                       : Math.min(10, this.ioConfig.getTaskCount()));
     }
 
     IdleConfig specIdleConfig = spec.getIoConfig().getIdleConfig();
@@ -970,13 +965,18 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
       );
     }
 
-    this.workerExec = MoreExecutors.listeningDecorator(
-        ScheduledExecutors.fixed(
-            workerThreads,
-            StringUtils.encodeForFormat(supervisorTag) + "-Worker-%d"
-        )
+    final int workerThreads = calculateWorkerThreads(tuningConfig, ioConfig);
+    ScheduledThreadPoolExecutor executor = 
ScheduledExecutors.fixedWithKeepAliveTime(
+        workerThreads,
+        StringUtils.encodeForFormat(supervisorTag) + "-Worker-%d",
+        WORKER_THREAD_KEEPALIVE_TIME_MILLIS
+    );
+
+    this.workerExec = MoreExecutors.listeningDecorator(executor);
+    log.info(
+        "Created worker pool with [%d] threads for supervisor[%s], 
dataSource[%s]",
+        workerThreads, this.supervisorId, this.dataSource
     );
-    log.info("Created worker pool with [%d] threads for supervisor[%s] for 
dataSource[%s]", workerThreads, this.supervisorId, this.dataSource);
 
     this.taskInfoProvider = new TaskInfoProvider()
     {
@@ -1009,6 +1009,31 @@ public abstract class 
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
     this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider, 
this.tuningConfig, workerExec);
   }
 
+  /**
+   * Calculates the number of worker threads to use in a {@link 
SeekableStreamSupervisor}.
+   * These threads are used to interact with tasks in {@link 
SeekableStreamIndexTaskClient}
+   * and to handle task interactions (discovery, updates, etc.) in {@link 
SeekableStreamSupervisor}.
+   * <p>
+   * If the tuning config explicitly specifies the field {@code 
workerThreads}, that value is used.
+   * Otherwise, the value is derived from either the auto-scaler's maximum task count (if 
the auto-scaler is enabled) or the ioConfig task count,
+   * divided by {@link #DEFAULT_TASKS_PER_WORKER_THREAD}, with a minimum of 
{@link #MIN_WORKER_CORE_THREADS}.
+   */
+  public static int calculateWorkerThreads(
+      SeekableStreamSupervisorTuningConfig tuningConfig,
+      SeekableStreamSupervisorIOConfig ioConfig
+  )
+  {
+    if (tuningConfig.getWorkerThreads() != null) {
+      return tuningConfig.getWorkerThreads();
+    }
+    final AutoScalerConfig autoScalerConfig = ioConfig.getAutoScalerConfig();
+    if (autoScalerConfig != null && 
autoScalerConfig.getEnableTaskAutoScaler()) {
+      return Math.max(MIN_WORKER_CORE_THREADS, 
autoScalerConfig.getTaskCountMax() / DEFAULT_TASKS_PER_WORKER_THREAD);
+    } else {
+      return Math.max(MIN_WORKER_CORE_THREADS, ioConfig.getTaskCount() / 
DEFAULT_TASKS_PER_WORKER_THREAD);
+    }
+  }
+
   @Override
   public int getActiveTaskGroupsCount()
   {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index e37ff561e3c..4ae5a44c1c8 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -72,6 +72,7 @@ import 
org.apache.druid.indexing.seekablestream.common.StreamPartition;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
 import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
 import org.apache.druid.java.util.common.DateTimes;
 import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.Intervals;
@@ -135,6 +136,8 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   private static final StreamPartition<String> SHARD0_PARTITION = 
StreamPartition.of(STREAM, SHARD_ID);
   private static final String EXCEPTION_MSG = "I had an exception";
   private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1", 
"v1", "k2", 20);
+  private static final int DEFAULT_WORKER_THREADS = 2;
+  private static final int DEFAULT_TASKS_PER_WORKER_THREAD = 4;
 
   private TaskStorage taskStorage;
   private TaskMaster taskMaster;
@@ -177,7 +180,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
 
     
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
-    EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
+    
EasyMock.expect(spec.getIoConfig()).andReturn(createSupervisorIOConfig()).anyTimes();
     
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
     EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
     
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
@@ -2563,6 +2566,83 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
     EasyMock.verify(executorService, spec);
   }
 
+  @Test
+  public void test_calculateWorkerThreads_shouldHonourWorkerConfig()
+  {
+    final int numWorkerThreads = 5;
+    final int taskCount = 1;
+    SeekableStreamSupervisorTuningConfig tuningConfig = 
createSupervisorTuningConfigWithWorkerThreads(numWorkerThreads);
+    SeekableStreamSupervisorIOConfig ioConfig = 
createSupervisorIOConfig(taskCount, null);
+    Assert.assertEquals(numWorkerThreads, 
SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig));
+  }
+
+  @Test
+  public void test_calculateWorkerThreads_shouldUseDefaultWorkerThreads()
+  {
+    final int taskCount = 1;
+    SeekableStreamSupervisorTuningConfig tuningConfig = 
createSupervisorTuningConfig();
+    SeekableStreamSupervisorIOConfig ioConfig = 
createSupervisorIOConfig(taskCount, null);
+    Assert.assertEquals(
+        DEFAULT_WORKER_THREADS,
+        SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+    );
+  }
+
+  @Test
+  public void 
test_calculateWorkerThreads_shouldUseMinimumWorkerThreadstWithTasks()
+  {
+    final int taskCount = 7;
+    SeekableStreamSupervisorTuningConfig tuningConfig = 
createSupervisorTuningConfig();
+    SeekableStreamSupervisorIOConfig ioConfig = 
createSupervisorIOConfig(taskCount, null);
+    Assert.assertEquals(
+        DEFAULT_WORKER_THREADS,
+        SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+    );
+  }
+
+  @Test
+  public void test_calculateWorkerThreads_shouldUseFactorOfTaskCount()
+  {
+    final int taskCount = 18;
+    SeekableStreamSupervisorTuningConfig tuningConfig = 
createSupervisorTuningConfig();
+    SeekableStreamSupervisorIOConfig ioConfig = 
createSupervisorIOConfig(taskCount, null);
+    Assert.assertEquals(
+        taskCount / DEFAULT_TASKS_PER_WORKER_THREAD,
+        SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+    );
+  }
+
+  @Test
+  public void test_calculateWorkerThreads_shouldUseAutoScalerConfig()
+  {
+    final int taskCountMax = 21;
+    final int taskCountMin = 5;
+    SeekableStreamSupervisorTuningConfig tuningConfig = 
createSupervisorTuningConfig();
+    AutoScalerConfig autoScalerConfig = new LagBasedAutoScalerConfig(
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        taskCountMax,
+        null,
+        taskCountMin,
+        null,
+        null,
+        true,
+        null,
+        null
+    );
+    SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(1, 
autoScalerConfig);
+    Assert.assertEquals(
+        taskCountMax / DEFAULT_TASKS_PER_WORKER_THREAD,
+        SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+    );
+  }
+
   private void expectEmitterSupervisor(boolean suspended)
   {
     spec = createMock(SeekableStreamSupervisorSpec.class);
@@ -2630,13 +2710,21 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
                      .build();
   }
 
-  private static SeekableStreamSupervisorIOConfig getIOConfig()
+  private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig()
+  {
+    return createSupervisorIOConfig(1, 
OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class));
+  }
+
+  private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig(
+      int taskCount,
+      @Nullable AutoScalerConfig autoScalerConfig
+  )
   {
     return new SeekableStreamSupervisorIOConfig(
         "stream",
         new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), 
ImmutableMap.of(), false, false, false),
         1,
-        1,
+        taskCount,
         new Period("PT1H"),
         new Period("P1D"),
         new Period("PT30S"),
@@ -2644,7 +2732,7 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
         new Period("PT30M"),
         null,
         null,
-        OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class),
+        autoScalerConfig,
         LagAggregator.DEFAULT,
         null,
         null,
@@ -2675,13 +2763,23 @@ public class SeekableStreamSupervisorStateTest extends 
EasyMockSupport
   }
 
   private static SeekableStreamSupervisorTuningConfig getTuningConfig()
+  {
+    return createSupervisorTuningConfigWithWorkerThreads(1);
+  }
+
+  private static SeekableStreamSupervisorTuningConfig 
createSupervisorTuningConfig()
+  {
+    return createSupervisorTuningConfigWithWorkerThreads(null);
+  }
+
+  private static SeekableStreamSupervisorTuningConfig 
createSupervisorTuningConfigWithWorkerThreads(@Nullable Integer workerThreads)
   {
     return new SeekableStreamSupervisorTuningConfig()
     {
       @Override
       public Integer getWorkerThreads()
       {
-        return 1;
+        return workerThreads;
       }
 
       @Override
diff --git 
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
 
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
index 70e7aa18a2b..da8f5bf475a 100644
--- 
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
+++ 
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
@@ -26,6 +26,7 @@ import org.joda.time.Duration;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 public class ScheduledExecutors
@@ -172,8 +173,34 @@ public class ScheduledExecutors
     return (corePoolSize, nameFormat) -> 
ExecutorServices.manageLifecycle(lifecycle, fixed(corePoolSize, nameFormat));
   }
 
+  /**
+   * Creates a new {@link ScheduledExecutorService} with a fixed number of core 
threads.
+   *
+   * @param corePoolSize the number of threads to keep in the pool, even if idle
+   * @param nameFormat   the naming format for threads created by the pool
+   * @return a new {@link ScheduledExecutorService} with the specified 
configuration
+   */
   public static ScheduledExecutorService fixed(int corePoolSize, String 
nameFormat)
   {
     return Executors.newScheduledThreadPool(corePoolSize, 
Execs.makeThreadFactory(nameFormat));
   }
+
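+  /**
+   * Creates a new {@link ScheduledThreadPoolExecutor} with the given core pool size and 
thread name format,
+   * and sets its keep-alive time for idle threads to {@code keepAliveTimeInMillis} milliseconds.
+   * The keep-alive applies only to threads in excess of {@code corePoolSize}, unless core thread timeout is enabled.
+   */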
+  public static ScheduledThreadPoolExecutor fixedWithKeepAliveTime(
+      int corePoolSize,
+      String nameFormat,
+      long keepAliveTimeInMillis
+  )
+  {
+    ScheduledThreadPoolExecutor scheduledExecutor = new 
ScheduledThreadPoolExecutor(
+        corePoolSize,
+        Execs.makeThreadFactory(nameFormat)
+    );
+    scheduledExecutor.setKeepAliveTime(keepAliveTimeInMillis, 
TimeUnit.MILLISECONDS);
+    return scheduledExecutor;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to