mxm commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2804346455


##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +163,75 @@ public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link #newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutDownStartedThreadPools() {

Review Comment:
   I think this method needs to be `synchronized` to be safe when it is called
   from multiple threads.



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +163,75 @@ public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link #newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutDownStartedThreadPools() {
+    ExecutorService item;
+    Queue<ExecutorService> invoked = new ArrayDeque<>();
+    while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+      item.shutdown();
+      invoked.add(item);
+    }
+    while ((item = invoked.poll()) != null) {
+      try {
+        item.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      } catch (InterruptedException ignored) {
+        // We're shutting down anyway, so just ignore.
+      }
+    }
+  }
+
+  /**
+   * Initialise a shutdown hook to stop the thread pools created via the {@link
+   * #newExitingWorkerPool(String, int)}.
+   *
+   * <p>NOTE: it is normally started automatically, and does not need to be invoked manually, unless
+   * it has been stopped via the {@link #removeShutdownHook()}
+   */
+  @SuppressWarnings("ShutdownHook")
+  public static void initShutdownHook() {

Review Comment:
   ```suggestion
     private static void initShutdownHook() {
   ```
   
   This should either be `private` or be `synchronized`. Otherwise concurrent
   callers can both pass the `null` check and add the shutdown hook multiple
   times.
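   
   A minimal sketch of the `synchronized` option, not the PR's actual code.
   `ShutdownHookSketch` and `HOOKS_CREATED` are hypothetical names used only
   for illustration, and the body of `removeShutdownHook()` is an assumption,
   since it is not shown in this hunk:
   
   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Hypothetical stand-in for ThreadPools; all names here are illustrative.
   final class ShutdownHookSketch {
     private static Thread shutDownHook;

     // Counts how many hook threads were actually created (demonstration only).
     static final AtomicInteger HOOKS_CREATED = new AtomicInteger();

     private ShutdownHookSketch() {}

     // synchronized makes the null check and the assignment one atomic step,
     // so two concurrent callers cannot both register a hook.
     static synchronized void initShutdownHook() {
       if (shutDownHook == null) {
         Thread hook =
             new Thread(
                 () -> {
                   // The real hook would shut down the registered pools here.
                 },
                 "DelayedShutdownHook-iceberg");
         Runtime.getRuntime().addShutdownHook(hook);
         shutDownHook = hook;
         HOOKS_CREATED.incrementAndGet();
       }
     }

     // Assumed counterpart: guarded by the same lock so init/remove cannot race.
     static synchronized void removeShutdownHook() {
       if (shutDownHook != null) {
         Runtime.getRuntime().removeShutdownHook(shutDownHook);
         shutDownHook = null;
       }
     }
   }
   ```
   
   With this shape, repeated or concurrent calls to `initShutdownHook()` are
   no-ops after the first one, until `removeShutdownHook()` is called.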



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +163,75 @@ public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link #newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutDownStartedThreadPools() {
+    ExecutorService item;
+    Queue<ExecutorService> invoked = new ArrayDeque<>();
+    while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+      item.shutdown();
+      invoked.add(item);
+    }
+    while ((item = invoked.poll()) != null) {
+      try {
+        item.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      } catch (InterruptedException ignored) {
+        // We're shutting down anyway, so just ignore.
+      }
+    }
+  }
+
+  /**
+   * Initialise a shutdown hook to stop the thread pools created via the {@link
+   * #newExitingWorkerPool(String, int)}.
+   *
+   * <p>NOTE: it is normally started automatically, and does not need to be invoked manually, unless
+   * it has been stopped via the {@link #removeShutdownHook()}
+   */
+  @SuppressWarnings("ShutdownHook")
+  public static void initShutdownHook() {
+    if (shutDownHook == null) {
+      shutDownHook =
+          Executors.defaultThreadFactory()
+              .newThread(
+                  new Runnable() {
+                    @Override
+                    public void run() {
+                      shutDownStartedThreadPools();
+                    }
+                  });
+
+      try {
+        shutDownHook.setName("DelayedShutdownHook-iceberg");
+      } catch (SecurityException e) {
+        // OK if we can't set the name in this environment.
+      }

Review Comment:
   Should we log a warning here so the user knows the name could not be set?
   Also, what about adding the shutdown hook itself: do we need a try/catch
   for that as well?
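   
   For reference, `Runtime.addShutdownHook` can throw `IllegalStateException`
   (the JVM is already shutting down) and `SecurityException`. A rough sketch
   of what guarding both steps could look like. `HookRegistrationSketch` and
   `tryRegister` are hypothetical names, and `java.util.logging` is used here
   only to keep the example self-contained; the real code would presumably use
   the project's own logger:
   
   ```java
   import java.util.logging.Level;
   import java.util.logging.Logger;

   // Hypothetical helper; not part of the PR.
   final class HookRegistrationSketch {
     private static final Logger LOG =
         Logger.getLogger(HookRegistrationSketch.class.getName());

     private HookRegistrationSketch() {}

     /** Returns true if the hook was registered, false if registration failed. */
     static boolean tryRegister(Thread hook) {
       try {
         hook.setName("DelayedShutdownHook-iceberg");
       } catch (SecurityException e) {
         // Naming is cosmetic, so warn and carry on.
         LOG.log(Level.WARNING, "Could not set the shutdown hook thread name", e);
       }

       try {
         Runtime.getRuntime().addShutdownHook(hook);
         return true;
       } catch (IllegalStateException | SecurityException e) {
         // IllegalStateException: the JVM is already shutting down.
         // SecurityException: a security manager denied the registration.
         LOG.log(Level.WARNING, "Could not register the shutdown hook", e);
         return false;
       }
     }
   }
   ```
   
   Returning a flag lets the caller decide whether to keep a reference to the
   hook thread or leave the field unset after a failed registration.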



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -56,6 +62,14 @@ private ThreadPools() {}
   public static final int AUTH_REFRESH_THREAD_POOL_SIZE =
       SystemConfigs.AUTH_REFRESH_THREAD_POOL_SIZE.value();
 
+  private static final int SHUTDOWN_TIMEOUT_SECONDS = 120;
+
+  private static Thread shutDownHook = null;

Review Comment:
   No need to initialize with `null`; a static reference field already
   defaults to `null`.
   
   ```suggestion
     private static Thread shutDownHook;
   ```



##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +163,75 @@ public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link #newExitingWorkerPool(String,
+   * int)}.
+   */
+  public static void shutDownStartedThreadPools() {
+    ExecutorService item;
+    Queue<ExecutorService> invoked = new ArrayDeque<>();
+    while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+      item.shutdown();
+      invoked.add(item);
+    }
+    while ((item = invoked.poll()) != null) {
+      try {
+        item.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+      } catch (InterruptedException ignored) {
+        // We're shutting down anyway, so just ignore.
+      }

Review Comment:
   I think we want to wait for the pools concurrently. Waiting one by one
   means that, in the worst case, we wait up to `N * SHUTDOWN_TIMEOUT_SECONDS`
   seconds for `N` pools. We probably need to collect a list of futures and
   wait on their completion for at most `SHUTDOWN_TIMEOUT_SECONDS` in total.
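   
   One way to bound the total wait, as a sketch rather than a definitive
   implementation. Instead of collecting futures, it uses a single shared
   deadline and gives each `awaitTermination` call only the time that remains.
   Because `shutdown()` has already been called on every pool, the pools wind
   down in parallel. `SharedDeadlineShutdownSketch` and `shutdownAll` are
   hypothetical names:
   
   ```java
   import java.util.List;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.TimeUnit;

   // Hypothetical helper; not part of the PR.
   final class SharedDeadlineShutdownSketch {
     private SharedDeadlineShutdownSketch() {}

     /** Returns true if every pool terminated before the shared deadline. */
     static boolean shutdownAll(List<ExecutorService> pools, long timeout, TimeUnit unit) {
       // Signal all pools first so they drain their queues concurrently.
       for (ExecutorService pool : pools) {
         pool.shutdown();
       }

       long deadline = System.nanoTime() + unit.toNanos(timeout);
       boolean allTerminated = true;
       for (ExecutorService pool : pools) {
         // Only the time left until the shared deadline, never a fresh timeout.
         long remaining = Math.max(0L, deadline - System.nanoTime());
         try {
           if (!pool.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
             allTerminated = false;
           }
         } catch (InterruptedException e) {
           // Restore the interrupt flag and stop waiting on the remaining pools.
           Thread.currentThread().interrupt();
           return false;
         }
       }
       return allTerminated;
     }
   }
   ```
   
   The total wait is then capped at roughly the timeout regardless of the
   number of pools, at the cost of later pools getting less time if earlier
   ones are slow to terminate.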



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
