key the connections session store by their JmsSessionInfo objects for later lookup
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d8555012 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d8555012 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d8555012 Branch: refs/heads/master Commit: d855501238d666da5ecaf002add140c67b73ab56 Parents: 2ec9c3f Author: Robert Gemmell <[email protected]> Authored: Thu Feb 26 14:50:58 2015 +0000 Committer: Robert Gemmell <[email protected]> Committed: Thu Feb 26 14:50:58 2015 +0000 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 37 ++++++++++---------- .../java/org/apache/qpid/jms/JmsSession.java | 2 +- 2 files changed, 19 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d8555012/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 0905d11..671cb92 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -18,11 +18,9 @@ package org.apache.qpid.jms; import java.io.IOException; import java.net.URI; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; @@ -62,6 +60,7 @@ import org.apache.qpid.jms.meta.JmsConnectionInfo; import org.apache.qpid.jms.meta.JmsConsumerId; import org.apache.qpid.jms.meta.JmsResource; import org.apache.qpid.jms.meta.JmsSessionId; +import org.apache.qpid.jms.meta.JmsSessionInfo; import org.apache.qpid.jms.meta.JmsTransactionId; import org.apache.qpid.jms.provider.Provider; import org.apache.qpid.jms.provider.ProviderClosedException; @@ -81,7 +80,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti private static final Logger LOG = LoggerFactory.getLogger(JmsConnection.class); private final IdGenerator clientIdGenerator; - private final List<JmsSession> sessions = new CopyOnWriteArrayList<JmsSession>(); + private final Map<JmsSessionInfo, JmsSession> sessions = new ConcurrentHashMap<JmsSessionInfo, JmsSession>(); private final Map<JmsConsumerId, JmsMessageDispatcher> dispatchers = new ConcurrentHashMap<JmsConsumerId, JmsMessageDispatcher>(); private final AtomicBoolean connected = new AtomicBoolean(); @@ -164,7 +163,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti closing.set(true); - for (JmsSession session : this.sessions) { + for (JmsSession session : sessions.values()) { session.shutdown(); } @@ -224,7 +223,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti // NOTE - Once ConnectionConsumer is added we must shutdown those as well. - for (JmsSession session : this.sessions) { + for (JmsSession session : sessions.values()) { session.shutdown(); } @@ -250,7 +249,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti connect(); int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode); JmsSession result = new JmsSession(this, getNextSessionId(), ackMode); - addSession(result); + addSession(result.getSessionInfo(), result); if (started.get()) { result.start(); } @@ -317,7 +316,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti connect(); if (started.compareAndSet(false, true)) { try { - for (JmsSession s : this.sessions) { + for (JmsSession s : sessions.values()) { s.start(); } } catch (Exception e) { @@ -348,7 +347,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti } if (started.compareAndSet(true, false)) { synchronized(sessions) { - for (JmsSession s : this.sessions) { + for (JmsSession s : sessions.values()) { s.stop(); } } @@ -442,7 +441,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti connect(); int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode); JmsTopicSession result = new JmsTopicSession(this, getNextSessionId(), ackMode); - addSession(result); + addSession(result.getSessionInfo(), result); if (started.get()) { result.start(); } @@ -462,7 +461,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti connect(); int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode); JmsQueueSession result = new JmsQueueSession(this, getNextSessionId(), ackMode); - addSession(result); + addSession(result.getSessionInfo(), result); if (started.get()) { result.start(); } @@ -495,12 +494,12 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti return result; } - protected void removeSession(JmsSession session) throws JMSException { - this.sessions.remove(session); + protected void removeSession(JmsSessionInfo si) throws JMSException { + sessions.remove(si); } - protected void addSession(JmsSession s) { - this.sessions.add(s); + protected void addSession(JmsSessionInfo si, JmsSession s) { + sessions.put(si, s); } protected void addDispatcher(JmsConsumerId consumerId, JmsMessageDispatcher dispatcher) { @@ -555,7 +554,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti connect(); try { - for (JmsSession session : this.sessions) { + for (JmsSession session : sessions.values()) { if (session.isDestinationInUse(destination)) { throw new IllegalStateException("A consumer is consuming from the temporary destination"); } @@ -984,7 +983,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti @Override public void onConnectionInterrupted(final URI remoteURI) { - for (JmsSession session : sessions) { + for (JmsSession session : sessions.values()) { session.onConnectionInterrupted(); } @@ -1013,7 +1012,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti createResource(tempDestination); } - for (JmsSession session : sessions) { + for (JmsSession session : sessions.values()) { session.onConnectionRecovery(provider); } } @@ -1025,14 +1024,14 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti setMessageFactory(provider.getMessageFactory()); setConnectedURI(provider.getRemoteURI()); - for (JmsSession session : sessions) { + for (JmsSession session : sessions.values()) { session.onConnectionRecovered(provider); } } @Override public void onConnectionRestored(final URI remoteURI) { - for (JmsSession session : sessions) { + for (JmsSession session : sessions.values()) { session.onConnectionRestored(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d8555012/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 3182ddd..8042b6f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -217,7 +217,7 @@ public class JmsSession implements Session, QueueSession, TopicSession, JmsMessa protected void doClose() throws JMSException { boolean interrupted = Thread.interrupted(); shutdown(); - connection.removeSession(this); + connection.removeSession(sessionInfo); connection.destroyResource(sessionInfo); if (interrupted) { Thread.currentThread().interrupt(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
