kfaraz commented on code in PR #18163:
URL: https://github.com/apache/druid/pull/18163#discussion_r2163100808
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -940,17 +944,9 @@ public SeekableStreamSupervisor(
spec.isSuspended()
);
- int workerThreads;
+ final int workerThreads = calculateWorkerThreads(tuningConfig, ioConfig);
Review Comment:
This line should be moved to where the `workerExec` is being initialized.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1009,6 +1010,33 @@ public Optional<TaskStatus> getTaskStatus(String id)
this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider,
this.tuningConfig, workerExec);
}
+ /**
+ * Calculates the number of worker threads to use for the supervisor tasks.
+ * <p>
+ * If the tuning config explicitly sets the worker thread count, that value is used.
+ * Otherwise, the value is derived from either the auto-scaler config (if enabled) or the ioConfig task count,
+ * divided by the default number of tasks per worker thread, with a minimum of {@code MIN_WORKER_CORE_THREADS}.
Review Comment:
```suggestion
* divided by the {@link #DEFAULT_TASKS_PER_WORKER_THREAD}, with a minimum of {@link #MIN_WORKER_CORE_THREADS}.
```
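For reference, the fallback rule this javadoc describes can be sketched roughly as below. This is only an illustration and not the PR's code: the class name, the flattened parameters (`configuredWorkerThreads`, `autoScalerTaskCountMax`, `ioConfigTaskCount`), and both constant values are assumptions, as is the choice of which auto-scaler field is consulted.

```java
// Illustrative sketch only; names and constant values are hypothetical.
final class WorkerThreadSketch
{
  // Assumed values: the real constants are not shown in this review thread.
  static final int MIN_WORKER_CORE_THREADS = 4;
  static final int DEFAULT_TASKS_PER_WORKER_THREAD = 2;

  /**
   * @param configuredWorkerThreads explicit value from the tuning config, or null if unset
   * @param autoScalerTaskCountMax  task count from the auto-scaler config, or null if auto-scaling is disabled
   * @param ioConfigTaskCount       task count from the ioConfig
   */
  public static int calculateWorkerThreads(
      Integer configuredWorkerThreads,
      Integer autoScalerTaskCountMax,
      int ioConfigTaskCount
  )
  {
    // An explicitly configured value always wins.
    if (configuredWorkerThreads != null) {
      return configuredWorkerThreads;
    }
    // Otherwise start from the auto-scaler task count if present, else the ioConfig task count.
    final int taskCount = autoScalerTaskCountMax != null ? autoScalerTaskCountMax : ioConfigTaskCount;
    // Integer division, clamped from below by the minimum thread count.
    return Math.max(MIN_WORKER_CORE_THREADS, taskCount / DEFAULT_TASKS_PER_WORKER_THREAD);
  }
}
```

With these assumed constants, an unset tuning value and an ioConfig task count of 2 would fall back to the minimum of 4 threads.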
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1009,6 +1010,33 @@ public Optional<TaskStatus> getTaskStatus(String id)
this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider,
this.tuningConfig, workerExec);
}
+ /**
+ * Calculates the number of worker threads to use for the supervisor tasks.
Review Comment:
There should also be a line on what these worker threads are actually used
for.
##########
processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java:
##########
@@ -172,8 +173,30 @@ public static ScheduledExecutorFactory createFactory(final Lifecycle lifecycle)
return (corePoolSize, nameFormat) ->
ExecutorServices.manageLifecycle(lifecycle, fixed(corePoolSize, nameFormat));
}
+ /**
+ * Creates a new {@link ScheduledExecutorService} with a minimum number of threads.
+ *
+ * @param corePoolSize the minimum number of threads in the pool
+ * @param nameFormat the naming format for threads created by the pool
+ * @return a new {@link ScheduledExecutorService} with the specified configuration
+ */
public static ScheduledExecutorService fixed(int corePoolSize, String nameFormat)
{
return Executors.newScheduledThreadPool(corePoolSize, Execs.makeThreadFactory(nameFormat));
}
+
+ public static ScheduledThreadPoolExecutor fixedWithKeepAliveTime(
Review Comment:
Needs a javadoc.
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1009,6 +1010,33 @@ public Optional<TaskStatus> getTaskStatus(String id)
this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider,
this.tuningConfig, workerExec);
}
+ /**
+ * Calculates the number of worker threads to use for the supervisor tasks.
+ * <p>
+ * If the tuning config explicitly sets the worker thread count, that value is used.
+ * Otherwise, the value is derived from either the auto-scaler config (if enabled) or the ioConfig task count,
+ * divided by the default number of tasks per worker thread, with a minimum of {@code MIN_WORKER_CORE_THREADS}.
+ *
+ * @param tuningConfig the tuning configuration
+ * @param ioConfig the IO configuration
+ * @return the number of worker threads for executor
Review Comment:
These param descriptions don't really have any new info and can be omitted.
##########
processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java:
##########
@@ -172,8 +173,30 @@ public static ScheduledExecutorFactory createFactory(final Lifecycle lifecycle)
return (corePoolSize, nameFormat) ->
ExecutorServices.manageLifecycle(lifecycle, fixed(corePoolSize, nameFormat));
}
+ /**
+ * Creates a new {@link ScheduledExecutorService} with a minimum number of threads.
+ *
+ * @param corePoolSize the minimum number of threads in the pool
+ * @param nameFormat the naming format for threads created by the pool
+ * @return a new {@link ScheduledExecutorService} with the specified configuration
+ */
public static ScheduledExecutorService fixed(int corePoolSize, String nameFormat)
{
return Executors.newScheduledThreadPool(corePoolSize, Execs.makeThreadFactory(nameFormat));
}
+
+ public static ScheduledThreadPoolExecutor fixedWithKeepAliveTime(
+ int corePoolSize,
+ String nameFormat,
+ long keepAliveTime,
+ TimeUnit timeUnit
Review Comment:
It might be simpler to just accept milliseconds.
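Something along these lines, as a rough sketch rather than a definitive implementation. The method name `fixedWithKeepAliveMillis`, the class name, and the plain thread factory (standing in for `Execs.makeThreadFactory`) are assumptions for illustration; enabling `allowCoreThreadTimeOut` is also an assumption about what the keep-alive time is meant to achieve.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; names are hypothetical.
final class KeepAliveSketch
{
  public static ScheduledThreadPoolExecutor fixedWithKeepAliveMillis(
      int corePoolSize,
      String nameFormat,
      long keepAliveMillis
  )
  {
    // Simple stand-in for a name-formatting thread factory.
    final AtomicInteger counter = new AtomicInteger();
    final ThreadFactory factory = runnable -> {
      Thread thread = new Thread(runnable, String.format(nameFormat, counter.getAndIncrement()));
      thread.setDaemon(true);
      return thread;
    };

    final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(corePoolSize, factory);
    // Set a positive keep-alive first: allowCoreThreadTimeOut(true) rejects a zero keep-alive time.
    exec.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS);
    // Assumed intent: let idle core threads terminate after the keep-alive period.
    exec.allowCoreThreadTimeOut(true);
    return exec;
  }
}
```

Callers would then pass a single `long` and never need to think about the unit.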
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1009,6 +1010,33 @@ public Optional<TaskStatus> getTaskStatus(String id)
this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider,
this.tuningConfig, workerExec);
}
+ /**
+ * Calculates the number of worker threads to use for the supervisor tasks.
Review Comment:
```suggestion
* Calculates the number of worker threads to use in a {@link SeekableStreamSupervisor}.
```
##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1009,6 +1010,33 @@ public Optional<TaskStatus> getTaskStatus(String id)
this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider,
this.tuningConfig, workerExec);
}
+ /**
+ * Calculates the number of worker threads to use for the supervisor tasks.
+ * <p>
+ * If the tuning config explicitly sets the worker thread count, that value is used.
Review Comment:
```suggestion
* If the tuning config explicitly specifies the field {@code workerThreads}, that value is used.
```
##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java:
##########
@@ -2675,13 +2763,28 @@ private static Map<String, Object> getProperties()
}
private static SeekableStreamSupervisorTuningConfig getTuningConfig()
+ {
+ return createSupervisorTuningConfigWithWorkerThreads(1);
+ }
+
+ private static SeekableStreamSupervisorTuningConfig createSupervisorTuningConfig()
+ {
+ return createSupervisorTuningConfigWithWorkerThreads(null);
+ }
+
+ /**
+ * Creates a {@link SeekableStreamSupervisorTuningConfig} with the given number of worker threads.
+ *
+ * @param workerThreads the number of threads to use, or null to imitate default settings.
+ */
Review Comment:
Nit: Not really needed (especially in tests) unless it's adding info that is
not obvious from the method name and signature.