Repository: activemq-artemis
Updated Branches:
  refs/heads/master 90c946970 -> 31748a793


Change to a lock free ordered executor


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/631c2fa7
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/631c2fa7
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/631c2fa7

Branch: refs/heads/master
Commit: 631c2fa78054d12aa164ae57b9d35c26634a7e21
Parents: 90c9469
Author: Stuart Douglas <[email protected]>
Authored: Fri Dec 18 09:21:34 2015 +0900
Committer: Clebert Suconic <[email protected]>
Committed: Thu Dec 17 22:17:08 2015 -0500

----------------------------------------------------------------------
 .../artemis/utils/OrderedExecutorFactory.java   | 110 +++++++++----------
 1 file changed, 52 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/631c2fa7/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
index 7475526..18db9c7 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java
@@ -16,8 +16,10 @@
  */
 package org.apache.activemq.artemis.utils;
 
+import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
 import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
@@ -54,78 +56,70 @@ public final class OrderedExecutorFactory implements ExecutorFactory {
     * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the
     * same method, will result in B's task running after A's.
     */
-   private static final class OrderedExecutor implements Executor {
+   private static class OrderedExecutor implements Executor {
 
-      private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>();
+      private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
+      private final Executor delegate;
+      private final ExecutorTask task = new ExecutorTask();
 
-      // @protected by tasks
-      private boolean running;
+      private static final AtomicIntegerFieldUpdater<OrderedExecutor> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state");
 
-      private final Executor parent;
+      private static final int STATE_NOT_RUNNING = 0;
+      private static final int STATE_RUNNING = 1;
 
-      private final Runnable runner;
+      public OrderedExecutor(Executor delegate) {
+         this.delegate = delegate;
+      }
+
+
+      @Override
+      public void execute(Runnable command) {
+         tasks.add(command);
+         if (stateUpdater.get(this) == STATE_NOT_RUNNING) {
+            //note that this can result in multiple tasks being queued
+            //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored
+            delegate.execute(task);
+         }
+      }
 
-      /**
-       * Construct a new instance.
-       *
-       * @param parent the parent executor
-       */
-      public OrderedExecutor(final Executor parent) {
-         this.parent = parent;
-         runner = new Runnable() {
-            @Override
-            public void run() {
-               for (;;) {
-                  // Optimization, first try without any locks
+      private final class ExecutorTask implements Runnable {
+
+         @Override
+         public void run() {
+            do {
+               //if there is no thread active then we run
+               if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING)) {
                   Runnable task = tasks.poll();
-                  if (task == null) {
-                     synchronized (tasks) {
-                        // if it's null we need to retry now holding the lock on tasks
-                        // this is because running=false and tasks.empty must be an atomic operation
-                        // so we have to retry before setting the tasks to false
-                        // this is a different approach to the anti-pattern on synchronize-retry,
-                        // as this is just guaranteeing the running=false and tasks.empty being an atomic operation
-                        task = tasks.poll();
-                        if (task == null) {
-                           running = false;
-                           return;
-                        }
+                  //while the queue is not empty we process in order
+                  while (task != null) {
+                     try {
+                        task.run();
                      }
+                     catch (ActiveMQInterruptedException e) {
+                        // This could happen during shutdowns. Nothing to be concerned about here
+                        ActiveMQClientLogger.LOGGER.debug("Interrupted Thread", e);
+                     }
+                     catch (Throwable t) {
+                        ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);
+                     }
+                     task = tasks.poll();
                   }
-                  try {
-                     task.run();
-                  }
-                  catch (ActiveMQInterruptedException e) {
-                     // This could happen during shutdowns. Nothing to be concerned about here
-                     ActiveMQClientLogger.LOGGER.debug("Interrupted Thread", e);
-                  }
-                  catch (Throwable t) {
-                     ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t);
-                  }
+                  //set state back to not running.
+                  stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING);
                }
-            }
-         };
-      }
-
-      /**
-       * Run a task.
-       *
-       * @param command the task to run.
-       */
-      @Override
-      public void execute(final Runnable command) {
-         synchronized (tasks) {
-            tasks.add(command);
-            if (!running) {
-               running = true;
-               parent.execute(runner);
-            }
+               else {
+                  return;
+               }
+               //we loop again based on tasks not being empty. Otherwise there is a window where the state is running,
+               //but poll() has returned null, so a submitting thread will believe that it does not need re-execute.
+               //this check fixes the issue
+            } while (!tasks.isEmpty());
          }
       }
 
       @Override
       public String toString() {
-         return "OrderedExecutor(running=" + running + ", tasks=" + tasks + ")";
+         return "OrderedExecutor(tasks=" + tasks + ")";
       }
    }
 }

Reply via email to