Repository: activemq-artemis
Updated Branches:
  refs/heads/master 90c946970 -> 31748a793
Change to a lock free ordered executor Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/631c2fa7 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/631c2fa7 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/631c2fa7 Branch: refs/heads/master Commit: 631c2fa78054d12aa164ae57b9d35c26634a7e21 Parents: 90c9469 Author: Stuart Douglas <[email protected]> Authored: Fri Dec 18 09:21:34 2015 +0900 Committer: Clebert Suconic <[email protected]> Committed: Thu Dec 17 22:17:08 2015 -0500 ---------------------------------------------------------------------- .../artemis/utils/OrderedExecutorFactory.java | 110 +++++++++---------- 1 file changed, 52 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/631c2fa7/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java ---------------------------------------------------------------------- diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java index 7475526..18db9c7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/utils/OrderedExecutorFactory.java @@ -16,8 +16,10 @@ */ package org.apache.activemq.artemis.utils; +import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException; import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; @@ -54,78 +56,70 @@ public final class OrderedExecutorFactory 
implements ExecutorFactory { * More specifically, any call B to the {@link #execute(Runnable)} method that happens-after another call A to the * same method, will result in B's task running after A's. */ - private static final class OrderedExecutor implements Executor { + private static class OrderedExecutor implements Executor { - private final ConcurrentLinkedQueue<Runnable> tasks = new ConcurrentLinkedQueue<>(); + private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>(); + private final Executor delegate; + private final ExecutorTask task = new ExecutorTask(); - // @protected by tasks - private boolean running; + private static final AtomicIntegerFieldUpdater<OrderedExecutor> stateUpdater = AtomicIntegerFieldUpdater.newUpdater(OrderedExecutor.class, "state"); - private final Executor parent; + private static final int STATE_NOT_RUNNING = 0; + private static final int STATE_RUNNING = 1; - private final Runnable runner; + public OrderedExecutor(Executor delegate) { + this.delegate = delegate; + } + + + @Override + public void execute(Runnable command) { + tasks.add(command); + if (stateUpdater.get(this) == STATE_NOT_RUNNING) { + //note that this can result in multiple tasks being queued + //this is not an issue as the CAS will mean that the second (and subsequent) execution is ignored + delegate.execute(task); + } + } - /** - * Construct a new instance. 
- * - * @param parent the parent executor - */ - public OrderedExecutor(final Executor parent) { - this.parent = parent; - runner = new Runnable() { - @Override - public void run() { - for (;;) { - // Optimization, first try without any locks + private final class ExecutorTask implements Runnable { + + @Override + public void run() { + do { + //if there is no thread active then we run + if (stateUpdater.compareAndSet(OrderedExecutor.this, STATE_NOT_RUNNING, STATE_RUNNING)) { Runnable task = tasks.poll(); - if (task == null) { - synchronized (tasks) { - // if it's null we need to retry now holding the lock on tasks - // this is because running=false and tasks.empty must be an atomic operation - // so we have to retry before setting the tasks to false - // this is a different approach to the anti-pattern on synchronize-retry, - // as this is just guaranteeing the running=false and tasks.empty being an atomic operation - task = tasks.poll(); - if (task == null) { - running = false; - return; - } + //while the queue is not empty we process in order + while (task != null) { + try { + task.run(); } + catch (ActiveMQInterruptedException e) { + // This could happen during shutdowns. Nothing to be concerned about here + ActiveMQClientLogger.LOGGER.debug("Interrupted Thread", e); + } + catch (Throwable t) { + ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t); + } + task = tasks.poll(); } - try { - task.run(); - } - catch (ActiveMQInterruptedException e) { - // This could happen during shutdowns. Nothing to be concerned about here - ActiveMQClientLogger.LOGGER.debug("Interrupted Thread", e); - } - catch (Throwable t) { - ActiveMQClientLogger.LOGGER.caughtunexpectedThrowable(t); - } + //set state back to not running. + stateUpdater.set(OrderedExecutor.this, STATE_NOT_RUNNING); } - } - }; - } - - /** - * Run a task. - * - * @param command the task to run. 
- */ - @Override - public void execute(final Runnable command) { - synchronized (tasks) { - tasks.add(command); - if (!running) { - running = true; - parent.execute(runner); - } + else { + return; + } + //we loop again based on tasks not being empty. Otherwise there is a window where the state is running, + //but poll() has returned null, so a submitting thread will believe that it does not need re-execute. + //this check fixes the issue + } while (!tasks.isEmpty()); } } @Override public String toString() { - return "OrderedExecutor(running=" + running + ", tasks=" + tasks + ")"; + return "OrderedExecutor(tasks=" + tasks + ")"; } } }
