svalaskevicius commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2845758223


##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +166,98 @@ public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
    * that should be automatically cleaned up on JVM shutdown.
    */
   public static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
-    return MoreExecutors.getExitingExecutorService(
-        (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+    ExecutorService service =
+        Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix, poolSize));
+    THREAD_POOLS_TO_SHUTDOWN.add(service);
+    return service;
+  }
+
+  /**
+   * Force manual shutdown of the thread pools created via the {@link #newExitingWorkerPool(String,
+   * int)}.
+   *
+   * <p>This method allows: (1) to stop thread pools manually, to avoid leaks in hot-reload
+   * environments; (2) opt-out of the standard shutdown mechanism to manage graceful service stops
+   * (and commit the last pending files, if the client application needs to react to shutdown hooks
+   * on its own).
+   *
+   * <p>Please only call this method at the end of the intended usage of the library, and NEVER
+   * before, as this method will stop thread pools required for normal library workflows.
+   */
+  public static void shutdownThreadPools() {
+    removeShutdownHook();
+    long startTime = System.nanoTime();
+    ExecutorService item;
+    Queue<ExecutorService> pendingShutdown = new ArrayDeque<>();
+    while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+      item.shutdown();
+      pendingShutdown.add(item);
+    }
+    while ((item = pendingShutdown.poll()) != null) {
+      long timeElapsed = System.nanoTime() - startTime;
+      long remainingTime = SHUTDOWN_TIMEOUT.toNanos() - timeElapsed;
+      if (remainingTime > 0) {
+        try {
+          if (!item.awaitTermination(remainingTime, TimeUnit.NANOSECONDS)) {
+            item.shutdownNow();
+          }
+        } catch (InterruptedException ignored) {
+          // We're shutting down anyway, so just ignore.
+        }
+      } else {
+        item.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Initialize a shutdown hook to stop the thread pools created via the {@link
+   * #newExitingWorkerPool(String, int)}.
+   */
+  @SuppressWarnings("ShutdownHook")
+  private static void initShutdownHook() {
+    if (shutdownHook == null) {
+      shutdownHook =
+          Executors.defaultThreadFactory()
+              .newThread(
+                  new Runnable() {
+                    @Override
+                    public void run() {
+                      shutdownHook = null;
+                      shutdownThreadPools();
+                    }
+                  });
+
+      try {
+        shutdownHook.setName("DelayedShutdownHook-iceberg");
+      } catch (SecurityException e) {
+        LOG.warn("Cannot set thread name for the shutdown hook", e);
+      }
+
+      try {
+        Runtime.getRuntime().addShutdownHook(shutdownHook);
+      } catch (SecurityException e) {
+        LOG.warn("Cannot install a shutdown hook for thread pools clean up", e);
+      }
+    }
+  }
+
+  /**
+   * Stop the shutdown hook for the thread pools created via the {@link
+   * #newExitingWorkerPool(String, int)}.
+   *
+   * <p>Thread pools can still be stopped manually via the {@link #shutdownThreadPools()} method.
+   */
+  @SuppressWarnings("ShutdownHook")
+  public static void removeShutdownHook() {
+    if (shutdownHook != null) {
+      try {
+        Runtime.getRuntime().removeShutdownHook(shutdownHook);
+      } catch (SecurityException e) {
+        LOG.warn("Cannot remove the shutdown hook for thread pools clean up", e);
+      }
+      shutdownHook = null;

Review Comment:
   Actually, even simpler: I changed these two methods to `synchronized`, since they are not performance-sensitive; they are intended to be called once, not on a hot path.
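A minimal sketch of what the `synchronized` variant could look like. It assumes that "these two methods" are `initShutdownHook` and `removeShutdownHook`, although the comment does not say which. The class name `ExitingPools`, the 10-second `SHUTDOWN_TIMEOUT`, the daemon thread factory, and registering the hook lazily from `newExitingWorkerPool` are all illustrative assumptions, not the PR's code. The sketch also shows one way to keep the running hook from calling `Runtime.removeShutdownHook` on itself, which throws `IllegalStateException` once JVM shutdown has begun.

```java
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the exiting-pool part of ThreadPools; not the PR's code. */
final class ExitingPools {
  // Assumed value: the real SHUTDOWN_TIMEOUT is not visible in the diff.
  private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(10);
  private static final Queue<ExecutorService> POOLS = new ConcurrentLinkedQueue<>();

  // Only read or written while holding the ExitingPools.class monitor.
  private static Thread shutdownHook;

  private ExitingPools() {}

  static ExecutorService newExitingWorkerPool(String namePrefix, int poolSize) {
    AtomicInteger counter = new AtomicInteger();
    ThreadFactory factory =
        runnable -> {
          Thread thread = new Thread(runnable, namePrefix + "-" + counter.getAndIncrement());
          thread.setDaemon(true); // assumption: workers should never keep the JVM alive
          return thread;
        };
    ExecutorService service =
        Executors.unconfigurableExecutorService(Executors.newFixedThreadPool(poolSize, factory));
    POOLS.add(service);
    initShutdownHook(); // assumption: hook is (re-)registered lazily
    return service;
  }

  /** Manual shutdown: drop the hook first so the pools are not shut down again at JVM exit. */
  static void shutdownThreadPools() {
    removeShutdownHook();
    shutdownPools();
  }

  // Synchronized: called rarely, so a class-level lock is simpler than CAS on a volatile field.
  private static synchronized void initShutdownHook() {
    if (shutdownHook == null) {
      Thread hook = new Thread(ExitingPools::runHook, "DelayedShutdownHook-iceberg");
      try {
        Runtime.getRuntime().addShutdownHook(hook);
        shutdownHook = hook;
      } catch (IllegalStateException | SecurityException e) {
        // JVM shutdown already in progress, or hooks not permitted: leave the hook unset.
      }
    }
  }

  static synchronized void removeShutdownHook() {
    if (shutdownHook != null) {
      try {
        Runtime.getRuntime().removeShutdownHook(shutdownHook);
      } catch (IllegalStateException | SecurityException e) {
        // Shutdown already in progress (the hook runs regardless), or not permitted.
      }
      shutdownHook = null;
    }
  }

  private static void runHook() {
    synchronized (ExitingPools.class) {
      // The hook is already running; Runtime.removeShutdownHook would throw if called now.
      shutdownHook = null;
    }
    shutdownPools();
  }

  // Not synchronized, so no lock is held while waiting for pools to drain.
  private static void shutdownPools() {
    long deadline = System.nanoTime() + SHUTDOWN_TIMEOUT.toNanos();
    Queue<ExecutorService> pending = new ArrayDeque<>();
    ExecutorService pool;
    // Phase 1: stop every pool from accepting new tasks before waiting on any of them.
    while ((pool = POOLS.poll()) != null) {
      pool.shutdown();
      pending.add(pool);
    }
    // Phase 2: all pools share one timeout budget; stragglers are force-stopped.
    while ((pool = pending.poll()) != null) {
      long remaining = deadline - System.nanoTime();
      try {
        if (remaining <= 0 || !pool.awaitTermination(remaining, TimeUnit.NANOSECONDS)) {
          pool.shutdownNow();
        }
      } catch (InterruptedException e) {
        pool.shutdownNow();
        Thread.currentThread().interrupt(); // later awaits fail fast, force-stopping the rest
      }
    }
  }
}
```

Keeping `shutdownPools()` outside the lock means a slow drain never blocks a concurrent `initShutdownHook()` or `removeShutdownHook()` call; only the short hook bookkeeping is serialized.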



Review Comment:
   Actually, even simpler: I changed these two methods to `synchronized`, since they are not performance-sensitive; they are intended to be called once, not on a hot path. Please check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

