This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
commit b9a063f2e715eeac05860dc4977dac9221e6aa93 Author: Michael André Pearce <michael.andre.pea...@me.com> AuthorDate: Wed Jan 23 09:52:24 2019 +0000 ARTEMIS-2236 - Revert Original ARTEMIS-1451 This reverts commit f8b758d1 --- .../artemis/utils/ActiveMQThreadPoolExecutor.java | 147 ++++++++++++++++++--- 1 file changed, 132 insertions(+), 15 deletions(-) diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java index ed5f4ef..c3b1988 100755 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java @@ -17,7 +17,6 @@ package org.apache.activemq.artemis.utils; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -29,33 +28,151 @@ import java.util.concurrent.TimeUnit; * and will be removed after idling for a specified keep time. * But in contrast to a standard cached executor, tasks are queued if the * maximum pool size if reached, instead of rejected. + * + * This is achieved by using a specialized blocking queue, which checks the + * state of the associated executor in the offer method to decide whether to + * queue a task or have the executor create another thread. + * + * Since the thread pool's execute method is reentrant, more than one caller + * could try to offer a task into the queue. There is a small chance that + * (a few) more threads are created as it should be limited by max pool size. + * To allow for such a case not to reject a task, the underlying thread pool + * executor is not limited. Only the offer method checks the configured limit. */ public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor { - // Handler executed when a task is submitted and a new thread cannot be created (because maxSize was reached) - // It queues the task on the executors's queue (using the add() method, see ThreadPoolQueue class below) - private static final RejectedExecutionHandler QUEUE_EXECUTION_HANDLER = (r, e) -> { - if (!e.isShutdown()) { - e.getQueue().add(r); - } - }; - - // A specialized LinkedBlockingQueue that takes new elements by calling add() but not offer() - // This is to force the ThreadPoolExecutor to always create new threads and never queue + @SuppressWarnings("serial") private static class ThreadPoolQueue extends LinkedBlockingQueue<Runnable> { + private ActiveMQThreadPoolExecutor executor = null; + + // lock object to synchronize on + private final Object lock = new Object(); + + // keep track of the difference between the number of idle threads and + // the number of queued tasks. If the delta is > 0, we have more + // idle threads than queued tasks and can add more tasks into the queue. + // The delta is incremented if a thread becomes idle or if a task is taken from the queue. + // The delta is decremented if a thread leaves idle state or if a task is added to the queue. + private int threadTaskDelta = 0; + + public void setExecutor(ActiveMQThreadPoolExecutor executor) { + this.executor = executor; + } + @Override public boolean offer(Runnable runnable) { - return false; + boolean retval = false; + + // Need to lock for 2 reasons: + // 1. to safely handle poll timeouts + // 2. to protect the delta from parallel updates + synchronized (lock) { + if ((executor.getPoolSize() >= executor.getMaximumPoolSize()) || (threadTaskDelta > 0)) { + // A new task will be added to the queue if the maximum number of threads has been reached + // or if the delta is > 0, which means that there are enough idle threads. + + retval = super.offer(runnable); + + // Only decrement the delta if the task has actually been added to the queue + if (retval) + threadTaskDelta--; + } + } + + return retval; } @Override - public boolean add(Runnable runnable) { - return super.offer( runnable ); + public Runnable take() throws InterruptedException { + // Increment the delta as a thread becomes idle + // by waiting for a task to take from the queue + synchronized (lock) { + threadTaskDelta++; + } + + Runnable runnable = null; + + try { + runnable = super.take(); + return runnable; + } finally { + // Now the thread is no longer idle waiting for a task + // If it had taken a task, the delta remains the same + // (decremented by the thread and incremented by the taken task) + // Only if no task had been taken, we have to decrement the delta. + if (runnable == null) { + synchronized (lock) { + threadTaskDelta--; + } + } + } + } + + @Override + public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException { + // Increment the delta as a thread becomes idle + // by waiting for a task to poll from the queue + synchronized (lock) { + threadTaskDelta++; + } + + Runnable runnable = null; + boolean timedOut = false; + + try { + runnable = super.poll(arg0, arg2); + timedOut = (runnable == null); + } finally { + // Now the thread is no longer idle waiting for a task + // If it had taken a task, the delta remains the same + // (decremented by the thread and incremented by the taken task) + if (runnable == null) { + synchronized (lock) { + // If the poll called timed out, we check again within a synchronized block + // to make sure all offer calls have been completed. + // This is to handle a newly queued task if the timeout occurred while an offer call + // added that task to the queue instead of creating a new thread. + if (timedOut) + runnable = super.poll(); + + // Only if no task had been taken (either no timeout, or no task from after-timeout poll), + // we have to decrement the delta. + if (runnable == null) + threadTaskDelta--; + } + } + } + + return runnable; } } + private int maxPoolSize; + public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadFactory factory) { - super( coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory, QUEUE_EXECUTION_HANDLER ); + this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory); + } + + // private constructor is needed to inject 'this' into the ThreadPoolQueue instance + private ActiveMQThreadPoolExecutor(int coreSize, + int maxSize, + long keep, + TimeUnit keepUnits, + ThreadPoolQueue myQueue, + ThreadFactory factory) { + super(coreSize, Integer.MAX_VALUE, keep, keepUnits, myQueue, factory); + maxPoolSize = maxSize; + myQueue.setExecutor(this); + } + + @Override + public int getMaximumPoolSize() { + return maxPoolSize; + } + + @Override + public void setMaximumPoolSize(int maxSize) { + maxPoolSize = maxSize; } }