svalaskevicius commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2891092036


##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -180,15 +261,74 @@ public static ScheduledExecutorService 
newScheduledPool(String namePrefix, int p
    * is suitable for long-lived thread pools that should be automatically 
cleaned up on JVM
    * shutdown.
    */
-  public static ScheduledExecutorService newExitingScheduledPool(
+  public static synchronized ScheduledExecutorService newExitingScheduledPool(
       String namePrefix, int poolSize, Duration terminationTimeout) {
-    return MoreExecutors.getExitingScheduledExecutorService(
-        (ScheduledThreadPoolExecutor) newScheduledPool(namePrefix, poolSize),
-        terminationTimeout.toMillis(),
-        TimeUnit.MILLISECONDS);
+    ScheduledExecutorService service =
+        
Executors.unconfigurableScheduledExecutorService(newScheduledPool(namePrefix, 
poolSize));
+    THREAD_POOL_MANAGER.addThreadPool(service, terminationTimeout);
+    return service;
   }
 
   private static ThreadFactory newDaemonThreadFactory(String namePrefix) {
     return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix 
+ "-%d").build();
   }
+
+  /** Manages the lifecycle of thread pools that need to be shut down 
gracefully. */
+  static class ThreadPoolManager {
+    private final List<ExecutorServiceWithTimeout> threadPoolsToShutdown = 
Lists.newArrayList();
+
+    /**
+     * Add an executor service to the list of thread pools to be shut down.
+     *
+     * @param service the executor service to add
+     * @param timeout the timeout for shutdown operations
+     */
+    synchronized void addThreadPool(ExecutorService service, Duration timeout) 
{
+      threadPoolsToShutdown.add(new ExecutorServiceWithTimeout(service, 
timeout));
+    }
+
+    /** Shut down all registered thread pools. */
+    synchronized void shutdownAll() {
+      long startTime = System.nanoTime();
+      List<ExecutorServiceWithTimeout> pendingShutdown = Lists.newArrayList();
+      for (ExecutorServiceWithTimeout item : threadPoolsToShutdown) {
+        item.getService().shutdown();
+        pendingShutdown.add(item);
+      }
+      threadPoolsToShutdown.clear();
+      for (ExecutorServiceWithTimeout item : pendingShutdown) {
+        long timeElapsed = System.nanoTime() - startTime;
+        long remainingTime = item.getTimeout().toNanos() - timeElapsed;
+        if (remainingTime > 0) {
+          try {
+            if (!item.service.awaitTermination(remainingTime, 
TimeUnit.NANOSECONDS)) {
+              item.getService().shutdownNow();
+            }
+          } catch (InterruptedException ignored) {
+            // We're shutting down anyway, so just ignore.
+          }
+        } else {
+          item.getService().shutdownNow();
+        }
+      }
+    }
+  }
+
+  static class ExecutorServiceWithTimeout {

Review Comment:
   yep. will change



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to