gianm commented on code in PR #18176:
URL: https://github.com/apache/druid/pull/18176#discussion_r2264029299


##########
processing/src/main/java/org/apache/druid/java/util/common/concurrent/Execs.java:
##########
@@ -173,6 +175,97 @@ public void rejectedExecution(Runnable r, 
ThreadPoolExecutor executor)
     );
   }
 
+  public static ExecutorService newBlockingCached(
+      final String nameFormat,
+      int minThreads,
+      int maxThreads,
+      long keepAliveTime,
+      TimeUnit keepAliveTimeUnit,
+      final Integer priority
+  )
+  {
+    final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
+    if (minThreads == maxThreads) {
+      return new ThreadPoolExecutor(
+          minThreads,
+          maxThreads,
+          keepAliveTime,
+          keepAliveTimeUnit,
+          queue,
+          makeThreadFactory(nameFormat, priority),
+          (r, executor) -> {
+            if (executor.isShutdown()) {
+              throw new RejectedExecutionException("Executor is shutdown, 
rejecting task");
+            }
+            try {
+              executor.getQueue().put(r);
+            }
+            catch (InterruptedException e) {
+              throw new RejectedExecutionException("Got Interrupted while 
adding to the Queue", e);
+            }
+          }
+      );
+    }
+    return new ThreadPoolExecutor(
+        minThreads,
+        maxThreads,
+        keepAliveTime,
+        keepAliveTimeUnit,
+        queue,
+        makeThreadFactory(nameFormat, priority),
+        (r, executor) -> {
+          if (executor.isShutdown()) {
+            throw new RejectedExecutionException("Executor is shutdown, 
rejecting task");
+          }
+          try {
+            executor.getQueue().put(r);
+          }
+          catch (InterruptedException e) {
+            throw new RejectedExecutionException("Got Interrupted while adding 
to the Queue", e);
+          }
+        }
+    )
+    {
+      private int running = 0;
+
+      @Override
+      public void execute(Runnable command)
+      {
+        synchronized (this) {
+          running++;
+          growIfNeeded();

Review Comment:
   hmm, weird way for cached pools to work, but OK. IMO a fixed size pool is 
going to be simpler and fine, so I suggest using that. The pool probably won't 
be big enough that it causes problems.
   
   For virtual threads, were you thinking the processing threads themselves 
would be virtual, and we'd do "blocking" fetches in those virtual threads? Or 
did you have something else in mind?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to