kfaraz commented on code in PR #18163:
URL: https://github.com/apache/druid/pull/18163#discussion_r2163100808


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -940,17 +944,9 @@ public SeekableStreamSupervisor(
         spec.isSuspended()
     );
 
-    int workerThreads;
+    final int workerThreads = calculateWorkerThreads(tuningConfig, ioConfig);

Review Comment:
   This line should be moved to where the `workerExec` is being initialized.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1009,6 +1010,33 @@ public Optional<TaskStatus> getTaskStatus(String id)
     this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider, 
this.tuningConfig, workerExec);
   }
 
+  /**
+   * Calculates the number of worker threads to use for the supervisor tasks.
+   * <p>
+   * If the tuning config explicitly sets the worker thread count, that value 
is used.
+   * Otherwise, the value is derived from either the auto-scaler config (if 
enabled) or the ioConfig task count,
+   * divided by the default number of tasks per worker thread, with a minimum 
of {@code MIN_WORKER_CORE_THREADS}.

Review Comment:
   ```suggestion
      * divided by the {@link DEFAULT_TASKS_PER_WORKER_THREAD}, with a minimum 
of {@link MIN_WORKER_CORE_THREADS}.
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1009,6 +1010,33 @@ public Optional<TaskStatus> getTaskStatus(String id)
     this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider, 
this.tuningConfig, workerExec);
   }
 
+  /**
+   * Calculates the number of worker threads to use for the supervisor tasks.

Review Comment:
   There should also be a line on what these worker threads are actually used 
for.



##########
processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java:
##########
@@ -172,8 +173,30 @@ public static ScheduledExecutorFactory createFactory(final 
Lifecycle lifecycle)
     return (corePoolSize, nameFormat) -> 
ExecutorServices.manageLifecycle(lifecycle, fixed(corePoolSize, nameFormat));
   }
 
+  /**
+   * Creates a new {@link ScheduledExecutorService} with a minimum number of 
threads.
+   *
+   * @param corePoolSize the minimum number of threads in the pool
+   * @param nameFormat   the naming format for threads created by the pool
+   * @return a new {@link ScheduledExecutorService} with the specified 
configuration
+   */
   public static ScheduledExecutorService fixed(int corePoolSize, String 
nameFormat)
   {
     return Executors.newScheduledThreadPool(corePoolSize, 
Execs.makeThreadFactory(nameFormat));
   }
+
+  public static ScheduledThreadPoolExecutor fixedWithKeepAliveTime(

Review Comment:
   Needs a javadoc.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1009,6 +1010,33 @@ public Optional<TaskStatus> getTaskStatus(String id)
     this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider, 
this.tuningConfig, workerExec);
   }
 
+  /**
+   * Calculates the number of worker threads to use for the supervisor tasks.
+   * <p>
+   * If the tuning config explicitly sets the worker thread count, that value 
is used.
+   * Otherwise, the value is derived from either the auto-scaler config (if 
enabled) or the ioConfig task count,
+   * divided by the default number of tasks per worker thread, with a minimum 
of {@code MIN_WORKER_CORE_THREADS}.
+   *
+   * @param tuningConfig the tuning configuration
+   * @param ioConfig the IO configuration
+   * @return the number of worker threads for executor

Review Comment:
   These param descriptions don't really have any new info and can be omitted.



##########
processing/src/main/java/org/apache/druid/java/util/common/concurrent/ScheduledExecutors.java:
##########
@@ -172,8 +173,30 @@ public static ScheduledExecutorFactory createFactory(final 
Lifecycle lifecycle)
     return (corePoolSize, nameFormat) -> 
ExecutorServices.manageLifecycle(lifecycle, fixed(corePoolSize, nameFormat));
   }
 
+  /**
+   * Creates a new {@link ScheduledExecutorService} with a minimum number of 
threads.
+   *
+   * @param corePoolSize the minimum number of threads in the pool
+   * @param nameFormat   the naming format for threads created by the pool
+   * @return a new {@link ScheduledExecutorService} with the specified 
configuration
+   */
   public static ScheduledExecutorService fixed(int corePoolSize, String 
nameFormat)
   {
     return Executors.newScheduledThreadPool(corePoolSize, 
Execs.makeThreadFactory(nameFormat));
   }
+
+  public static ScheduledThreadPoolExecutor fixedWithKeepAliveTime(
+      int corePoolSize,
+      String nameFormat,
+      long keepAliveTime,
+      TimeUnit timeUnit

Review Comment:
   It might be simpler to just accept milliseconds.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1009,6 +1010,33 @@ public Optional<TaskStatus> getTaskStatus(String id)
     this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider, 
this.tuningConfig, workerExec);
   }
 
+  /**
+   * Calculates the number of worker threads to use for the supervisor tasks.

Review Comment:
   ```suggestion
      * Calculates the number of worker threads to use in a {@link 
SeekableStreamSupervisor}.
   ```



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1009,6 +1010,33 @@ public Optional<TaskStatus> getTaskStatus(String id)
     this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider, 
this.tuningConfig, workerExec);
   }
 
+  /**
+   * Calculates the number of worker threads to use for the supervisor tasks.
+   * <p>
+   * If the tuning config explicitly sets the worker thread count, that value 
is used.

Review Comment:
   ```suggestion
      * If the tuning config explicitly specifies the field {@code 
workerThreads}, that value is used.
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java:
##########
@@ -2675,13 +2763,28 @@ private static Map<String, Object> getProperties()
   }
 
   private static SeekableStreamSupervisorTuningConfig getTuningConfig()
+  {
+    return createSupervisorTuningConfigWithWorkerThreads(1);
+  }
+
+  private static SeekableStreamSupervisorTuningConfig 
createSupervisorTuningConfig()
+  {
+    return createSupervisorTuningConfigWithWorkerThreads(null);
+  }
+
+  /**
+   * Creates a {@link SeekableStreamSupervisorTuningConfig} with the given 
number of worker threads.
+   *
+   * @param workerThreads the number of threads to use, or null to imitate 
default settings.
+   */

Review Comment:
   Nit: Not really needed (especially in tests) unless it's adding info that is 
not obvious from the method name and signature.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to