kfaraz commented on code in PR #18163:
URL: https://github.com/apache/druid/pull/18163#discussion_r2162902024


##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -1009,6 +1008,22 @@ public Optional<TaskStatus> getTaskStatus(String id)
     this.taskClient = taskClientFactory.build(dataSource, taskInfoProvider, this.tuningConfig, workerExec);
   }
 
+  public static int calculateWorkerThreads(

Review Comment:
   Please add a short javadoc about what these worker threads are used for and how their number is computed.
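A minimal sketch of what the requested javadoc could cover, assuming the behavior implied by the tests in this review. The diff context above hands `workerExec` to `taskClientFactory.build(...)`, so these threads presumably run the supervisor's requests to its tasks. The class `WorkerThreadCalculator`, the flattened parameters (used in place of the real tuning and IO config objects), and the constant values 2 and 4 are all hypothetical. They were picked only so that the test expectations hold.

```java
/**
 * Hypothetical, simplified stand-in for SeekableStreamSupervisor.calculateWorkerThreads().
 */
final class WorkerThreadCalculator
{
  // ASSUMPTION: placeholder values chosen to match the test expectations, not the PR's constants.
  static final int DEFAULT_WORKER_THREADS = 2;
  static final int DEFAULT_TASKS_PER_WORKER_THREAD = 4;

  private WorkerThreadCalculator()
  {
  }

  /**
   * Computes the size of the supervisor's worker executor, which is passed to the task client
   * and (presumably) runs the supervisor's requests to its tasks.
   * <p>
   * If {@code workerThreads} is set explicitly in the tuning config, it is used as-is.
   * Otherwise one thread is allotted per {@link #DEFAULT_TASKS_PER_WORKER_THREAD} tasks, but never
   * fewer than {@link #DEFAULT_WORKER_THREADS}. When task autoscaling is enabled,
   * {@code taskCountMax} is used instead of {@code taskCount}, so the pool is sized for the largest
   * task count the autoscaler may reach.
   *
   * @param configuredWorkerThreads workerThreads from the tuning config, or null if unset
   * @param taskCount               taskCount from the IO config
   * @param autoScalerTaskCountMax  taskCountMax if the autoscaler is enabled, otherwise null
   */
  static int calculateWorkerThreads(
      Integer configuredWorkerThreads,
      int taskCount,
      Integer autoScalerTaskCountMax
  )
  {
    if (configuredWorkerThreads != null) {
      return configuredWorkerThreads;
    }
    final int effectiveTaskCount = autoScalerTaskCountMax != null ? autoScalerTaskCountMax : taskCount;
    return Math.max(DEFAULT_WORKER_THREADS, effectiveTaskCount / DEFAULT_TASKS_PER_WORKER_THREAD);
  }
}
```

With these placeholder constants, `calculateWorkerThreads(null, 18, null)` returns 4 and `calculateWorkerThreads(null, 1, 21)` returns 5, matching the factor-of-task-count and autoscaler tests below.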



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java:
##########
@@ -2630,21 +2703,32 @@ private static DataSchema getDataSchema()
                      .build();
   }
 
-  private static SeekableStreamSupervisorIOConfig getIOConfig()
+  private static SeekableStreamSupervisorIOConfig getSupervisorIOConfig()

Review Comment:
   Nit: Please use the prefix `create` instead of `get` for these new methods (you can leave the existing method names unchanged), since they create new objects rather than get some field that is already present in this instance.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -970,13 +965,17 @@ public SeekableStreamSupervisor(
       );
     }
 
-    this.workerExec = MoreExecutors.listeningDecorator(
-        ScheduledExecutors.fixed(
-            workerThreads,
-            StringUtils.encodeForFormat(supervisorTag) + "-Worker-%d"
-        )
+    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
+        workerThreads,
+        Execs.makeThreadFactory(StringUtils.encodeForFormat(supervisorTag) + "-Worker-%d")
+    );
+    executor.setKeepAliveTime(WORKER_THREAD_KEEPALIVE_TIME_SECONDS, TimeUnit.SECONDS);

Review Comment:
   Nit: I think we should put this in a utility method in the `ScheduledExecutors` class itself: essentially another flavor of the `fixed` method that also accepts a keep-alive timeout in millis.
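A minimal sketch of such a `fixed` overload, written against plain `java.util.concurrent` so it stands alone. The class name `WorkerExecutors`, the method name `fixedWithKeepAlive`, and the hand-rolled daemon thread factory (standing in for `Execs.makeThreadFactory`) are hypothetical. One detail worth folding into the helper: every thread of a `ScheduledThreadPoolExecutor` is a core thread, so `setKeepAliveTime` alone does not let idle threads exit; `allowCoreThreadTimeOut(true)` is also needed, and it rejects a non-positive keep-alive. The `ScheduledThreadPoolExecutor` javadoc advises caution with core-thread timeout, so this is a trade-off to weigh rather than a free win.

```java
import java.util.Locale;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical helper sketching a keep-alive flavor of ScheduledExecutors.fixed().
 */
final class WorkerExecutors
{
  private WorkerExecutors()
  {
  }

  /**
   * Creates a scheduled pool of {@code threads} daemon threads named from {@code nameFormat}
   * (e.g. "mySupervisor-Worker-%d"). Threads that stay idle for {@code keepAliveMillis} may exit;
   * the executor starts a new one when more work is scheduled.
   *
   * @throws IllegalArgumentException if {@code keepAliveMillis} is not positive
   */
  static ScheduledThreadPoolExecutor fixedWithKeepAlive(int threads, String nameFormat, long keepAliveMillis)
  {
    final AtomicInteger threadCounter = new AtomicInteger();
    final ThreadFactory threadFactory = runnable -> {
      Thread thread = new Thread(runnable, String.format(Locale.ROOT, nameFormat, threadCounter.getAndIncrement()));
      thread.setDaemon(true);
      return thread;
    };

    final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(threads, threadFactory);
    // All threads of a ScheduledThreadPoolExecutor are core threads, so the keep-alive only takes
    // effect once core threads may time out. allowCoreThreadTimeOut(true) throws
    // IllegalArgumentException unless the keep-alive set just before it is positive.
    executor.setKeepAliveTime(keepAliveMillis, TimeUnit.MILLISECONDS);
    executor.allowCoreThreadTimeOut(true);
    return executor;
  }
}
```

The constructor could then use something along the lines of `MoreExecutors.listeningDecorator(WorkerExecutors.fixedWithKeepAlive(workerThreads, StringUtils.encodeForFormat(supervisorTag) + "-Worker-%d", TimeUnit.SECONDS.toMillis(WORKER_THREAD_KEEPALIVE_TIME_SECONDS)))` instead of building the executor inline.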



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java:
##########
@@ -2563,6 +2565,77 @@ public LagStats computeLagStats()
     EasyMock.verify(executorService, spec);
   }
 
+  @Test
+  public void test_calculateWorkerThreads_shouldHonourWorkerConfig()
+  {
+    final int totalThreads = 5;
+    SeekableStreamSupervisorTuningConfig tuningConfig = getTuningConfigWithWorkerThreads(totalThreads);
+    SeekableStreamSupervisorIOConfig ioConfig = getSupervisorIOConfig(1, null);
+    Assert.assertEquals(totalThreads, SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig));
+  }
+
+  @Test
+  public void test_calculateWorkerThreads_shouldUseDefaultWorkerThreads()
+  {
+    SeekableStreamSupervisorTuningConfig tuningConfig = getTuningConfigWithWorkerThreads();
+    SeekableStreamSupervisorIOConfig ioConfig = getSupervisorIOConfig(1, null);
+    Assert.assertEquals(
+        DEFAULT_WORKER_THREADS,
+        SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+    );
+  }
+
+  @Test
+  public void test_calculateWorkerThreads_shouldUseMinimumWorkerThreadsWithTasks()
+  {
+    SeekableStreamSupervisorTuningConfig tuningConfig = getTuningConfigWithWorkerThreads();
+    SeekableStreamSupervisorIOConfig ioConfig = getSupervisorIOConfig(7, null);
+    Assert.assertEquals(
+        DEFAULT_WORKER_THREADS,
+        SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+    );
+  }
+
+  @Test
+  public void test_calculateWorkerThreads_shouldUseFactorOfTaskCount()
+  {
+    SeekableStreamSupervisorTuningConfig tuningConfig = getTuningConfigWithWorkerThreads();
+    SeekableStreamSupervisorIOConfig ioConfig = getSupervisorIOConfig(18, null);
+    Assert.assertEquals(
+        4,
+        SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+    );
+  }
+
+  @Test
+  public void test_calculateWorkerThreads_shouldUseAutoScalerConfig()
+  {
+    SeekableStreamSupervisorTuningConfig tuningConfig = getTuningConfigWithWorkerThreads();
+    AutoScalerConfig autoScalerConfig = new LagBasedAutoScalerConfig(
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        21,

Review Comment:
   Assign this value to a field `taskCountMax` so that the test is easier to follow.



##########
indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisor.java:
##########
@@ -940,17 +943,9 @@ public SeekableStreamSupervisor(
         spec.isSuspended()
     );
 
-    int workerThreads;
+    final int workerThreads = calculateWorkerThreads(tuningConfig, ioConfig);
     if (autoScalerConfig != null && autoScalerConfig.getEnableTaskAutoScaler()) {
       log.info("Running Task autoscaler for supervisor[%s] for datasource[%s]", supervisorId, dataSource);

Review Comment:
   Nit: I guess the log line can move into the new `calculateWorkerThreads` method itself.



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java:
##########
@@ -2563,6 +2565,77 @@ public LagStats computeLagStats()
     EasyMock.verify(executorService, spec);
   }
 
+  @Test
+  public void test_calculateWorkerThreads_shouldHonourWorkerConfig()
+  {
+    final int totalThreads = 5;
+    SeekableStreamSupervisorTuningConfig tuningConfig = getTuningConfigWithWorkerThreads(totalThreads);
+    SeekableStreamSupervisorIOConfig ioConfig = getSupervisorIOConfig(1, null);
+    Assert.assertEquals(totalThreads, SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig));
+  }
+
+  @Test
+  public void test_calculateWorkerThreads_shouldUseDefaultWorkerThreads()
+  {
+    SeekableStreamSupervisorTuningConfig tuningConfig = getTuningConfigWithWorkerThreads();
+    SeekableStreamSupervisorIOConfig ioConfig = getSupervisorIOConfig(1, null);
+    Assert.assertEquals(
+        DEFAULT_WORKER_THREADS,
+        SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+    );
+  }
+
+  @Test
+  public void test_calculateWorkerThreads_shouldUseMinimumWorkerThreadsWithTasks()
+  {
+    SeekableStreamSupervisorTuningConfig tuningConfig = getTuningConfigWithWorkerThreads();
+    SeekableStreamSupervisorIOConfig ioConfig = getSupervisorIOConfig(7, null);
+    Assert.assertEquals(
+        DEFAULT_WORKER_THREADS,
+        SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+    );
+  }
+
+  @Test
+  public void test_calculateWorkerThreads_shouldUseFactorOfTaskCount()
+  {
+    SeekableStreamSupervisorTuningConfig tuningConfig = getTuningConfigWithWorkerThreads();
+    SeekableStreamSupervisorIOConfig ioConfig = getSupervisorIOConfig(18, null);
+    Assert.assertEquals(
+        4,
+        SeekableStreamSupervisor.calculateWorkerThreads(tuningConfig, ioConfig)
+    );
+  }
+
+  @Test
+  public void test_calculateWorkerThreads_shouldUseAutoScalerConfig()
+  {
+    SeekableStreamSupervisorTuningConfig tuningConfig = getTuningConfigWithWorkerThreads();
+    AutoScalerConfig autoScalerConfig = new LagBasedAutoScalerConfig(
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        null,
+        21,
+        null,
+        5,
+        null,
+        null,
+        true,
+        null,
+        null
+    );
+    SeekableStreamSupervisorIOConfig ioConfig = getSupervisorIOConfig(1, autoScalerConfig);
+    Assert.assertEquals(
+        5,

Review Comment:
   This would then become:
   
   ```suggestion
           taskCountMax / defaultTasksPerThread,
   ```
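The suggestion relies on integer division reproducing the current literal. Reading the assertions in these tests together (18 tasks give 4 threads; a `taskCountMax` of 21 gives 5 threads), the only integer tasks-per-thread value $k$ consistent with both is 4. That value is inferred from the tests here, not taken from the PR's code:

```math
\left\lfloor \frac{18}{k} \right\rfloor = 4 \;\Rightarrow\; 3.6 < k \le 4.5, \qquad
\left\lfloor \frac{21}{k} \right\rfloor = 5 \;\Rightarrow\; 3.5 < k \le 4.2, \qquad
\text{hence } k = 4 \text{ and } \left\lfloor \frac{7}{4} \right\rfloor = 1
```

So `taskCountMax / defaultTasksPerThread` evaluates to `21 / 4 == 5` in Java, and the 7-task case only yields `DEFAULT_WORKER_THREADS` if the computation clamps to that minimum, which is what the 7-task test asserts.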



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java:
##########
@@ -2563,6 +2565,77 @@ public LagStats computeLagStats()
     EasyMock.verify(executorService, spec);
   }
 
+  @Test
+  public void test_calculateWorkerThreads_shouldHonourWorkerConfig()
+  {
+    final int totalThreads = 5;

Review Comment:
   ```suggestion
       final int numWorkerThreads = 5;
   ```



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java:
##########
@@ -2630,21 +2703,32 @@ private static DataSchema getDataSchema()
                      .build();
   }
 
-  private static SeekableStreamSupervisorIOConfig getIOConfig()
+  private static SeekableStreamSupervisorIOConfig getSupervisorIOConfig()
+  {
+    return getSupervisorIOConfig(1, OBJECT_MAPPER.convertValue(getProperties(), AutoScalerConfig.class));
+  }
+
+  /**
+   * Use this method to create a {@link SeekableStreamSupervisorIOConfig} with custom properties.
+   */

Review Comment:
   Not really needed if we name the method appropriately.



##########
indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SeekableStreamSupervisorStateTest.java:
##########
@@ -2675,13 +2759,28 @@ private static Map<String, Object> getProperties()
   }
 
   private static SeekableStreamSupervisorTuningConfig getTuningConfig()
+  {
+    return getTuningConfigWithWorkerThreads(1);
+  }
+
+  private static SeekableStreamSupervisorTuningConfig getTuningConfigWithWorkerThreads()

Review Comment:
   Nit: since we are not really specifying any value for `workerThreads` in this method, it shouldn't be included in the name.
   
   ```suggestion
     private static SeekableStreamSupervisorTuningConfig createSupervisorTuningConfig()
   ```


