cryptoe commented on code in PR #18591:
URL: https://github.com/apache/druid/pull/18591#discussion_r2465602586


##########
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -179,6 +191,24 @@ protected KubernetesWorkItem joinAsync(Task task)
     }
   }
 
+  private void syncCapacityWithDynamicConfig(KubernetesTaskRunnerDynamicConfig config)

Review Comment:
   This looks much cleaner. Thanks @FrankChen021 for the suggestion and @GabrielCWT for the implementation.



##########
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -132,10 +142,12 @@ public KubernetesTaskRunner(
     this.httpClient = httpClient;
     this.peonLifecycleFactory = peonLifecycleFactory;
     this.cleanupExecutor = Executors.newScheduledThreadPool(1);
-    this.exec = MoreExecutors.listeningDecorator(
-        Execs.multiThreaded(config.getCapacity(), "k8s-task-runner-%d")
-    );
     this.emitter = emitter;
+
+    this.currentCapacity = config.getCapacity();
+    this.tpe = new ThreadPoolExecutor(currentCapacity, currentCapacity, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Execs.makeThreadFactory("k8s-task-runner-%d", null));
+    this.exec = MoreExecutors.listeningDecorator(this.tpe);
+    configManager.addListener(KubernetesTaskRunnerDynamicConfig.CONFIG_KEY, StringUtils.format(OBSERVER_KEY, Thread.currentThread().getId()), this::syncCapacityWithDynamicConfig);

Review Comment:
   Would this run in the Jetty thread?



##########
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -111,19 +117,23 @@ public class KubernetesTaskRunner implements TaskLogStreamer, TaskRunner
   private final KubernetesPeonClient client;
   private final KubernetesTaskRunnerConfig config;
   private final ListeningExecutorService exec;
+  private final ThreadPoolExecutor tpe;
   private final HttpClient httpClient;
   private final PeonLifecycleFactory peonLifecycleFactory;
   private final ServiceEmitter emitter;
   // currently worker categories aren't supported, so it's hardcoded.
   protected static final String WORKER_CATEGORY = "_k8s_worker_category";
 
+  private int currentCapacity;

Review Comment:
   This should be made thread-safe. Use an `AtomicInteger`.
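
   To illustrate the suggestion, here is a minimal sketch of one way the capacity field could be made thread-safe. It is not the PR's code: the class `CapacitySyncSketch` and its methods `syncCapacity`, `getCapacity`, `getPoolSizeLimit`, and `shutdown` are hypothetical names, and the sketch assumes the listener may fire on a thread other than the one that constructed the runner. The `AtomicInteger` gives readers a consistent view of the capacity, while the `synchronized` method serializes the two-step pool resize.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the runner's capacity handling; not the PR's code.
class CapacitySyncSketch
{
  private final ThreadPoolExecutor tpe;
  // AtomicInteger so that readers on other threads always see the latest value.
  private final AtomicInteger currentCapacity;

  CapacitySyncSketch(int initialCapacity)
  {
    this.currentCapacity = new AtomicInteger(initialCapacity);
    this.tpe = new ThreadPoolExecutor(
        initialCapacity,
        initialCapacity,
        0L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>()
    );
  }

  // Assumed to be callable from any thread, for example a config listener.
  // synchronized so that two concurrent updates cannot interleave their resizes.
  synchronized void syncCapacity(int newCapacity)
  {
    if (newCapacity < 1) {
      throw new IllegalArgumentException("capacity must be positive");
    }
    final int oldCapacity = currentCapacity.get();
    if (newCapacity == oldCapacity) {
      return;
    }
    if (newCapacity > oldCapacity) {
      // Growing: raise the maximum first so core never exceeds maximum.
      tpe.setMaximumPoolSize(newCapacity);
      tpe.setCorePoolSize(newCapacity);
    } else {
      // Shrinking: lower core first so maximum never drops below core.
      tpe.setCorePoolSize(newCapacity);
      tpe.setMaximumPoolSize(newCapacity);
    }
    currentCapacity.set(newCapacity);
  }

  // Lock-free read of the last applied capacity.
  int getCapacity()
  {
    return currentCapacity.get();
  }

  int getPoolSizeLimit()
  {
    return tpe.getMaximumPoolSize();
  }

  void shutdown()
  {
    tpe.shutdown();
  }
}
```

   The order of the two setter calls matters: `ThreadPoolExecutor` rejects a maximum pool size below the core size, and on newer JDKs it also rejects a core size above the maximum.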



##########
extensions-core/kubernetes-overlord-extensions/src/main/java/org/apache/druid/k8s/overlord/KubernetesTaskRunner.java:
##########
@@ -179,6 +189,24 @@ protected KubernetesWorkItem joinAsync(Task task)
     }
   }
 
+  private void syncCapacityWithDynamicConfig()

Review Comment:
   Thank you, the watcher-based approach looks much cleaner.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

