Author: tabish
Date: Fri Dec 21 22:05:48 2012
New Revision: 1425162
URL: http://svn.apache.org/viewvc?rev=1425162&view=rev
Log:
fix and test for: https://issues.apache.org/jira/browse/AMQ-4225
Added:
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java
(with props)
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java?rev=1425162&r1=1425161&r2=1425162&view=diff
==============================================================================
---
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
(original)
+++
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledConnection.java
Fri Dec 21 22:05:48 2012
@@ -62,6 +62,7 @@ public class PooledConnection implements
private volatile boolean stopped;
private final List<TemporaryQueue> connTempQueues = new
CopyOnWriteArrayList<TemporaryQueue>();
private final List<TemporaryTopic> connTempTopics = new
CopyOnWriteArrayList<TemporaryTopic>();
+ private final List<PooledSession> loanedSessions = new
CopyOnWriteArrayList<PooledSession>();
/**
* Creates a new PooledConnection instance that uses the given
ConnectionPool to create
@@ -86,6 +87,7 @@ public class PooledConnection implements
@Override
public void close() throws JMSException {
this.cleanupConnectionTemporaryDestinations();
+ this.cleanupAllLoanedSessions();
if (this.pool != null) {
this.pool.decrementReferenceCount();
this.pool = null;
@@ -104,8 +106,7 @@ public class PooledConnection implements
}
@Override
- public ConnectionConsumer createConnectionConsumer(Destination
destination, String selector, ServerSessionPool serverSessionPool, int
maxMessages)
- throws JMSException {
+ public ConnectionConsumer createConnectionConsumer(Destination
destination, String selector, ServerSessionPool serverSessionPool, int
maxMessages) throws JMSException {
return getConnection().createConnectionConsumer(destination, selector,
serverSessionPool, maxMessages);
}
@@ -115,8 +116,7 @@ public class PooledConnection implements
}
@Override
- public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
String selector, String s1, ServerSessionPool serverSessionPool, int i)
- throws JMSException {
+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
String selector, String s1, ServerSessionPool serverSessionPool, int i) throws
JMSException {
return getConnection().createDurableConnectionConsumer(topic,
selector, s1, serverSessionPool, i);
}
@@ -173,10 +173,14 @@ public class PooledConnection implements
PooledSession result;
result = (PooledSession) pool.createSession(transacted, ackMode);
- // Add a temporary destination event listener to the session that
notifies us when
- // the session creates temporary destinations.
- result.addTempDestEventListener(this);
- return (Session) result;
+ // Store the session so we can close the sessions that this
PooledConnection
+ // created in order to ensure that consumers etc are closed per the
JMS contract.
+ loanedSessions.add(result);
+
+ // Add an event listener to the session that notifies us when the
session
+ // creates / destroys temporary destinations and closes etc.
+ result.addSessionEventListener(this);
+ return result;
}
// EnhancedCollection API
@@ -190,14 +194,23 @@ public class PooledConnection implements
// Implementation methods
//
-------------------------------------------------------------------------
+ @Override
public void onTemporaryQueueCreate(TemporaryQueue tempQueue) {
connTempQueues.add(tempQueue);
}
+ @Override
public void onTemporaryTopicCreate(TemporaryTopic tempTopic) {
connTempTopics.add(tempTopic);
}
+ @Override
+ public void onSessionClosed(PooledSession session) {
+ if (session != null) {
+ this.loanedSessions.remove(session);
+ }
+ }
+
public ActiveMQConnection getConnection() throws JMSException {
assertNotClosed();
return pool.getConnection();
@@ -213,6 +226,7 @@ public class PooledConnection implements
return (ActiveMQSession)
getConnection().createSession(key.isTransacted(), key.getAckMode());
}
+ @Override
public String toString() {
return "PooledConnection { " + pool + " }";
}
@@ -247,6 +261,23 @@ public class PooledConnection implements
}
/**
+ * The PooledConnection tracks all Sessions that it created and now we close
them. Closing the
+ * PooledSession will return the internal Session to the pool of Sessions
after cleaning up
+ * all the resources that the Session had allocated for this
PooledConnection.
+ */
+ protected void cleanupAllLoanedSessions() {
+
+ for (PooledSession session : loanedSessions) {
+ try {
+ session.close();
+ } catch (JMSException ex) {
+ LOG.info("failed to close laoned Session \"" + session + "\"
on closing pooled connection: " + ex.getMessage());
+ }
+ }
+ loanedSessions.clear();
+ }
+
+ /**
* @return the total number of Pooled session including idle sessions that
are not
* currently loaned out to any client.
*/
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java?rev=1425162&r1=1425161&r2=1425162&view=diff
==============================================================================
---
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
(original)
+++
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
Fri Dec 21 22:05:48 2012
@@ -63,7 +63,7 @@ public class PooledSession implements Se
private final KeyedObjectPool<SessionKey, PooledSession> sessionPool;
private final CopyOnWriteArrayList<MessageConsumer> consumers = new
CopyOnWriteArrayList<MessageConsumer>();
private final CopyOnWriteArrayList<QueueBrowser> browsers = new
CopyOnWriteArrayList<QueueBrowser>();
- private final CopyOnWriteArrayList<PooledSessionEventListener>
tempDestEventListeners =
+ private final CopyOnWriteArrayList<PooledSessionEventListener>
sessionEventListeners =
new CopyOnWriteArrayList<PooledSessionEventListener>();
private ActiveMQSession session;
@@ -81,10 +81,10 @@ public class PooledSession implements Se
this.transactional = session.isTransacted();
}
- public void addTempDestEventListener(PooledSessionEventListener listener) {
+ public void addSessionEventListener(PooledSessionEventListener listener) {
// only add if really needed
- if (!tempDestEventListeners.contains(listener)) {
- this.tempDestEventListeners.add(listener);
+ if (!sessionEventListeners.contains(listener)) {
+ this.sessionEventListeners.add(listener);
}
}
@@ -129,7 +129,10 @@ public class PooledSession implements Se
} finally {
consumers.clear();
browsers.clear();
- tempDestEventListeners.clear();
+ for (PooledSessionEventListener listener :
this.sessionEventListeners) {
+ listener.onSessionClosed(this);
+ }
+ sessionEventListeners.clear();
}
if (invalidate) {
@@ -205,7 +208,7 @@ public class PooledSession implements Se
result = getInternalSession().createTemporaryQueue();
// Notify all of the listeners of the created temporary Queue.
- for (PooledSessionEventListener listener :
this.tempDestEventListeners) {
+ for (PooledSessionEventListener listener : this.sessionEventListeners)
{
listener.onTemporaryQueueCreate(result);
}
@@ -219,7 +222,7 @@ public class PooledSession implements Se
result = getInternalSession().createTemporaryTopic();
// Notify all of the listeners of the created temporary Topic.
- for (PooledSessionEventListener listener :
this.tempDestEventListeners) {
+ for (PooledSessionEventListener listener : this.sessionEventListeners)
{
listener.onTemporaryTopicCreate(result);
}
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java?rev=1425162&r1=1425161&r2=1425162&view=diff
==============================================================================
---
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
(original)
+++
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSessionEventListener.java
Fri Dec 21 22:05:48 2012
@@ -26,7 +26,7 @@ interface PooledSessionEventListener {
* Called on successful creation of a new TemporaryQueue.
*
* @param tempQueue
- * The TemporaryQueue just created.
+ * The TemporaryQueue just created.
*/
void onTemporaryQueueCreate(TemporaryQueue tempQueue);
@@ -34,8 +34,16 @@ interface PooledSessionEventListener {
* Called on successful creation of a new TemporaryTopic.
*
* @param tempTopic
- * The TemporaryTopic just created.
+ * The TemporaryTopic just created.
*/
void onTemporaryTopicCreate(TemporaryTopic tempTopic);
+ /**
+ * Called when the PooledSession is closed.
+ *
+ * @param session
+ * The PooledSession that has been closed.
+ */
+ void onSessionClosed(PooledSession session);
+
}
Added:
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java?rev=1425162&view=auto
==============================================================================
---
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java
(added)
+++
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java
Fri Dec 21 22:05:48 2012
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.pool;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PooledConnectionSessionCleanupTest {
+
+ @SuppressWarnings("unused")
+ private static final Logger LOG =
LoggerFactory.getLogger(PooledConnectionSessionCleanupTest.class);
+
+ protected BrokerService service;
+
+ protected ActiveMQConnectionFactory directConnFact;
+ protected Connection directConn1;
+ protected Connection directConn2;
+
+ protected PooledConnectionFactory pooledConnFact;
+ protected Connection pooledConn1;
+ protected Connection pooledConn2;
+
+ private final ActiveMQQueue queue = new ActiveMQQueue("ContendedQueue");
+ private final int MESSAGE_COUNT = 50;
+
+ /**
+ * Prepare to run a test case: create, configure, and start the embedded
+ * broker, as well as creating the client connections to the broker.
+ */
+ @Before
+ public void prepTest() throws java.lang.Exception {
+ service = new BrokerService();
+ service.setUseJmx(true);
+ service.setPersistent(false);
+ service.setSchedulerSupport(false);
+ service.start();
+ service.waitUntilStarted();
+
+ // Create the ActiveMQConnectionFactory and the
PooledConnectionFactory.
+ // Set a long idle timeout on the pooled connections to better show the
+ // problem of holding onto created resources on close.
+ directConnFact = new
ActiveMQConnectionFactory(service.getVmConnectorURI());
+ pooledConnFact = new PooledConnectionFactory(directConnFact);
+ pooledConnFact.setIdleTimeout((int)TimeUnit.MINUTES.toMillis(60));
+ pooledConnFact.setMaxConnections(1);
+
+ // Prepare the connections
+ directConn1 = directConnFact.createConnection();
+ directConn1.start();
+ directConn2 = directConnFact.createConnection();
+ directConn2.start();
+
+ // The pooled Connections should have the same underlying connection
+ pooledConn1 = pooledConnFact.createConnection();
+ pooledConn1.start();
+ pooledConn2 = pooledConnFact.createConnection();
+ pooledConn2.start();
+ }
+
+ @After
+ public void cleanupTest() throws java.lang.Exception {
+ try {
+ pooledConn1.close();
+ } catch (JMSException jms_exc) {
+ }
+ try {
+ pooledConn2.close();
+ } catch (JMSException jms_exc) {
+ }
+ try {
+ directConn1.close();
+ } catch (JMSException jms_exc) {
+ }
+ try {
+ directConn2.close();
+ } catch (JMSException jms_exc) {
+ }
+ try {
+ service.stop();
+ } catch (JMSException jms_exc) {
+ }
+ }
+
+ private void produceMessages() throws Exception {
+
+ Session session = directConn1.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageProducer producer = session.createProducer(queue);
+ for (int i = 0; i < MESSAGE_COUNT; ++i) {
+ producer.send(session.createTextMessage("Test Message: " + i));
+ }
+ producer.close();
+ }
+
+ private QueueViewMBean getProxyToQueue(String name) throws
MalformedObjectNameException, JMSException {
+ ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+ + ":Type=Queue,Destination=" + name
+ + ",BrokerName=localhost");
+ QueueViewMBean proxy = (QueueViewMBean) service.getManagementContext()
+ .newProxyInstance(queueViewMBeanName, QueueViewMBean.class,
true);
+ return proxy;
+ }
+
+ @Test
+ public void testLingeringPooledSessionsHoldingPrefetchedMessages() throws
Exception {
+
+ produceMessages();
+
+ Session pooledSession1 = pooledConn1.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ pooledSession1.createConsumer(queue);
+
+ final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName());
+
+ assertTrue("Should have all sent messages in flight:",
Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return view.getInFlightCount() == MESSAGE_COUNT;
+ }
+ }));
+
+ // While all the messages are in flight we should not get anything on this
consumer.
+ Session session = directConn1.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+ assertNull(consumer.receive(2000));
+
+ pooledConn1.close();
+
+ assertTrue("Should have only one consumer now:", Wait.waitFor(new
Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return view.getSubscriptions().length == 1;
+ }
+ }));
+
+ // Now we'd expect that the message stuck in the prefetch of the
pooled session's
+ // consumer would be rerouted to the non-pooled session's consumer.
+ assertNotNull(consumer.receive(10000));
+ }
+
+ @Test
+ public void testNonPooledConnectionCloseNotHoldingPrefetchedMessages()
throws Exception {
+
+ produceMessages();
+
+ Session directSession = directConn2.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ directSession.createConsumer(queue);
+
+ final QueueViewMBean view = getProxyToQueue(queue.getPhysicalName());
+
+ assertTrue("Should have all sent messages in flight:",
Wait.waitFor(new Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return view.getInFlightCount() == MESSAGE_COUNT;
+ }
+ }));
+
+ // While all the messages are in flight we should not get anything on this
consumer.
+ Session session = directConn1.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
+ assertNull(consumer.receive(2000));
+
+ directConn2.close();
+
+ assertTrue("Should have only one consumer now:", Wait.waitFor(new
Wait.Condition() {
+
+ @Override
+ public boolean isSatisified() throws Exception {
+ return view.getSubscriptions().length == 1;
+ }
+ }));
+
+ // Now we'd expect that the message stuck in the prefetch of the first
session's
+ // consumer would be rerouted to the alternate session's consumer.
+ assertNotNull(consumer.receive(10000));
+ }
+}
Propchange:
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledConnectionSessionCleanupTest.java
------------------------------------------------------------------------------
svn:eol-style = native