This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 514ff19cb95 Reduce default pool size of worker threads in
SeekableStreamSupervisor (#18163)
514ff19cb95 is described below
commit 514ff19cb95edd43bbd0397975d1edd962a56508
Author: Uddeshya Singh <[email protected]>
AuthorDate: Tue Jun 24 15:40:54 2025 +0530
Reduce default pool size of worker threads in SeekableStreamSupervisor
(#18163)
The core pool size of worker threads is calculated as follows:
- If `tuningConfig.workerThreads` is set, use that.
- If auto scaler is enabled, use `Math.max(2, taskCountMax/4)`.
- Otherwise, use `Math.max(2, taskCount/4)`.
---
.../supervisor/SeekableStreamSupervisor.java | 55 ++++++++---
.../SeekableStreamSupervisorStateTest.java | 108 ++++++++++++++++++++-
.../util/common/concurrent/ScheduledExecutors.java | 27 ++++++
3 files changed, 170 insertions(+), 20 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
index 01acc9de306..9c70314d5ad 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java
@@ -126,6 +126,7 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
@@ -162,6 +163,9 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
private static final long MAX_RUN_FREQUENCY_MILLIS = 1000;
private static final int MAX_INITIALIZATION_RETRIES = 20;
+ private static final int MIN_WORKER_CORE_THREADS = 2; // lower bound for the worker thread count derived in calculateWorkerThreads
+ private static final int DEFAULT_TASKS_PER_WORKER_THREAD = 4; // divisor applied to taskCount (or taskCountMax when auto-scaling) in calculateWorkerThreads
+ private static final int WORKER_THREAD_KEEPALIVE_TIME_MILLIS = 2000; // keep-alive passed to ScheduledExecutors.fixedWithKeepAliveTime for the worker executor
private static final EmittingLogger log = new
EmittingLogger(SeekableStreamSupervisor.class);
@@ -940,17 +944,8 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
spec.isSuspended()
);
- int workerThreads;
if (autoScalerConfig != null &&
autoScalerConfig.getEnableTaskAutoScaler()) {
log.info("Running Task autoscaler for supervisor[%s] for
datasource[%s]", supervisorId, dataSource);
-
- workerThreads = (this.tuningConfig.getWorkerThreads() != null
- ? this.tuningConfig.getWorkerThreads()
- : Math.min(10, autoScalerConfig.getTaskCountMax()));
- } else {
- workerThreads = (this.tuningConfig.getWorkerThreads() != null
- ? this.tuningConfig.getWorkerThreads()
- : Math.min(10, this.ioConfig.getTaskCount()));
}
IdleConfig specIdleConfig = spec.getIoConfig().getIdleConfig();
@@ -970,13 +965,18 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
);
}
- this.workerExec = MoreExecutors.listeningDecorator(
- ScheduledExecutors.fixed(
- workerThreads,
- StringUtils.encodeForFormat(supervisorTag) + "-Worker-%d"
- )
+ final int workerThreads = calculateWorkerThreads(tuningConfig, ioConfig);
+ ScheduledThreadPoolExecutor executor =
ScheduledExecutors.fixedWithKeepAliveTime(
+ workerThreads,
+ StringUtils.encodeForFormat(supervisorTag) + "-Worker-%d",
+ WORKER_THREAD_KEEPALIVE_TIME_MILLIS
+ );
+
+ this.workerExec = MoreExecutors.listeningDecorator(executor);
+ log.info(
+ "Created worker pool with [%d] threads for supervisor[%s],
dataSource[%s]",
+ workerThreads, this.supervisorId, this.dataSource
);
- log.info("Created worker pool with [%d] threads for supervisor[%s] for
dataSource[%s]", workerThreads, this.supervisorId, this.dataSource);
this.taskInfoProvider = new TaskInfoProvider()
{
@@ -1009,6 +1009,31 @@ public abstract class
SeekableStreamSupervisor<PartitionIdType, SequenceOffsetTy
this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider,
this.tuningConfig, workerExec);
}
+ /**
+ * Calculates the number of worker threads to use in a {@link SeekableStreamSupervisor}.
+ * These threads are used to interact with tasks in {@link SeekableStreamIndexTaskClient}
+ * and handle task interactions (discovery, updates etc.) in {@link SeekableStreamSupervisor}.
+ * <p>
+ * If the tuning config explicitly specifies the field {@code workerThreads}, that value is
+ * returned as-is (it is neither validated nor clamped here). Otherwise, the value is derived
+ * from either the auto-scaler config's {@code taskCountMax} (if the auto-scaler is enabled) or
+ * the ioConfig {@code taskCount}, divided (integer division, rounding down) by
+ * {@link #DEFAULT_TASKS_PER_WORKER_THREAD}, with a minimum of {@link #MIN_WORKER_CORE_THREADS}.
+ *
+ * @param tuningConfig supervisor tuning config; a non-null {@code workerThreads} takes precedence
+ * @param ioConfig supervisor IO config supplying the task count and the optional auto-scaler config
+ * @return the worker thread count; the supervisor uses it as the core pool size of its worker executor
+ */
+ public static int calculateWorkerThreads(
+ SeekableStreamSupervisorTuningConfig tuningConfig,
+ SeekableStreamSupervisorIOConfig ioConfig
+ )
+ {
+ if (tuningConfig.getWorkerThreads() != null) {
+ return tuningConfig.getWorkerThreads(); // explicit value wins; no minimum is applied
+ }
+ final AutoScalerConfig autoScalerConfig = ioConfig.getAutoScalerConfig();
+ if (autoScalerConfig != null &&
autoScalerConfig.getEnableTaskAutoScaler()) {
+ return Math.max(MIN_WORKER_CORE_THREADS,
autoScalerConfig.getTaskCountMax() / DEFAULT_TASKS_PER_WORKER_THREAD); // integer division rounds down
+ } else {
+ return Math.max(MIN_WORKER_CORE_THREADS, ioConfig.getTaskCount() /
DEFAULT_TASKS_PER_WORKER_THREAD); // integer division rounds down
+ }
+ }
+
@Override
public int getActiveTaskGroupsCount()
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
index e37ff561e3c..4ae5a44c1c8 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java
@@ -72,6 +72,7 @@ import
org.apache.druid.indexing.seekablestream.common.StreamPartition;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamExceptionEvent;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorStateManager.SeekableStreamState;
import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import
org.apache.druid.indexing.seekablestream.supervisor.autoscaler.LagBasedAutoScalerConfig;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.Intervals;
@@ -135,6 +136,8 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
private static final StreamPartition<String> SHARD0_PARTITION =
StreamPartition.of(STREAM, SHARD_ID);
private static final String EXCEPTION_MSG = "I had an exception";
private static final Map<String, Object> METRIC_TAGS = ImmutableMap.of("k1",
"v1", "k2", 20);
+ private static final int DEFAULT_WORKER_THREADS = 2; // mirrors SeekableStreamSupervisor.MIN_WORKER_CORE_THREADS (private there); keep in sync
+ private static final int DEFAULT_TASKS_PER_WORKER_THREAD = 4; // mirrors SeekableStreamSupervisor.DEFAULT_TASKS_PER_WORKER_THREAD (private there); keep in sync
private TaskStorage taskStorage;
private TaskMaster taskMaster;
@@ -177,7 +180,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
EasyMock.expect(spec.getSupervisorStateManagerConfig()).andReturn(supervisorConfig).anyTimes();
EasyMock.expect(spec.getDataSchema()).andReturn(getDataSchema()).anyTimes();
- EasyMock.expect(spec.getIoConfig()).andReturn(getIOConfig()).anyTimes();
+
EasyMock.expect(spec.getIoConfig()).andReturn(createSupervisorIOConfig()).anyTimes();
EasyMock.expect(spec.getTuningConfig()).andReturn(getTuningConfig()).anyTimes();
EasyMock.expect(spec.getEmitter()).andReturn(emitter).anyTimes();
EasyMock.expect(spec.getContextValue(DruidMetrics.TAGS)).andReturn(METRIC_TAGS).anyTimes();
@@ -2563,6 +2566,83 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
EasyMock.verify(executorService, spec);
}
+ @Test
+ public void test_calculateWorkerThreads_shouldHonourWorkerConfig() // an explicit tuningConfig.workerThreads takes precedence over the derived value
+ {
+ final int numWorkerThreads = 5;
+ final int taskCount = 1; // without an explicit value this would derive max(2, 1 / 4) = 2
+ SeekableStreamSupervisorTuningConfig tuningConfig =
createSupervisorTuningConfigWithWorkerThreads(numWorkerThreads);
+ SeekableStreamSupervisorIOConfig ioConfig =
createSupervisorIOConfig(taskCount, null); // no auto-scaler config
+ Assert.assertEquals(numWorkerThreads,
SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig));
+ }
+
+ @Test
+ public void test_calculateWorkerThreads_shouldUseDefaultWorkerThreads()
+ {
+ final int taskCount = 1; // 1 / 4 = 0, so the minimum of 2 applies
+ SeekableStreamSupervisorTuningConfig tuningConfig =
createSupervisorTuningConfig(); // workerThreads left unset (null)
+ SeekableStreamSupervisorIOConfig ioConfig =
createSupervisorIOConfig(taskCount, null); // no auto-scaler config
+ Assert.assertEquals(
+ DEFAULT_WORKER_THREADS,
+ SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+ );
+ }
+
+ @Test
+ public void
test_calculateWorkerThreads_shouldUseMinimumWorkerThreadstWithTasks() // NOTE(review): 'Threadst' looks like a typo for 'Threads'
+ {
+ final int taskCount = 7; // 7 / 4 = 1 (integer division), still below the minimum of 2
+ SeekableStreamSupervisorTuningConfig tuningConfig =
createSupervisorTuningConfig(); // workerThreads left unset (null)
+ SeekableStreamSupervisorIOConfig ioConfig =
createSupervisorIOConfig(taskCount, null); // no auto-scaler config
+ Assert.assertEquals(
+ DEFAULT_WORKER_THREADS,
+ SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+ );
+ }
+
+ @Test
+ public void test_calculateWorkerThreads_shouldUseFactorOfTaskCount()
+ {
+ final int taskCount = 18; // 18 / 4 = 4 (integer division), above the minimum of 2
+ SeekableStreamSupervisorTuningConfig tuningConfig =
createSupervisorTuningConfig(); // workerThreads left unset (null)
+ SeekableStreamSupervisorIOConfig ioConfig =
createSupervisorIOConfig(taskCount, null); // no auto-scaler config
+ Assert.assertEquals(
+ taskCount / DEFAULT_TASKS_PER_WORKER_THREAD,
+ SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+ );
+ }
+
+ @Test
+ public void test_calculateWorkerThreads_shouldUseAutoScalerConfig()
+ {
+ final int taskCountMax = 21; // 21 / 4 = 5 (integer division)
+ final int taskCountMin = 5; // not read by calculateWorkerThreads
+ SeekableStreamSupervisorTuningConfig tuningConfig =
createSupervisorTuningConfig(); // workerThreads left unset (null)
+ AutoScalerConfig autoScalerConfig = new LagBasedAutoScalerConfig(
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ taskCountMax,
+ null,
+ taskCountMin,
+ null,
+ null,
+ true, // presumably enableTaskAutoScaler; the assertion below requires the auto-scaler to be enabled — confirm against the constructor
+ null,
+ null
+ );
+ SeekableStreamSupervisorIOConfig ioConfig = createSupervisorIOConfig(1,
autoScalerConfig); // taskCount=1 alone would give 2, so the expected value must come from taskCountMax
+ Assert.assertEquals(
+ taskCountMax / DEFAULT_TASKS_PER_WORKER_THREAD,
+ SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+ );
+ }
+
private void expectEmitterSupervisor(boolean suspended)
{
spec = createMock(SeekableStreamSupervisorSpec.class);
@@ -2630,13 +2710,21 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
.build();
}
- private static SeekableStreamSupervisorIOConfig getIOConfig()
+ private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig() // taskCount=1 with the auto-scaler config deserialized from getProperties()
+ {
+ return createSupervisorIOConfig(1,
OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class));
+ }
+
+ private static SeekableStreamSupervisorIOConfig createSupervisorIOConfig(
+ int taskCount,
+ @Nullable AutoScalerConfig autoScalerConfig
+ )
{
return new SeekableStreamSupervisorIOConfig(
"stream",
new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()),
ImmutableMap.of(), false, false, false),
1,
- 1,
+ taskCount,
new Period("PT1H"),
new Period("P1D"),
new Period("PT30S"),
@@ -2644,7 +2732,7 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
new Period("PT30M"),
null,
null,
- OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class),
+ autoScalerConfig,
LagAggregator.DEFAULT,
null,
null,
@@ -2675,13 +2763,23 @@ public class SeekableStreamSupervisorStateTest extends
EasyMockSupport
}
private static SeekableStreamSupervisorTuningConfig getTuningConfig()
+ {
+ return createSupervisorTuningConfigWithWorkerThreads(1); // preserves the previously hard-coded workerThreads = 1
+ }
+
+ private static SeekableStreamSupervisorTuningConfig
createSupervisorTuningConfig()
+ {
+ return createSupervisorTuningConfigWithWorkerThreads(null); // null workerThreads => calculateWorkerThreads derives the count
+ }
+
+ private static SeekableStreamSupervisorTuningConfig
createSupervisorTuningConfigWithWorkerThreads(@Nullable Integer workerThreads)
{
return new SeekableStreamSupervisorTuningConfig()
{
@Override
public Integer getWorkerThreads()
{
- return 1;
+ return workerThreads;
}
@Override
diff --git
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
index 70e7aa18a2b..da8f5bf475a 100644
---
a/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
+++
b/processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java
@@ -26,6 +26,7 @@ import org.joda.time.Duration;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ScheduledExecutors
@@ -172,8 +173,34 @@ public class ScheduledExecutors
return (corePoolSize, nameFormat) ->
ExecutorServices.manageLifecycle(lifecycle, fixed(corePoolSize, nameFormat));
}
+ /**
+ * Creates a new {@link ScheduledExecutorService} via {@link Executors#newScheduledThreadPool},
+ * whose threads are created by {@link Execs#makeThreadFactory} using {@code nameFormat}.
+ * <p>
+ * Per the JDK documentation, {@code corePoolSize} is the number of threads kept in the pool even
+ * when idle, and a scheduled thread pool behaves as a fixed-size pool of that many threads.
+ *
+ * @param corePoolSize the number of threads to keep in the pool, even if they are idle
+ * @param nameFormat the naming format for threads created by the pool
+ * @return a new {@link ScheduledExecutorService} with the specified configuration
+ */
public static ScheduledExecutorService fixed(int corePoolSize, String
nameFormat)
{
return Executors.newScheduledThreadPool(corePoolSize,
Execs.makeThreadFactory(nameFormat));
}
+
+ /**
+ * Creates a new {@link ScheduledThreadPoolExecutor} with {@code corePoolSize} core threads,
+ * created by {@link Execs#makeThreadFactory} using {@code nameFormat}, and sets its keep-alive
+ * time via {@link ScheduledThreadPoolExecutor#setKeepAliveTime}.
+ * <p>
+ * NOTE(review): per the {@link java.util.concurrent.ThreadPoolExecutor} documentation, the
+ * keep-alive time only terminates idle threads in excess of the core pool size, or core threads
+ * when {@code allowCoreThreadTimeOut(true)} is set (this method does not set it). Since a
+ * {@link ScheduledThreadPoolExecutor} behaves as a fixed-size pool of {@code corePoolSize}
+ * threads, this setting is expected to have little or no effect on the pool size — confirm intent.
+ *
+ * @param corePoolSize the number of core threads in the pool
+ * @param nameFormat the naming format for threads created by the pool
+ * @param keepAliveTimeInMillis idle keep-alive time, in milliseconds, for threads beyond the core size
+ * @return the newly created executor
+ */
+ public static ScheduledThreadPoolExecutor fixedWithKeepAliveTime(
+ int corePoolSize,
+ String nameFormat,
+ long keepAliveTimeInMillis
+ )
+ {
+ ScheduledThreadPoolExecutor scheduledExecutor = new
ScheduledThreadPoolExecutor(
+ corePoolSize,
+ Execs.makeThreadFactory(nameFormat)
+ );
+ scheduledExecutor.setKeepAliveTime(keepAliveTimeInMillis,
TimeUnit.MILLISECONDS);
+ return scheduledExecutor;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]