This is an automated email from the ASF dual-hosted git repository. ahuber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/isis.git
commit 539e004310ef7c2511e699ea01e00dca538304a8 Author: Andi Huber <ahu...@apache.org> AuthorDate: Fri Aug 31 07:00:09 2018 +0200 ISIS-1974: (porting from 1.16.x) support sequential execution in ThreadPoolSupport Task-Url: https://issues.apache.org/jira/browse/ISIS-1895 --- .../core/runtime/threadpool/ThreadPoolSupport.java | 69 ++++++++++++++++++---- 1 file changed, 56 insertions(+), 13 deletions(-) diff --git a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java index 5fb798e..7a37e70 100644 --- a/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java +++ b/core/metamodel/src/main/java/org/apache/isis/core/runtime/threadpool/ThreadPoolSupport.java @@ -19,6 +19,9 @@ package org.apache.isis.core.runtime.threadpool; +import static org.apache.isis.commons.internal.base._NullSafe.isEmpty; + +import java.util.Collections; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; @@ -28,12 +31,16 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.function.Supplier; + +import javax.annotation.Nullable; -import org.apache.isis.commons.internal.collections._Lists; -import org.apache.isis.commons.internal.context._Context; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.isis.commons.internal.collections._Lists; +import org.apache.isis.commons.internal.context._Context; + /** * ThreadPoolSupport is application-scoped, meaning ThreadPoolSupport is closed on * application's end of life-cycle. @@ -48,6 +55,7 @@ public final class ThreadPoolSupport implements AutoCloseable { private final ThreadGroup group; private final ThreadPoolExecutor executor; + private final ThreadPoolExecutor sequentialExecutor; private ThreadPoolSupport() { @@ -56,21 +64,26 @@ public final class ThreadPoolSupport implements AutoCloseable { final int corePoolSize = Runtime.getRuntime().availableProcessors(); final int maximumPoolSize = Runtime.getRuntime().availableProcessors(); final int keepAliveTimeSecs = 5; + + final ThreadFactory threadFactory = (Runnable r) -> new Thread(group, r); final int queueCapacity = 25; - final BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(queueCapacity); - + final Supplier<BlockingQueue<Runnable>> workQueueFactory = + ()->new LinkedBlockingQueue<>(queueCapacity); + + executor = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTimeSecs, TimeUnit.SECONDS, - workQueue, - new ThreadFactory() { - @Override - public Thread newThread(final Runnable r) { - return new Thread(group, r); - } - }); + workQueueFactory.get(), + threadFactory); + + sequentialExecutor = new ThreadPoolExecutor(1, 1, // fixed size = 1 + keepAliveTimeSecs, TimeUnit.MILLISECONDS, + workQueueFactory.get(), + threadFactory); + } public static List<Object> join(final List<Future<Object>> futures) { @@ -100,7 +113,16 @@ public final class ThreadPoolSupport implements AutoCloseable { return null; } - public List<Future<Object>> invokeAll(final List<Callable<Object>> callables) { + /** + * Executes specified {@code callables} on the default executor. + * See {@link ThreadPoolExecutor#invokeAll(java.util.Collection)} + * @param callables nullable + * @return non-null + */ + public List<Future<Object>> invokeAll(@Nullable final List<Callable<Object>> callables) { + if(isEmpty(callables)) { + return Collections.emptyList(); + } try { return executor.invokeAll(callables); } catch (InterruptedException e) { @@ -108,13 +130,34 @@ public final class ThreadPoolSupport implements AutoCloseable { } } + /** + * Executes specified {@code callables} on the sequential executor in sequence, one by one. + * @param callables nullable + * @return non-null + */ + public List<Future<Object>> invokeAllSequential(@Nullable final List<Callable<Object>> callables) { + if(isEmpty(callables)) { + return Collections.emptyList(); + } + try { + return sequentialExecutor.invokeAll(callables); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + public static ThreadPoolSupport getInstance() { return _Context.computeIfAbsent(ThreadPoolSupport.class, __-> new ThreadPoolSupport()); } @Override public void close() throws Exception { - executor.shutdown(); + try { + executor.shutdown(); + } finally { + // in case the previous throws, continue execution here + sequentialExecutor.shutdown(); + } } }