Repository: activemq Updated Branches: refs/heads/activemq-5.10.x ede604eaf -> f62f47b92
https://issues.apache.org/jira/browse/AMQ-5564 Fixed session in the pool losing their reference to the anonymous producer created when useAnonymousProducers is true. The anonymous producer stays live for the life of the pooled session. Also added some synchronization safety to some methods that could get into NPE trouble. Conflicts: activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f62f47b9 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f62f47b9 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f62f47b9 Branch: refs/heads/activemq-5.10.x Commit: f62f47b92d8c8d60fc9cd0f796b80eacc50c4b4a Parents: ede604e Author: Timothy Bish <[email protected]> Authored: Thu Feb 5 17:50:43 2015 -0500 Committer: Claus Ibsen <[email protected]> Committed: Fri Feb 6 08:05:17 2015 +0100 ---------------------------------------------------------------------- .../activemq/jms/pool/ConnectionPool.java | 18 ++-- .../activemq/jms/pool/PooledConnection.java | 6 +- .../apache/activemq/jms/pool/PooledSession.java | 81 ++++++---------- .../apache/activemq/jms/pool/SessionHolder.java | 98 ++++++++++++++++++++ .../activemq/jms/pool/PooledSessionTest.java | 53 +++++++++-- 5 files changed, 187 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/f62f47b9/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java index 26995ea..9b773d6 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java @@ -52,7 +52,7 @@ public class ConnectionPool { private boolean useAnonymousProducers = true; private final AtomicBoolean started = new AtomicBoolean(false); - private final GenericKeyedObjectPool<SessionKey, Session> sessionPool; + private final GenericKeyedObjectPool<SessionKey, SessionHolder> sessionPool; private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>(); public ConnectionPool(Connection connection) { @@ -60,29 +60,29 @@ public class ConnectionPool { this.connection = wrap(connection); // Create our internal Pool of session instances. - this.sessionPool = new GenericKeyedObjectPool<SessionKey, Session>( - new KeyedPoolableObjectFactory<SessionKey, Session>() { + this.sessionPool = new GenericKeyedObjectPool<SessionKey, SessionHolder>( + new KeyedPoolableObjectFactory<SessionKey, SessionHolder>() { @Override - public void activateObject(SessionKey key, Session session) throws Exception { + public void activateObject(SessionKey key, SessionHolder session) throws Exception { } @Override - public void destroyObject(SessionKey key, Session session) throws Exception { + public void destroyObject(SessionKey key, SessionHolder session) throws Exception { session.close(); } @Override - public Session makeObject(SessionKey key) throws Exception { - return makeSession(key); + public SessionHolder makeObject(SessionKey key) throws Exception { + return new SessionHolder(makeSession(key)); } @Override - public void passivateObject(SessionKey key, Session session) throws Exception { + public void passivateObject(SessionKey key, SessionHolder session) throws Exception { } @Override - public boolean validateObject(SessionKey key, Session session) { + public boolean validateObject(SessionKey key, SessionHolder session) { return true; } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f62f47b9/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java index b268862..b7b56ba 100755 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnection.java @@ -24,6 +24,7 @@ import javax.jms.ConnectionConsumer; import javax.jms.ConnectionMetaData; import javax.jms.Destination; import javax.jms.ExceptionListener; +import javax.jms.IllegalStateException; import javax.jms.JMSException; import javax.jms.Queue; import javax.jms.QueueConnection; @@ -35,7 +36,7 @@ import javax.jms.TemporaryTopic; import javax.jms.Topic; import javax.jms.TopicConnection; import javax.jms.TopicSession; -import javax.jms.IllegalStateException; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -163,8 +164,7 @@ public class PooledConnection implements TopicConnection, QueueConnection, Poole @Override public Session createSession(boolean transacted, int ackMode) throws JMSException { - PooledSession result; - result = (PooledSession) pool.createSession(transacted, ackMode); + PooledSession result = (PooledSession) pool.createSession(transacted, ackMode); // Store the session so we can close the sessions that this PooledConnection // created in order to ensure that consumers etc are closed per the JMS contract. http://git-wip-us.apache.org/repos/asf/activemq/blob/f62f47b9/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java index 3a2e698..cbfec29 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledSession.java @@ -55,25 +55,21 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes private static final transient Logger LOG = LoggerFactory.getLogger(PooledSession.class); private final SessionKey key; - private final KeyedObjectPool<SessionKey, Session> sessionPool; + private final KeyedObjectPool<SessionKey, SessionHolder> sessionPool; private final CopyOnWriteArrayList<MessageConsumer> consumers = new CopyOnWriteArrayList<MessageConsumer>(); private final CopyOnWriteArrayList<QueueBrowser> browsers = new CopyOnWriteArrayList<QueueBrowser>(); private final CopyOnWriteArrayList<PooledSessionEventListener> sessionEventListeners = new CopyOnWriteArrayList<PooledSessionEventListener>(); private final AtomicBoolean closed = new AtomicBoolean(); - private MessageProducer producer; - private TopicPublisher publisher; - private QueueSender sender; - - private Session session; + private SessionHolder sessionHolder; private boolean transactional = true; private boolean ignoreClose; private boolean isXa; private boolean useAnonymousProducers = true; - public PooledSession(SessionKey key, Session session, KeyedObjectPool<SessionKey, Session> sessionPool, boolean transactional, boolean anonymous) { + public PooledSession(SessionKey key, SessionHolder sessionHolder, KeyedObjectPool<SessionKey, SessionHolder> sessionPool, boolean transactional, boolean anonymous) { this.key = key; - this.session = session; + this.sessionHolder = sessionHolder; this.sessionPool = sessionPool; this.transactional = transactional; this.useAnonymousProducers = anonymous; @@ -140,21 +136,21 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes if (invalidate) { // lets close the session and not put the session back into the pool // instead invalidate it so the pool can create a new one on demand. - if (session != null) { + if (sessionHolder != null) { try { - session.close(); + sessionHolder.close(); } catch (JMSException e1) { LOG.trace("Ignoring exception on close as discarding session: " + e1, e1); } } try { - sessionPool.invalidateObject(key, session); + sessionPool.invalidateObject(key, sessionHolder); } catch (Exception e) { LOG.trace("Ignoring exception on invalidateObject as discarding session: " + e, e); } } else { try { - sessionPool.returnObject(key, session); + sessionPool.returnObject(key, sessionHolder); } catch (Exception e) { javax.jms.IllegalStateException illegalStateException = new javax.jms.IllegalStateException(e.toString()); illegalStateException.initCause(e); @@ -162,7 +158,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes } } - session = null; + sessionHolder = null; } } @@ -276,9 +272,12 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes @Override public XAResource getXAResource() { - if (session instanceof XASession) { - return ((XASession) session).getXAResource(); + SessionHolder session = safeGetSessionHolder(); + + if (session.getSession() instanceof XASession) { + return ((XASession) session.getSession()).getXAResource(); } + return null; } @@ -289,8 +288,9 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes @Override public void run() { + SessionHolder session = safeGetSessionHolder(); if (session != null) { - session.run(); + session.getSession().run(); } } @@ -379,10 +379,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes } public Session getInternalSession() throws IllegalStateException { - if (session == null) { - throw new IllegalStateException("The session has already been closed"); - } - return session; + return safeGetSessionHolder().getSession(); } public MessageProducer getMessageProducer() throws JMSException { @@ -393,16 +390,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes MessageProducer result = null; if (useAnonymousProducers) { - if (producer == null) { - // Don't allow for duplicate anonymous producers. - synchronized (this) { - if (producer == null) { - producer = getInternalSession().createProducer(null); - } - } - } - - result = producer; + result = safeGetSessionHolder().getOrCreateProducer(); } else { result = getInternalSession().createProducer(destination); } @@ -418,16 +406,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes QueueSender result = null; if (useAnonymousProducers) { - if (sender == null) { - // Don't allow for duplicate anonymous producers. - synchronized (this) { - if (sender == null) { - sender = ((QueueSession) getInternalSession()).createSender(null); - } - } - } - - result = sender; + result = safeGetSessionHolder().getOrCreateSender(); } else { result = ((QueueSession) getInternalSession()).createSender(destination); } @@ -443,16 +422,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes TopicPublisher result = null; if (useAnonymousProducers) { - if (publisher == null) { - // Don't allow for duplicate anonymous producers. - synchronized (this) { - if (publisher == null) { - publisher = ((TopicSession) getInternalSession()).createPublisher(null); - } - } - } - - result = publisher; + result = safeGetSessionHolder().getOrCreatePublisher(); } else { result = ((TopicSession) getInternalSession()).createPublisher(destination); } @@ -489,7 +459,7 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes @Override public String toString() { - return "PooledSession { " + session + " }"; + return "PooledSession { " + safeGetSessionHolder() + " }"; } /** @@ -505,4 +475,13 @@ public class PooledSession implements Session, TopicSession, QueueSession, XASes protected void onConsumerClose(MessageConsumer consumer) { consumers.remove(consumer); } + + private SessionHolder safeGetSessionHolder() { + SessionHolder sessionHolder = this.sessionHolder; + if (sessionHolder == null) { + throw new IllegalStateException("The session has already been closed"); + } + + return sessionHolder; + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/f62f47b9/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java new file mode 100644 index 0000000..afa75d6 --- /dev/null +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/SessionHolder.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.jms.pool; + +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.Session; +import javax.jms.TopicPublisher; +import javax.jms.TopicSession; + +/** + * Used to store a pooled session instance and any resources that can + * be left open and carried along with the pooled instance such as the + * anonymous producer used for all MessageProducer instances created + * from this pooled session when enabled. + */ +public class SessionHolder { + + private final Session session; + private MessageProducer producer; + private TopicPublisher publisher; + private QueueSender sender; + + public SessionHolder(Session session) { + this.session = session; + } + + public void close() throws JMSException { + try { + session.close(); + } finally { + producer = null; + publisher = null; + sender = null; + } + } + + public Session getSession() { + return session; + } + + public MessageProducer getOrCreateProducer() throws JMSException { + if (producer == null) { + synchronized (this) { + if (producer == null) { + producer = session.createProducer(null); + } + } + } + + return producer; + } + + public TopicPublisher getOrCreatePublisher() throws JMSException { + if (publisher == null) { + synchronized (this) { + if (publisher == null) { + publisher = ((TopicSession) session).createPublisher(null); + } + } + } + + return publisher; + } + + public QueueSender getOrCreateSender() throws JMSException { + if (sender == null) { + synchronized (this) { + if (sender == null) { + sender = ((QueueSession) session).createSender(null); + } + } + } + + return sender; + } + + @Override + public String toString() { + return session.toString(); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/f62f47b9/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java index 7483e6b..9432add 100644 --- a/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java +++ b/activemq-jms-pool/src/test/java/org/apache/activemq/jms/pool/PooledSessionTest.java @@ -17,9 +17,13 @@ package org.apache.activemq.jms.pool; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import javax.jms.Destination; +import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.QueueSession; import javax.jms.Session; @@ -44,7 +48,8 @@ public class PooledSessionTest { public void setUp() throws Exception { broker = new BrokerService(); broker.setPersistent(false); - broker.setUseJmx(false); + broker.setUseJmx(true); + broker.getManagementContext().setCreateMBeanServer(false); TransportConnector connector = broker.addConnector("tcp://localhost:0"); broker.start(); connectionUri = connector.getPublishableConnectString(); @@ -62,7 +67,7 @@ public class PooledSessionTest { broker = null; } - @Test + @Test(timeout = 60000) public void testPooledSessionStats() throws Exception { PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); @@ -73,9 +78,11 @@ public class PooledSessionTest { assertEquals(0, connection.getNumActiveSessions()); assertEquals(1, connection.getNumtIdleSessions()); assertEquals(1, connection.getNumSessions()); + + connection.close(); } - @Test + @Test(timeout = 60000) public void testMessageProducersAreAllTheSame() throws Exception { PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -87,9 +94,11 @@ public class PooledSessionTest { PooledProducer producer2 = (PooledProducer) session.createProducer(queue2); assertSame(producer1.getMessageProducer(), producer2.getMessageProducer()); + + connection.close(); } - @Test + @Test(timeout = 60000) public void testThrowsWhenDifferentDestinationGiven() throws Exception { PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); @@ -110,9 +119,11 @@ public class PooledSessionTest { fail("Should only be able to send to queue 1"); } catch (Exception ex) { } + + connection.close(); } - @Test + @Test(timeout = 60000) public void testCreateTopicPublisher() throws Exception { PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); TopicSession session = connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE); @@ -124,9 +135,10 @@ public class PooledSessionTest { PooledTopicPublisher publisher2 = (PooledTopicPublisher) session.createPublisher(topic2); assertSame(publisher1.getMessageProducer(), publisher2.getMessageProducer()); + connection.close(); } - @Test + @Test(timeout = 60000) public void testQueueSender() throws Exception { PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); QueueSession session = connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE); @@ -138,5 +150,34 @@ public class PooledSessionTest { PooledQueueSender sender2 = (PooledQueueSender) session.createSender(queue2); assertSame(sender1.getMessageProducer(), sender2.getMessageProducer()); + connection.close(); + } + + @Test(timeout = 60000) + public void testRepeatedCreateSessionProducerResultsInSame() throws Exception { + PooledConnection connection = (PooledConnection) pooledFactory.createConnection(); + + assertTrue(pooledFactory.isUseAnonymousProducers()); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createTopic("test-topic"); + PooledProducer producer = (PooledProducer) session.createProducer(destination); + MessageProducer original = producer.getMessageProducer(); + assertNotNull(original); + session.close(); + + assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length); + + for (int i = 0; i < 20; ++i) { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = (PooledProducer) session.createProducer(destination); + assertSame(original, producer.getMessageProducer()); + session.close(); + } + + assertEquals(1, broker.getAdminView().getDynamicDestinationProducers().length); + + connection.close(); + pooledFactory.clear(); } }
