mxm commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2804346455
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +163,75 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutDownStartedThreadPools() {
Review Comment:
I think this needs to be synchronized to work in a multi-threaded
environment.
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +163,75 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutDownStartedThreadPools() {
+ ExecutorService item;
+ Queue<ExecutorService> invoked = new ArrayDeque<>();
+ while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+ item.shutdown();
+ invoked.add(item);
+ }
+ while ((item = invoked.poll()) != null) {
+ try {
+ item.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ // We're shutting down anyway, so just ignore.
+ }
+ }
+ }
+
+ /**
+ * Initialise a shutdown hook to stop the thread pools created via the {@link
+ * #newExitingWorkerPool(String, int)}.
+ *
+ * <p>NOTE: it is normally started automatically, and does not need to be
invoked manually, unless
+ * it has been stopped via the {@link #removeShutdownHook()}
+ */
+ @SuppressWarnings("ShutdownHook")
+ public static void initShutdownHook() {
Review Comment:
```suggestion
private static void initShutdownHook() {
```
This should either be private or be `synchronized` to avoid adding the
shutdown hook multiple times.
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +163,75 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutDownStartedThreadPools() {
+ ExecutorService item;
+ Queue<ExecutorService> invoked = new ArrayDeque<>();
+ while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+ item.shutdown();
+ invoked.add(item);
+ }
+ while ((item = invoked.poll()) != null) {
+ try {
+ item.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ // We're shutting down anyway, so just ignore.
+ }
+ }
+ }
+
+ /**
+ * Initialise a shutdown hook to stop the thread pools created via the {@link
+ * #newExitingWorkerPool(String, int)}.
+ *
+ * <p>NOTE: it is normally started automatically, and does not need to be
invoked manually, unless
+ * it has been stopped via the {@link #removeShutdownHook()}
+ */
+ @SuppressWarnings("ShutdownHook")
+ public static void initShutdownHook() {
+ if (shutDownHook == null) {
+ shutDownHook =
+ Executors.defaultThreadFactory()
+ .newThread(
+ new Runnable() {
+ @Override
+ public void run() {
+ shutDownStartedThreadPools();
+ }
+ });
+
+ try {
+ shutDownHook.setName("DelayedShutdownHook-iceberg");
+ } catch (SecurityException e) {
+ // OK if we can't set the name in this environment.
+ }
Review Comment:
Print a log message to warn the user? What about adding the shutdown hook
itself? Do we need try/catch for that as well?
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -56,6 +62,14 @@ private ThreadPools() {}
public static final int AUTH_REFRESH_THREAD_POOL_SIZE =
SystemConfigs.AUTH_REFRESH_THREAD_POOL_SIZE.value();
+ private static final int SHUTDOWN_TIMEOUT_SECONDS = 120;
+
+ private static Thread shutDownHook = null;
Review Comment:
No need to initialize with null.
```suggestion
private static Thread shutDownHook;
```
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -149,8 +163,75 @@ public static ExecutorService newWorkerPool(String
namePrefix, int poolSize) {
* that should be automatically cleaned up on JVM shutdown.
*/
public static ExecutorService newExitingWorkerPool(String namePrefix, int
poolSize) {
- return MoreExecutors.getExitingExecutorService(
- (ThreadPoolExecutor) newFixedThreadPool(namePrefix, poolSize));
+ ExecutorService service =
+ Executors.unconfigurableExecutorService(newFixedThreadPool(namePrefix,
poolSize));
+ THREAD_POOLS_TO_SHUTDOWN.add(service);
+ return service;
+ }
+
+ /**
+ * Force manual shutdown of the thread pools created via the {@link
#newExitingWorkerPool(String,
+ * int)}.
+ */
+ public static void shutDownStartedThreadPools() {
+ ExecutorService item;
+ Queue<ExecutorService> invoked = new ArrayDeque<>();
+ while ((item = THREAD_POOLS_TO_SHUTDOWN.poll()) != null) {
+ item.shutdown();
+ invoked.add(item);
+ }
+ while ((item = invoked.poll()) != null) {
+ try {
+ item.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+ } catch (InterruptedException ignored) {
+ // We're shutting down anyway, so just ignore.
+ }
Review Comment:
I think we want to do that in a concurrent fashion. Waiting one by one would
mean waiting for `N * SHUTDOWN_TIMEOUT_SECONDS` seconds of time. Probably we
need to collect a list of futures and wait on their completion for max
`SHUTDOWN_TIMEOUT_SECONDS`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]