svalaskevicius commented on code in PR #15312:
URL: https://github.com/apache/iceberg/pull/15312#discussion_r2891082232
##########
core/src/main/java/org/apache/iceberg/util/ThreadPools.java:
##########
@@ -180,15 +261,74 @@ public static ScheduledExecutorService
newScheduledPool(String namePrefix, int p
* is suitable for long-lived thread pools that should be automatically
cleaned up on JVM
* shutdown.
*/
- public static ScheduledExecutorService newExitingScheduledPool(
+ public static synchronized ScheduledExecutorService newExitingScheduledPool(
String namePrefix, int poolSize, Duration terminationTimeout) {
- return MoreExecutors.getExitingScheduledExecutorService(
- (ScheduledThreadPoolExecutor) newScheduledPool(namePrefix, poolSize),
- terminationTimeout.toMillis(),
- TimeUnit.MILLISECONDS);
+ ScheduledExecutorService service =
+
Executors.unconfigurableScheduledExecutorService(newScheduledPool(namePrefix,
poolSize));
+ THREAD_POOL_MANAGER.addThreadPool(service, terminationTimeout);
+ return service;
}
private static ThreadFactory newDaemonThreadFactory(String namePrefix) {
return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix
+ "-%d").build();
}
+
+ /** Manages the lifecycle of thread pools that need to be shut down
gracefully. */
+ static class ThreadPoolManager {
+ private final List<ExecutorServiceWithTimeout> threadPoolsToShutdown =
Lists.newArrayList();
+
+ /**
+ * Add an executor service to the list of thread pools to be shut down.
+ *
+ * @param service the executor service to add
+ * @param timeout the timeout for shutdown operations
+ */
+ synchronized void addThreadPool(ExecutorService service, Duration timeout)
{
+ threadPoolsToShutdown.add(new ExecutorServiceWithTimeout(service,
timeout));
+ }
+
+ /** Shut down all registered thread pools. */
+ synchronized void shutdownAll() {
+ long startTime = System.nanoTime();
+ List<ExecutorServiceWithTimeout> pendingShutdown = Lists.newArrayList();
+ for (ExecutorServiceWithTimeout item : threadPoolsToShutdown) {
+ item.getService().shutdown();
+ pendingShutdown.add(item);
+ }
+ threadPoolsToShutdown.clear();
+ for (ExecutorServiceWithTimeout item : pendingShutdown) {
+ long timeElapsed = System.nanoTime() - startTime;
+ long remainingTime = item.getTimeout().toNanos() - timeElapsed;
+ if (remainingTime > 0) {
+ try {
+ if (!item.service.awaitTermination(remainingTime,
TimeUnit.NANOSECONDS)) {
+ item.getService().shutdownNow();
+ }
+ } catch (InterruptedException ignored) {
+ // We're shutting down anyway, so just ignore.
+ }
+ } else {
Review Comment:
I suppose I'll add new lines before try and after closing of that block - so
it's visually separate... (?)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]