Author: tabish
Date: Fri Oct 7 13:51:44 2011
New Revision: 1180038
URL: http://svn.apache.org/viewvc?rev=1180038&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-2643
track pooled sessions in the parent ConnectionPool and ensure
they get closed and returned to the pool.
Added:
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledSessionExhaustionTest.java
(with props)
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java?rev=1180038&r1=1180037&r2=1180038&view=diff
==============================================================================
---
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
(original)
+++
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/ConnectionPool.java
Fri Oct 7 13:51:44 2011
@@ -20,6 +20,7 @@ package org.apache.activemq.pool;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
@@ -38,6 +39,7 @@ public class ConnectionPool {
private ActiveMQConnection connection;
private ConcurrentHashMap<SessionKey, SessionPool> cache;
+ private ConcurrentLinkedQueue<PooledSession> loanedSessions = new
ConcurrentLinkedQueue<PooledSession>();
private AtomicBoolean started = new AtomicBoolean(false);
private int referenceCount;
private ObjectPoolFactory poolFactory;
@@ -118,6 +120,7 @@ public class ConnectionPool {
pool = cache.get(key); // this will return a non-null value...
}
PooledSession session = pool.borrowSession();
+ this.loanedSessions.add(session);
return session;
}
@@ -155,6 +158,14 @@ public class ConnectionPool {
if (referenceCount == 0) {
expiredCheck();
+ for (PooledSession session : this.loanedSessions) {
+ try {
+ session.close();
+ } catch (Exception e) {
+ }
+ }
+ this.loanedSessions.clear();
+
// only clean up temp destinations when all users
// of this connection have called close
if (getConnection() != null) {
@@ -208,4 +219,11 @@ public class ConnectionPool {
return expiryTimeout;
}
+ void onSessionReturned(PooledSession session) {
+ this.loanedSessions.remove(session);
+ }
+
+ void onSessionInvalidated(PooledSession session) {
+ this.loanedSessions.remove(session);
+ }
}
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java?rev=1180038&r1=1180037&r2=1180038&view=diff
==============================================================================
---
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
(original)
+++
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/PooledSession.java
Fri Oct 7 13:51:44 2011
@@ -55,7 +55,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- *
+ *
*/
public class PooledSession implements Session, TopicSession, QueueSession,
XASession {
private static final transient Logger LOG =
LoggerFactory.getLogger(PooledSession.class);
Modified:
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java?rev=1180038&r1=1180037&r2=1180038&view=diff
==============================================================================
---
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java
(original)
+++
activemq/trunk/activemq-pool/src/main/java/org/apache/activemq/pool/SessionPool.java
Fri Oct 7 13:51:44 2011
@@ -27,8 +27,8 @@ import org.apache.commons.pool.PoolableO
/**
* Represents the session pool for a given JMS connection.
- *
- *
+ *
+ *
*/
public class SessionPool implements PoolableObjectFactory {
private ConnectionPool connectionPool;
@@ -64,20 +64,21 @@ public class SessionPool implements Pool
// lets check if we are already closed
getConnection();
try {
+ connectionPool.onSessionReturned(session);
getSessionPool().returnObject(session);
} catch (Exception e) {
throw JMSExceptionSupport.create("Failed to return session to
pool: " + e, e);
}
}
-
+
public void invalidateSession(PooledSession session) throws JMSException {
try {
+ connectionPool.onSessionInvalidated(session);
getSessionPool().invalidateObject(session);
} catch (Exception e) {
throw JMSExceptionSupport.create("Failed to invalidate session: "
+ e, e);
}
}
-
// PoolableObjectFactory methods
//
-------------------------------------------------------------------------
Added:
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledSessionExhaustionTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledSessionExhaustionTest.java?rev=1180038&view=auto
==============================================================================
---
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledSessionExhaustionTest.java
(added)
+++
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledSessionExhaustionTest.java
Fri Oct 7 13:51:44 2011
@@ -0,0 +1,88 @@
+package org.apache.activemq.pool;
+
+import org.apache.activemq.*;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+
+import junit.framework.*;
+import javax.jms.*;
+import javax.jms.Message;
+
+import org.apache.log4j.Logger;
+
+public class PooledSessionExhaustionTest extends TestCase {
+ private static final String QUEUE = "FOO";
+ private static final int NUM_MESSAGES = 700;
+
+ private Logger logger = Logger.getLogger(getClass());
+
+ private BrokerService broker;
+ private ActiveMQConnectionFactory factory;
+ private PooledConnectionFactory pooledFactory;
+ private String connectionUri;
+ private int numReceived = 0;
+
+ protected void setUp() throws Exception {
+ broker = new BrokerService();
+ broker.setPersistent(false);
+ TransportConnector connector =
broker.addConnector("tcp://localhost:0");
+ broker.start();
+ connectionUri = connector.getPublishableConnectString();
+ factory = new ActiveMQConnectionFactory(connectionUri);
+ pooledFactory = new PooledConnectionFactory(factory);
+ pooledFactory.setMaxConnections(1);
+ pooledFactory.setBlockIfSessionPoolIsFull(false);
+ }
+
+ public void sendMessages(ConnectionFactory connectionFactory) throws
Exception {
+ for (int i = 0; i < NUM_MESSAGES; i++) {
+ Connection connection = connectionFactory.createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(QUEUE);
+ MessageProducer producer = session.createProducer(destination);
+
+ String msgTo = "hello";
+ TextMessage message = session.createTextMessage(msgTo);
+ producer.send(message);
+ connection.close();
+ logger.debug("sent " + i + " messages using " +
connectionFactory.getClass());
+ }
+ }
+
+ public void testCanExhaustSessions() throws Exception {
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(connectionUri);
+ Connection connection =
connectionFactory.createConnection();
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ Destination destination = session.createQueue(QUEUE);
+ MessageConsumer consumer =
session.createConsumer(destination);
+ for (int i = 0; i < NUM_MESSAGES; ++i) {
+ Message msg = consumer.receive(5000);
+ if (msg == null) {
+ return;
+ }
+ numReceived++;
+ if (numReceived % 20 == 0) {
+ logger.debug("received " + numReceived + "
messages ");
+ System.runFinalization();
+ }
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+ });
+ thread.start();
+
+ sendMessages(pooledFactory);
+ thread.join();
+
+ assertEquals(NUM_MESSAGES, numReceived);
+ }
+}
Propchange:
activemq/trunk/activemq-pool/src/test/java/org/apache/activemq/pool/PooledSessionExhaustionTest.java
------------------------------------------------------------------------------
svn:eol-style = native