This is an automated email from the ASF dual-hosted git repository.
clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new 79ca203 Revert "ARTEMIS-2240 ActiveMQThreadPoolExecutor should use
LinkedTransferQueue"
new 3ccf6a8 This closes #2582
79ca203 is described below
commit 79ca203d6dd529b14dc19086a93bdb8216f1f0f4
Author: Francesco Nigro <[email protected]>
AuthorDate: Wed Mar 13 13:47:06 2019 +0100
Revert "ARTEMIS-2240 ActiveMQThreadPoolExecutor should use
LinkedTransferQueue"
This reverts commit ea29483449eb74d4bc8ee703e1161c0d67dc77b4
---
.../artemis/utils/ActiveMQThreadPoolExecutor.java | 136 +++++++++++++++------
1 file changed, 101 insertions(+), 35 deletions(-)
diff --git
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
index 628283f..da7de12 100755
---
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
+++
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
@@ -16,12 +16,11 @@
*/
package org.apache.activemq.artemis.utils;
-import java.util.Objects;
-import java.util.concurrent.LinkedTransferQueue;
-import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
/*
* ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines
@@ -30,58 +29,125 @@ import java.util.concurrent.TimeUnit;
* and will be removed after idling for a specified keep time.
* But in contrast to a standard cached executor, tasks are queued if the
* maximum pool size if reached, instead of rejected.
+ *
+ * This is achieved by using a specialized blocking queue, which checks the
+ * state of the associated executor in the offer method to decide whether to
+ * queue a task or have the executor create another thread.
+ *
+ * Since the thread pool's execute method is reentrant, more than one caller
+ * could try to offer a task into the queue. There is a small chance that
+ * (a few) more threads are created than the max pool size should allow.
+ * So that a task is not rejected in such a case, the underlying thread pool
+ * executor is not limited. Only the offer method checks the configured limit.
*/
public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
- /**
- * The default rejected execution handler
- */
- private static final RejectedExecutionHandler defaultHandler = new
AbortPolicy();
+ @SuppressWarnings("serial")
+ private static class ThreadPoolQueue extends LinkedBlockingQueue<Runnable> {
- // Handler executed when a task is submitted and a new thread cannot be
created (because maxSize was reached)
- // It queues the task on the executors's queue (using the add() method, see
ThreadPoolQueue class below)
- private static class QueueExecutionHandler implements
RejectedExecutionHandler {
+ private ActiveMQThreadPoolExecutor executor = null;
- private final RejectedExecutionHandler handler;
+ // keep track of the difference between the number of idle threads and
+ // the number of queued tasks. If the delta is > 0, we have more
+ // idle threads than queued tasks and can add more tasks into the queue.
+ // The delta is incremented if a thread becomes idle or if a task is
taken from the queue.
+ // The delta is decremented if a thread leaves idle state or if a task
is added to the queue.
+ private static final AtomicIntegerFieldUpdater<ThreadPoolQueue>
DELTA_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThreadPoolQueue.class,
"threadTaskDelta");
+ private volatile int threadTaskDelta = 0;
- private QueueExecutionHandler(RejectedExecutionHandler handler) {
- Objects.requireNonNull(handler);
- this.handler = handler;
+ public void setExecutor(ActiveMQThreadPoolExecutor executor) {
+ this.executor = executor;
}
@Override
- public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- if (executor.isShutdown() || !executor.getQueue().add(r)) {
- handler.rejectedExecution(r, executor);
+ public boolean offer(Runnable runnable) {
+ boolean retval = false;
+
+ if (threadTaskDelta > 0 || (executor.getPoolSize() >=
executor.getMaximumPoolSize())) {
+ // A new task will be added to the queue if the maximum number of
threads has been reached
+ // or if the delta is > 0, which means that there are enough idle
threads.
+
+ retval = super.offer(runnable);
+
+ // Only decrement the delta if the task has actually been added to
the queue
+ if (retval)
+ DELTA_UPDATER.decrementAndGet(this);
}
- }
- }
- // A specialized LinkedBlockingQueue that takes new elements by calling
add() but not offer()
- // This is to force the ThreadPoolExecutor to always create new threads and
never queue
- private static class ThreadPoolQueue extends LinkedTransferQueue<Runnable> {
+ return retval;
+ }
@Override
- public boolean offer(Runnable runnable) {
- return tryTransfer(runnable);
+ public Runnable take() throws InterruptedException {
+ // Increment the delta as a thread becomes idle
+ // by waiting for a task to take from the queue
+ DELTA_UPDATER.incrementAndGet(this);
+
+
+ Runnable runnable = null;
+
+ try {
+ runnable = super.take();
+ return runnable;
+ } finally {
+ // Now the thread is no longer idle waiting for a task
+ // If it had taken a task, the delta remains the same
+ // (decremented by the thread and incremented by the taken task)
+ // Only if no task had been taken, we have to decrement the delta.
+ if (runnable == null) {
+ DELTA_UPDATER.decrementAndGet(this);
+ }
+ }
}
@Override
- public boolean add(Runnable runnable) {
- return super.offer(runnable);
+ public Runnable poll(long arg0, TimeUnit arg2) throws
InterruptedException {
+ // Increment the delta as a thread becomes idle
+ // by waiting for a task to poll from the queue
+ DELTA_UPDATER.incrementAndGet(this);
+
+ Runnable runnable = null;
+
+ try {
+ runnable = super.poll(arg0, arg2);
+ } finally {
+ // Now the thread is no longer idle waiting for a task
+ // If it had taken a task, the delta remains the same
+ // (decremented by the thread and incremented by the taken task)
+ if (runnable == null) {
+ DELTA_UPDATER.decrementAndGet(this);
+ }
+ }
+
+ return runnable;
}
}
+ private int maxPoolSize;
+
public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep,
TimeUnit keepUnits, ThreadFactory factory) {
- this(coreSize, maxSize, keep, keepUnits, factory, defaultHandler);
+ this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory);
+ }
+
+ // private constructor is needed to inject 'this' into the ThreadPoolQueue
instance
+ private ActiveMQThreadPoolExecutor(int coreSize,
+ int maxSize,
+ long keep,
+ TimeUnit keepUnits,
+ ThreadPoolQueue myQueue,
+ ThreadFactory factory) {
+ super(coreSize, Integer.MAX_VALUE, keep, keepUnits, myQueue, factory);
+ maxPoolSize = maxSize;
+ myQueue.setExecutor(this);
+ }
+
+ @Override
+ public int getMaximumPoolSize() {
+ return maxPoolSize;
}
- public ActiveMQThreadPoolExecutor(int coreSize,
- int maxSize,
- long keep,
- TimeUnit keepUnits,
- ThreadFactory factory,
- RejectedExecutionHandler handler) {
- super(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(),
factory, new QueueExecutionHandler(handler));
+ @Override
+ public void setMaximumPoolSize(int maxSize) {
+ maxPoolSize = maxSize;
}
-}
\ No newline at end of file
+}