Author: chirino
Date: Sun Jul 2 07:31:48 2006
New Revision: 418600
URL: http://svn.apache.org/viewvc?rev=418600&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQ-724
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?rev=418600&r1=418599&r2=418600&view=diff
==============================================================================
---
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
(original)
+++
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
Sun Jul 2 07:31:48 2006
@@ -421,7 +421,11 @@
if( ss == null )
throw new IllegalStateException("Cannot add a producer to a
session that had not been registered: "+sessionId);
broker.addProducer(cs.getContext(), info);
- ss.addProducer(info);
+ try {
+ ss.addProducer(info);
+ } catch (IllegalStateException e) {
+ broker.removeProducer(cs.getContext(), info);
+ }
return null;
}
@@ -451,7 +455,12 @@
throw new IllegalStateException("Cannot add a consumer to a
session that had not been registered: "+sessionId);
broker.addConsumer(cs.getContext(), info);
- ss.addConsumer(info);
+ try {
+ ss.addConsumer(info);
+ } catch (IllegalStateException e) {
+ broker.removeConsumer(cs.getContext(), info);
+ }
+
return null;
}
@@ -476,8 +485,12 @@
ConnectionId connectionId = info.getSessionId().getParentId();
ConnectionState cs = lookupConnectionState(connectionId);
- broker.addSession(cs.getContext(), info);
- cs.addSession(info);
+ broker.addSession(cs.getContext(), info);
+ try {
+ cs.addSession(info);
+ } catch (IllegalStateException e) {
+ broker.removeSession(cs.getContext(), info);
+ }
return null;
}
@@ -487,6 +500,10 @@
ConnectionState cs = lookupConnectionState(connectionId);
SessionState session = cs.getSessionState(id);
+
if( session == null )
throw new IllegalStateException("Cannot remove session that had
not been registered: "+id);
+
+ // Only after the null check above: don't let new consumers or producers get added while we are closing this down.
+ session.shutdown();
@@ -543,6 +560,9 @@
public Response processRemoveConnection(ConnectionId id) {
ConnectionState cs = lookupConnectionState(id);
+
+ // Don't allow things to be added to the connection state while we are
shutting down.
+ cs.shutdown();
// Cascade the connection stop to the sessions.
for (Iterator iter = cs.getSessionIds().iterator(); iter.hasNext();) {
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java?rev=418600&r1=418599&r2=418600&view=diff
==============================================================================
---
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
(original)
+++
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/ConnectionState.java
Sun Jul 2 07:31:48 2006
@@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
@@ -37,6 +38,7 @@
final ConnectionInfo info;
private final ConcurrentHashMap sessions = new ConcurrentHashMap();
private final List tempDestinations = Collections.synchronizedList(new
ArrayList());
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
public ConnectionState(ConnectionInfo info) {
this.info = info;
@@ -49,10 +51,11 @@
}
public void addTempDestination(DestinationInfo info) {
+ checkShutdown();
tempDestinations.add(info);
}
- public void removeTempDestination(ActiveMQDestination destination) {
+ public void removeTempDestination(ActiveMQDestination destination) {
for (Iterator iter = tempDestinations.iterator(); iter.hasNext();) {
DestinationInfo di = (DestinationInfo) iter.next();
if( di.getDestination().equals(destination) ) {
@@ -62,6 +65,7 @@
}
public void addSession(SessionInfo info) {
+ checkShutdown();
sessions.put(info.getSessionId(), new SessionState(info));
}
public SessionState removeSession(SessionId id) {
@@ -85,5 +89,19 @@
public Collection getSessionStates() {
return sessions.values();
- }
+ }
+
+ private void checkShutdown() {
+ if( shutdown.get() )
+ throw new IllegalStateException("Disposed");
+ }
+
+ public void shutdown() {
+ if( shutdown.compareAndSet(false, true) ) {
+ for (Iterator iter = sessions.values().iterator();
iter.hasNext();) {
+ SessionState ss = (SessionState) iter.next();
+ ss.shutdown();
+ }
+ }
+ }
}
Modified:
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java?rev=418600&r1=418599&r2=418600&view=diff
==============================================================================
---
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
(original)
+++
incubator/activemq/branches/activemq-4.0/activemq-core/src/main/java/org/apache/activemq/state/SessionState.java
Sun Jul 2 07:31:48 2006
@@ -19,6 +19,7 @@
import java.util.Collection;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
@@ -33,6 +34,7 @@
public final ConcurrentHashMap producers = new ConcurrentHashMap();
public final ConcurrentHashMap consumers = new ConcurrentHashMap();
+ private final AtomicBoolean shutdown = new AtomicBoolean(false);
public SessionState(SessionInfo info) {
this.info = info;
@@ -42,6 +44,7 @@
}
public void addProducer(ProducerInfo info) {
+ checkShutdown();
producers.put(info.getProducerId(), new ProducerState(info));
}
public ProducerState removeProducer(ProducerId id) {
@@ -49,6 +52,7 @@
}
public void addConsumer(ConsumerInfo info) {
+ checkShutdown();
consumers.put(info.getConsumerId(), new ConsumerState(info));
}
public ConsumerState removeConsumer(ConsumerId id) {
@@ -72,5 +76,15 @@
public Collection getConsumerStates() {
return consumers.values();
- }
+ }
+
+ private void checkShutdown() {
+ if( shutdown.get() )
+ throw new IllegalStateException("Disposed");
+ }
+
+ public void shutdown() { // Marks this session state as disposed.
+ shutdown.set(true); // must be true: checkShutdown() throws IllegalStateException("Disposed") only when the flag is set
+ }
+
}