This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 79ca203  Revert "ARTEMIS-2240 ActiveMQThreadPoolExecutor should use 
LinkedTransferQueue"
     new 3ccf6a8  This closes #2582
79ca203 is described below

commit 79ca203d6dd529b14dc19086a93bdb8216f1f0f4
Author: Francesco Nigro <[email protected]>
AuthorDate: Wed Mar 13 13:47:06 2019 +0100

    Revert "ARTEMIS-2240 ActiveMQThreadPoolExecutor should use 
LinkedTransferQueue"
    
    This reverts commit ea29483449eb74d4bc8ee703e1161c0d67dc77b4
---
 .../artemis/utils/ActiveMQThreadPoolExecutor.java  | 136 +++++++++++++++------
 1 file changed, 101 insertions(+), 35 deletions(-)

diff --git 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
index 628283f..da7de12 100755
--- 
a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
+++ 
b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
@@ -16,12 +16,11 @@
  */
 package org.apache.activemq.artemis.utils;
 
-import java.util.Objects;
-import java.util.concurrent.LinkedTransferQueue;
-import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 /*
  * ActiveMQThreadPoolExecutor: a special ThreadPoolExecutor that combines
@@ -30,58 +29,125 @@ import java.util.concurrent.TimeUnit;
  * and will be removed after idling for a specified keep time.
  * But in contrast to a standard cached executor, tasks are queued if the
  * maximum pool size if reached, instead of rejected.
+ *
+ * This is achieved by using a specialized blocking queue, which checks the
+ * state of the associated executor in the offer method to decide whether to
+ * queue a task or have the executor create another thread.
+ *
+ * Since the thread pool's execute method is reentrant, more than one caller
+ * could try to offer a task into the queue. There is a small chance that
+ * (a few) more threads are created as it should be limited by max pool size.
+ * To allow for such a case not to reject a task, the underlying thread pool
+ * executor is not limited. Only the offer method checks the configured limit.
  */
 public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
 
-   /**
-    * The default rejected execution handler
-    */
-   private static final RejectedExecutionHandler defaultHandler = new 
AbortPolicy();
+   @SuppressWarnings("serial")
+   private static class ThreadPoolQueue extends LinkedBlockingQueue<Runnable> {
 
-   // Handler executed when a task is submitted and a new thread cannot be 
created (because maxSize was reached)
-   // It queues the task on the executors's queue (using the add() method, see 
ThreadPoolQueue class below)
-   private static class QueueExecutionHandler implements 
RejectedExecutionHandler {
+      private ActiveMQThreadPoolExecutor executor = null;
 
-      private final RejectedExecutionHandler handler;
+      // keep track of the difference between the number of idle threads and
+      // the number of queued tasks. If the delta is > 0, we have more
+      // idle threads than queued tasks and can add more tasks into the queue.
+      // The delta is incremented if a thread becomes idle or if a task is 
taken from the queue.
+      // The delta is decremented if a thread leaves idle state or if a task 
is added to the queue.
+      private static final AtomicIntegerFieldUpdater<ThreadPoolQueue> 
DELTA_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThreadPoolQueue.class, 
"threadTaskDelta");
+      private volatile int threadTaskDelta = 0;
 
-      private QueueExecutionHandler(RejectedExecutionHandler handler) {
-         Objects.requireNonNull(handler);
-         this.handler = handler;
+      public void setExecutor(ActiveMQThreadPoolExecutor executor) {
+         this.executor = executor;
       }
 
       @Override
-      public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-         if (executor.isShutdown() || !executor.getQueue().add(r)) {
-            handler.rejectedExecution(r, executor);
+      public boolean offer(Runnable runnable) {
+         boolean retval = false;
+
+         if (threadTaskDelta > 0 || (executor.getPoolSize() >= 
executor.getMaximumPoolSize())) {
+            // A new task will be added to the queue if the maximum number of 
threads has been reached
+            // or if the delta is > 0, which means that there are enough idle 
threads.
+
+            retval = super.offer(runnable);
+
+            // Only decrement the delta if the task has actually been added to 
the queue
+            if (retval)
+               DELTA_UPDATER.decrementAndGet(this);
          }
-      }
-   }
 
-   // A specialized LinkedBlockingQueue that takes new elements by calling 
add() but not offer()
-   // This is to force the ThreadPoolExecutor to always create new threads and 
never queue
-   private static class ThreadPoolQueue extends LinkedTransferQueue<Runnable> {
+         return retval;
+      }
 
       @Override
-      public boolean offer(Runnable runnable) {
-         return tryTransfer(runnable);
+      public Runnable take() throws InterruptedException {
+         // Increment the delta as a thread becomes idle
+         // by waiting for a task to take from the queue
+         DELTA_UPDATER.incrementAndGet(this);
+
+
+         Runnable runnable = null;
+
+         try {
+            runnable = super.take();
+            return runnable;
+         } finally {
+            // Now the thread is no longer idle waiting for a task
+            // If it had taken a task, the delta remains the same
+            // (decremented by the thread and incremented by the taken task)
+            // Only if no task had been taken, we have to decrement the delta.
+            if (runnable == null) {
+               DELTA_UPDATER.decrementAndGet(this);
+            }
+         }
       }
 
       @Override
-      public boolean add(Runnable runnable) {
-         return super.offer(runnable);
+      public Runnable poll(long arg0, TimeUnit arg2) throws 
InterruptedException {
+         // Increment the delta as a thread becomes idle
+         // by waiting for a task to poll from the queue
+         DELTA_UPDATER.incrementAndGet(this);
+
+         Runnable runnable = null;
+
+         try {
+            runnable = super.poll(arg0, arg2);
+         } finally {
+            // Now the thread is no longer idle waiting for a task
+            // If it had taken a task, the delta remains the same
+            // (decremented by the thread and incremented by the taken task)
+            if (runnable == null) {
+               DELTA_UPDATER.decrementAndGet(this);
+            }
+         }
+
+         return runnable;
       }
    }
 
+   private int maxPoolSize;
+
    public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, 
TimeUnit keepUnits, ThreadFactory factory) {
-      this(coreSize, maxSize, keep, keepUnits, factory, defaultHandler);
+      this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory);
+   }
+
+   // private constructor is needed to inject 'this' into the ThreadPoolQueue 
instance
+   private ActiveMQThreadPoolExecutor(int coreSize,
+                                      int maxSize,
+                                      long keep,
+                                      TimeUnit keepUnits,
+                                      ThreadPoolQueue myQueue,
+                                      ThreadFactory factory) {
+      super(coreSize, Integer.MAX_VALUE, keep, keepUnits, myQueue, factory);
+      maxPoolSize = maxSize;
+      myQueue.setExecutor(this);
+   }
+
+   @Override
+   public int getMaximumPoolSize() {
+      return maxPoolSize;
    }
 
-   public ActiveMQThreadPoolExecutor(int coreSize,
-                                     int maxSize,
-                                     long keep,
-                                     TimeUnit keepUnits,
-                                     ThreadFactory factory,
-                                     RejectedExecutionHandler handler) {
-      super(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), 
factory, new QueueExecutionHandler(handler));
+   @Override
+   public void setMaximumPoolSize(int maxSize) {
+      maxPoolSize = maxSize;
    }
-}
\ No newline at end of file
+}

Reply via email to