Updated Branches: refs/heads/trunk 48d25adfc -> f7cbe9fa1
https://issues.apache.org/jira/browse/AMQ-4970 Prevent multiple start / stop operations on a queue from having any effect. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f7cbe9fa Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f7cbe9fa Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f7cbe9fa Branch: refs/heads/trunk Commit: f7cbe9fa173f1f3cf91d016ed340f90a2bd61242 Parents: 48d25ad Author: Timothy Bish <[email protected]> Authored: Mon Jan 20 10:43:44 2014 -0500 Committer: Timothy Bish <[email protected]> Committed: Mon Jan 20 10:43:44 2014 -0500 ---------------------------------------------------------------------- .../apache/activemq/broker/region/Queue.java | 104 ++++++++++++------- 1 file changed, 66 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f7cbe9fa/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java index 10b463e..0ae4463 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java @@ -17,7 +17,18 @@ package org.apache.activemq.broker.region; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; @@ -27,6 +38,7 @@ import java.util.concurrent.Delayed; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -36,6 +48,7 @@ import javax.jms.InvalidSelectorException; import javax.jms.JMSException; import javax.jms.ResourceAllocationException; import javax.transaction.xa.XAException; + import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -52,7 +65,17 @@ import org.apache.activemq.broker.region.policy.DeadLetterStrategy; import org.apache.activemq.broker.region.policy.DispatchPolicy; import org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy; import org.apache.activemq.broker.util.InsertionCountList; -import org.apache.activemq.command.*; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ConsumerId; +import org.apache.activemq.command.ExceptionResponse; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.MessageDispatchNotification; +import org.apache.activemq.command.MessageId; +import org.apache.activemq.command.ProducerAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.Response; +import org.apache.activemq.command.TransactionId; import org.apache.activemq.filter.BooleanExpression; import org.apache.activemq.filter.MessageEvaluationContext; import org.apache.activemq.filter.NonCachedMessageEvaluationContext; @@ -108,6 +131,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { private CountDownLatch consumersBeforeStartsLatch; private final AtomicLong pendingWakeups = new AtomicLong(); private boolean allConsumersExclusiveByDefault = false; + private final AtomicBoolean started = new AtomicBoolean(); private boolean resetNeeded; @@ -717,7 +741,7 @@ public class Queue extends BaseDestination implements Task, UsageListener { } final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>(); - private LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>(); + private final LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>(); // roll up all message sends class SendSync extends Synchronization { @@ -995,51 +1019,55 @@ public class Queue extends BaseDestination implements Task, UsageListener { @Override public void start() throws Exception { - if (memoryUsage != null) { - memoryUsage.start(); - } - if (systemUsage.getStoreUsage() != null) { - systemUsage.getStoreUsage().start(); - } - systemUsage.getMemoryUsage().addUsageListener(this); - messages.start(); - if (getExpireMessagesPeriod() > 0) { - scheduler.schedualPeriodically(expireMessagesTask, getExpireMessagesPeriod()); + if (started.compareAndSet(false, true)) { + if (memoryUsage != null) { + memoryUsage.start(); + } + if (systemUsage.getStoreUsage() != null) { + systemUsage.getStoreUsage().start(); + } + systemUsage.getMemoryUsage().addUsageListener(this); + messages.start(); + if (getExpireMessagesPeriod() > 0) { + scheduler.executePeriodically(expireMessagesTask, getExpireMessagesPeriod()); + } + doPageIn(false); } - doPageIn(false); } @Override public void stop() throws Exception { - if (taskRunner != null) { - taskRunner.shutdown(); - } - if (this.executor != null) { - ThreadPoolUtils.shutdownNow(executor); - executor = null; - } + if (started.compareAndSet(true, false)) { + if (taskRunner != null) { + taskRunner.shutdown(); + } + if (this.executor != null) { + ThreadPoolUtils.shutdownNow(executor); + executor = null; + } - scheduler.cancel(expireMessagesTask); + scheduler.cancel(expireMessagesTask); - if (flowControlTimeoutTask.isAlive()) { - flowControlTimeoutTask.interrupt(); - } + if (flowControlTimeoutTask.isAlive()) { + flowControlTimeoutTask.interrupt(); + } - if (messages != null) { - messages.stop(); - } + if (messages != null) { + messages.stop(); + } - for (MessageReference messageReference : pagedInMessages.values()) { - messageReference.decrementReferenceCount(); - } - pagedInMessages.clear(); + for (MessageReference messageReference : pagedInMessages.values()) { + messageReference.decrementReferenceCount(); + } + pagedInMessages.clear(); - systemUsage.getMemoryUsage().removeUsageListener(this); - if (memoryUsage != null) { - memoryUsage.stop(); - } - if (store != null) { - store.stop(); + systemUsage.getMemoryUsage().removeUsageListener(this); + if (memoryUsage != null) { + memoryUsage.stop(); + } + if (store != null) { + store.stop(); + } } }
