franz1981 commented on a change in pull request #2514: ARTEMIS-2236 Address 
Latency Impact caused by ARTEMIS-1451
URL: https://github.com/apache/activemq-artemis/pull/2514#discussion_r250105709
 
 

 ##########
 File path: artemis-commons/src/main/java/org/apache/activemq/artemis/utils/ActiveMQThreadPoolExecutor.java
 ##########
 @@ -29,33 +29,129 @@
  * and will be removed after idling for a specified keep time.
  * But in contrast to a standard cached executor, tasks are queued if the
  * maximum pool size is reached, instead of rejected.
+ *
+ * This is achieved by using a specialized blocking queue, which checks the
+ * state of the associated executor in the offer method to decide whether to
+ * queue a task or have the executor create another thread.
+ *
+ * Since the thread pool's execute method is reentrant, more than one caller
+ * could try to offer a task into the queue. There is a small chance that
+ * (a few) more threads are created than the max pool size should allow.
+ * To allow for such a case not to reject a task, the underlying thread pool
+ * executor is not limited. Only the offer method checks the configured limit.
  */
 public class ActiveMQThreadPoolExecutor extends ThreadPoolExecutor {
 
-   // Handler executed when a task is submitted and a new thread cannot be created (because maxSize was reached)
-   // It queues the task on the executors's queue (using the add() method, see ThreadPoolQueue class below)
-   private static final RejectedExecutionHandler QUEUE_EXECUTION_HANDLER = (r, e) -> {
-      if (!e.isShutdown()) {
-         e.getQueue().add(r);
-      }
-   };
-
-   // A specialized LinkedBlockingQueue that takes new elements by calling add() but not offer()
-   // This is to force the ThreadPoolExecutor to always create new threads and never queue
+   @SuppressWarnings("serial")
    private static class ThreadPoolQueue extends LinkedBlockingQueue<Runnable> {
 
+      private ActiveMQThreadPoolExecutor executor = null;
+
+      // keep track of the difference between the number of idle threads and
+      // the number of queued tasks. If the delta is > 0, we have more
+      // idle threads than queued tasks and can add more tasks into the queue.
+      // The delta is incremented if a thread becomes idle or if a task is taken from the queue.
+      // The delta is decremented if a thread leaves idle state or if a task is added to the queue.
+      private static final AtomicIntegerFieldUpdater<ThreadPoolQueue> DELTA_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ThreadPoolQueue.class, "threadTaskDelta");
+      private volatile int threadTaskDelta = 0;
+
+      public void setExecutor(ActiveMQThreadPoolExecutor executor) {
+         this.executor = executor;
+      }
+
       @Override
       public boolean offer(Runnable runnable) {
-         return false;
+         boolean retval = false;
+
+         if (threadTaskDelta > 0 || (executor.getPoolSize() >= executor.getMaximumPoolSize())) {
+            // A new task will be added to the queue if the maximum number of threads has been reached
+            // or if the delta is > 0, which means that there are enough idle threads.
+
+            retval = super.offer(runnable);
+
+            // Only decrement the delta if the task has actually been added to the queue
+            if (retval)
+               DELTA_UPDATER.decrementAndGet(this);
+         }
+
+
+         return retval;
+      }
+
+      @Override
+      public Runnable take() throws InterruptedException {
+         // Increment the delta as a thread becomes idle
+         // by waiting for a task to take from the queue
+         DELTA_UPDATER.incrementAndGet(this);
+
+
+         Runnable runnable = null;
+
+         try {
+            runnable = super.take();
+            return runnable;
+         } finally {
+            // Now the thread is no longer idle waiting for a task
+            // If it had taken a task, the delta remains the same
+            // (decremented by the thread and incremented by the taken task)
+            // Only if no task had been taken, we have to decrement the delta.
+            if (runnable == null) {
+               DELTA_UPDATER.decrementAndGet(this);
+
+            }
+         }
       }
 
       @Override
-      public boolean add(Runnable runnable) {
-         return super.offer( runnable );
+      public Runnable poll(long arg0, TimeUnit arg2) throws InterruptedException {
+         // Increment the delta as a thread becomes idle
+         // by waiting for a task to poll from the queue
+         DELTA_UPDATER.incrementAndGet(this);
+
+
+         Runnable runnable = null;
+
+         try {
+            runnable = super.poll(arg0, arg2);
+         } finally {
+            // Now the thread is no longer idle waiting for a task
+            // If it had taken a task, the delta remains the same
+            // (decremented by the thread and incremented by the taken task)
+            if (runnable == null) {
+               DELTA_UPDATER.decrementAndGet(this);
+
+            }
+         }
+
+         return runnable;
       }
    }
 
+   private int maxPoolSize;
+
    public ActiveMQThreadPoolExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, ThreadFactory factory) {
-      super( coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory, QUEUE_EXECUTION_HANDLER );
+      this(coreSize, maxSize, keep, keepUnits, new ThreadPoolQueue(), factory);
+   }
+
+   // private constructor is needed to inject 'this' into the ThreadPoolQueue instance
+   private ActiveMQThreadPoolExecutor(int coreSize,
+                                      int maxSize,
+                                      long keep,
+                                      TimeUnit keepUnits,
+                                      ThreadPoolQueue myQueue,
+                                      ThreadFactory factory) {
+      super(coreSize, Integer.MAX_VALUE, keep, keepUnits, myQueue, factory);
+      maxPoolSize = maxSize;
+      myQueue.setExecutor(this);
 
 Review comment:
   `myQueue.setExecutor` is not protected by so-called safe initialization: there are 2 ways to address it.
   The correct one, but more complex, is:
   - declare `private volatile ActiveMQThreadPoolExecutor executor = null;` and perform a volatile set in `setExecutor`
   
   The same should apply to `maxPoolSize`: given that it is an int, it risks not being updated atomically if it is changed
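
A minimal sketch of the volatile-based option described above, not the code that was merged. The names `SafeInitSketch`, `SketchQueue`, `SketchExecutor`, `getMaxPoolSize` and `runDemo` are hypothetical stand-ins, and the idle-thread delta tracking from the diff is left out to keep the example short. It assumes the concern is visibility across threads: the Java Language Specification already makes plain `int` writes atomic, so declaring `maxPoolSize` volatile here serves to make a change visible to the threads calling `offer`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SafeInitSketch {

   // Hypothetical stand-in for ThreadPoolQueue; only the publication of 'executor' is shown.
   static class SketchQueue extends LinkedBlockingQueue<Runnable> {
      private static final long serialVersionUID = 1L;

      // volatile: the write in setExecutor happens-before any later read in offer()
      private volatile SketchExecutor executor;

      void setExecutor(SketchExecutor executor) {
         this.executor = executor; // volatile set
      }

      @Override
      public boolean offer(Runnable runnable) {
         SketchExecutor e = executor; // single volatile read
         if (e != null && e.getPoolSize() >= e.getMaxPoolSize()) {
            return super.offer(runnable); // limit reached: queue the task
         }
         return false; // below the limit: let the pool start another thread
      }
   }

   // Hypothetical stand-in for ActiveMQThreadPoolExecutor.
   static class SketchExecutor extends ThreadPoolExecutor {
      // volatile so that a later change would be visible to threads calling offer()
      private volatile int maxPoolSize;

      SketchExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits) {
         this(coreSize, maxSize, keep, keepUnits, new SketchQueue());
      }

      private SketchExecutor(int coreSize, int maxSize, long keep, TimeUnit keepUnits, SketchQueue queue) {
         // The underlying pool is unbounded; only offer() enforces the configured limit.
         super(coreSize, Integer.MAX_VALUE, keep, keepUnits, queue);
         this.maxPoolSize = maxSize;
         queue.setExecutor(this); // last statement: everything written above is published with it
      }

      int getMaxPoolSize() {
         return maxPoolSize;
      }
   }

   // Submits four short tasks from one thread and returns the largest pool size observed.
   public static int runDemo() {
      SketchExecutor executor = new SketchExecutor(1, 2, 60L, TimeUnit.SECONDS);
      CountDownLatch done = new CountDownLatch(4);
      try {
         for (int i = 0; i < 4; i++) {
            executor.execute(done::countDown);
         }
         if (!done.await(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("tasks did not finish in time");
         }
         return executor.getLargestPoolSize();
      } catch (InterruptedException ex) {
         Thread.currentThread().interrupt();
         throw new IllegalStateException(ex);
      } finally {
         executor.shutdown();
      }
   }

   public static void main(String[] args) {
      System.out.println("largest pool size: " + runDemo());
   }
}
```

With a single submitting thread the pool should not grow past the configured limit of 2 in this sketch; with concurrent submitters a few extra threads remain possible, as the Javadoc in the diff already notes.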
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
