Author: tabish
Date: Tue Dec 13 23:13:35 2011
New Revision: 1213979
URL: http://svn.apache.org/viewvc?rev=1213979&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3605
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1213979&r1=1213978&r2=1213979&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Tue Dec 13 23:13:35 2011
@@ -445,7 +445,9 @@ public class TransportConnection impleme
public Response processMessageAck(MessageAck ack) throws Exception {
ConsumerBrokerExchange consumerExchange =
getConsumerBrokerExchange(ack.getConsumerId());
- broker.acknowledge(consumerExchange, ack);
+ if (consumerExchange != null) {
+ broker.acknowledge(consumerExchange, ack);
+ }
return null;
}
@@ -529,6 +531,7 @@ public class TransportConnection impleme
broker.addConsumer(cs.getContext(), info);
try {
ss.addConsumer(info);
+ addConsumerBrokerExchange(info.getConsumerId());
} catch (IllegalStateException e) {
broker.removeConsumer(cs.getContext(), info);
}
@@ -703,8 +706,7 @@ public class TransportConnection impleme
TransportConnectionState cs = lookupConnectionState(id);
if (cs != null) {
// Don't allow things to be added to the connection state while we
- // are
- // shutting down.
+ // are shutting down.
cs.shutdown();
// Cascade the connection stop to the sessions.
for (Iterator iter = cs.getSessionIds().iterator();
iter.hasNext(); ) {
@@ -757,7 +759,6 @@ public class TransportConnection impleme
}
public void dispatchSync(Command message) {
- // getStatistics().getEnqueues().increment();
try {
processDispatch(message);
} catch (IOException e) {
@@ -767,7 +768,6 @@ public class TransportConnection impleme
public void dispatchAsync(Command message) {
if (!stopping.get()) {
- // getStatistics().getEnqueues().increment();
if (taskRunner == null) {
dispatchSync(message);
} else {
@@ -809,13 +809,12 @@ public class TransportConnection impleme
sub.run();
}
}
- // getStatistics().getDequeues().increment();
}
}
public boolean iterate() {
try {
- if (stopping.get()) {
+ if (pendingStop || stopping.get()) {
if (dispatchStopped.compareAndSet(false, true)) {
if (transportException.get() == null) {
try {
@@ -931,7 +930,6 @@ public class TransportConnection impleme
}
}
-
public void stopAsync() {
// If we're in the middle of starting then go no further... for now.
synchronized (this) {
@@ -1328,6 +1326,11 @@ public class TransportConnection impleme
private ConsumerBrokerExchange getConsumerBrokerExchange(ConsumerId id) {
ConsumerBrokerExchange result = consumerExchanges.get(id);
+ return result;
+ }
+
+ private ConsumerBrokerExchange addConsumerBrokerExchange(ConsumerId id) {
+ ConsumerBrokerExchange result = consumerExchanges.get(id);
if (result == null) {
synchronized (consumerExchanges) {
result = new ConsumerBrokerExchange();