Repository: activemq Updated Branches: refs/heads/trunk 189a75afd -> b53d8ea29
https://issues.apache.org/jira/browse/AMQ-5534 - generic jms pool reconnection Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b53d8ea2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b53d8ea2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b53d8ea2 Branch: refs/heads/trunk Commit: b53d8ea2954fd0eaf64981b10b00542026b06b8b Parents: 189a75a Author: Dejan Bosanac <[email protected]> Authored: Thu Jan 22 12:50:48 2015 +0100 Committer: Dejan Bosanac <[email protected]> Committed: Thu Jan 22 12:51:11 2015 +0100 ---------------------------------------------------------------------- .../activemq/jms/pool/ConnectionPool.java | 53 +++++++++++++++++--- .../jms/pool/PooledConnectionFactory.java | 20 ++++++++ 2 files changed, 67 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/b53d8ea2/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java index 26995ea..c802a17 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java @@ -21,16 +21,14 @@ import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; -import javax.jms.Connection; +import javax.jms.*; import javax.jms.IllegalStateException; -import javax.jms.JMSException; -import javax.jms.Session; -import javax.jms.TemporaryQueue; -import javax.jms.TemporaryTopic; import org.apache.commons.pool.KeyedPoolableObjectFactory; import 
org.apache.commons.pool.impl.GenericKeyedObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Holds a real JMS connection along with the session pools associated with it. @@ -40,7 +38,8 @@ import org.apache.commons.pool.impl.GenericObjectPool; * that the temporary destinations of the managed Connection are purged when all references * to this ConnectionPool are released. */ -public class ConnectionPool { +public class ConnectionPool implements ExceptionListener { + private static final transient Logger LOG = LoggerFactory.getLogger(ConnectionPool.class); protected Connection connection; private int referenceCount; @@ -54,6 +53,8 @@ public class ConnectionPool { private final AtomicBoolean started = new AtomicBoolean(false); private final GenericKeyedObjectPool<SessionKey, Session> sessionPool; private final List<PooledSession> loanedSessions = new CopyOnWriteArrayList<PooledSession>(); + private boolean reconnectOnException; + private ExceptionListener parentExceptionListener; public ConnectionPool(Connection connection) { @@ -334,6 +335,46 @@ public class ConnectionPool { this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout); } + /** + * @return true if the underlying connection will be renewed on JMSException, false otherwise + */ + public boolean isReconnectOnException() { + return reconnectOnException; + } + + /** + * Controls whether the underlying connection should be reset (and renewed) on JMSException + * + * @param reconnectOnException + * Boolean value that configures whether reconnect on exception should happen + */ + public void setReconnectOnException(boolean reconnectOnException) { + this.reconnectOnException = reconnectOnException; + try { + if (isReconnectOnException()) { + if (connection.getExceptionListener() != null) { + parentExceptionListener = connection.getExceptionListener(); + } + connection.setExceptionListener(this); + } else { + if 
(parentExceptionListener != null) { + connection.setExceptionListener(parentExceptionListener); + } + parentExceptionListener = null; + } + } catch (JMSException jmse) { + LOG.warn("Cannot set reconnect exception listener", jmse); + } + } + + @Override + public void onException(JMSException exception) { + close(); + if (parentExceptionListener != null) { + parentExceptionListener.onException(exception); + } + } + @Override public String toString() { return "ConnectionPool[" + connection + "]"; http://git-wip-us.apache.org/repos/asf/activemq/blob/b53d8ea2/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java ---------------------------------------------------------------------- diff --git a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java index 67f45fd..06918c5 100644 --- a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java +++ b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java @@ -77,6 +77,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti private long expiryTimeout = 0l; private boolean createConnectionOnStartup = true; private boolean useAnonymousProducers = true; + private boolean reconnectOnException = true; // Temporary value used to always fetch the result of makeObject. 
private final AtomicReference<ConnectionPool> mostRecentlyCreated = new AtomicReference<ConnectionPool>(null); @@ -115,6 +116,7 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout()); } connection.setUseAnonymousProducers(isUseAnonymousProducers()); + connection.setReconnectOnException(isReconnectOnException()); if (LOG.isTraceEnabled()) { LOG.trace("Created new connection: {}", connection); @@ -560,6 +562,23 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti } /** + * @return true if the underlying connection will be renewed on JMSException, false otherwise + */ + public boolean isReconnectOnException() { + return reconnectOnException; + } + + /** + * Controls whether the underlying connection should be reset (and renewed) on JMSException + * + * @param reconnectOnException + * Boolean value that configures whether reconnect on exception should happen + */ + public void setReconnectOnException(boolean reconnectOnException) { + this.reconnectOnException = reconnectOnException; + } + + /** * Called by any superclass that implements a JNDIReferencable or similar that needs to collect * the properties of this class for storage etc. * @@ -577,5 +596,6 @@ public class PooledConnectionFactory implements ConnectionFactory, QueueConnecti props.setProperty("createConnectionOnStartup", Boolean.toString(isCreateConnectionOnStartup())); props.setProperty("useAnonymousProducers", Boolean.toString(isUseAnonymousProducers())); props.setProperty("blockIfSessionPoolIsFullTimeout", Long.toString(getBlockIfSessionPoolIsFullTimeout())); + props.setProperty("reconnectOnException", Boolean.toString(isReconnectOnException())); } }
