Repository: activemq
Updated Branches:
  refs/heads/trunk 189a75afd -> b53d8ea29


https://issues.apache.org/jira/browse/AMQ-5534 - generic jms pool reconnection


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/b53d8ea2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/b53d8ea2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/b53d8ea2

Branch: refs/heads/trunk
Commit: b53d8ea2954fd0eaf64981b10b00542026b06b8b
Parents: 189a75a
Author: Dejan Bosanac <[email protected]>
Authored: Thu Jan 22 12:50:48 2015 +0100
Committer: Dejan Bosanac <[email protected]>
Committed: Thu Jan 22 12:51:11 2015 +0100

----------------------------------------------------------------------
 .../activemq/jms/pool/ConnectionPool.java       | 53 +++++++++++++++++---
 .../jms/pool/PooledConnectionFactory.java       | 20 ++++++++
 2 files changed, 67 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/b53d8ea2/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
----------------------------------------------------------------------
diff --git 
a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
 
b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
index 26995ea..c802a17 100644
--- 
a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
+++ 
b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/ConnectionPool.java
@@ -21,16 +21,14 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import javax.jms.Connection;
+import javax.jms.*;
 import javax.jms.IllegalStateException;
-import javax.jms.JMSException;
-import javax.jms.Session;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
 
 import org.apache.commons.pool.KeyedPoolableObjectFactory;
 import org.apache.commons.pool.impl.GenericKeyedObjectPool;
 import org.apache.commons.pool.impl.GenericObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Holds a real JMS connection along with the session pools associated with it.
@@ -40,7 +38,8 @@ import org.apache.commons.pool.impl.GenericObjectPool;
  * that the temporary destinations of the managed Connection are purged when 
all references
  * to this ConnectionPool are released.
  */
-public class ConnectionPool {
+public class ConnectionPool implements ExceptionListener {
+    private static final transient Logger LOG = 
LoggerFactory.getLogger(ConnectionPool.class);
 
     protected Connection connection;
     private int referenceCount;
@@ -54,6 +53,8 @@ public class ConnectionPool {
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final GenericKeyedObjectPool<SessionKey, Session> sessionPool;
     private final List<PooledSession> loanedSessions = new 
CopyOnWriteArrayList<PooledSession>();
+    private boolean reconnectOnException;
+    private ExceptionListener parentExceptionListener;
 
     public ConnectionPool(Connection connection) {
 
@@ -334,6 +335,46 @@ public class ConnectionPool {
         this.sessionPool.setMaxWait(blockIfSessionPoolIsFullTimeout);
     }
 
+    /**
+     * @return true if the underlying connection will be renewed on 
JMSException, false otherwise
+     */
+    public boolean isReconnectOnException() {
+        return reconnectOnException;
+    }
+
+    /**
+     * Controls weather the underlying connection should be reset (and 
renewed) on JMSException
+     *
+     * @param reconnectOnException
+     *          Boolean value that configures whether reconnect on exception 
should happen
+     */
+    public void setReconnectOnException(boolean reconnectOnException) {
+        this.reconnectOnException = reconnectOnException;
+        try {
+            if (isReconnectOnException()) {
+                if (connection.getExceptionListener() != null) {
+                    parentExceptionListener = 
connection.getExceptionListener();
+                }
+                connection.setExceptionListener(this);
+            } else {
+                if (parentExceptionListener != null) {
+                    connection.setExceptionListener(parentExceptionListener);
+                }
+                parentExceptionListener = null;
+            }
+        } catch (JMSException jmse) {
+            LOG.warn("Cannot set reconnect exception listener", jmse);
+        }
+    }
+
+    @Override
+    public void onException(JMSException exception) {
+        close();
+        if (parentExceptionListener != null) {
+            parentExceptionListener.onException(exception);
+        }
+    }
+
     @Override
     public String toString() {
         return "ConnectionPool[" + connection + "]";

http://git-wip-us.apache.org/repos/asf/activemq/blob/b53d8ea2/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
----------------------------------------------------------------------
diff --git 
a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
 
b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
index 67f45fd..06918c5 100644
--- 
a/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
+++ 
b/activemq-jms-pool/src/main/java/org/apache/activemq/jms/pool/PooledConnectionFactory.java
@@ -77,6 +77,7 @@ public class PooledConnectionFactory implements 
ConnectionFactory, QueueConnecti
     private long expiryTimeout = 0l;
     private boolean createConnectionOnStartup = true;
     private boolean useAnonymousProducers = true;
+    private boolean reconnectOnException = true;
 
     // Temporary value used to always fetch the result of makeObject.
     private final AtomicReference<ConnectionPool> mostRecentlyCreated = new 
AtomicReference<ConnectionPool>(null);
@@ -115,6 +116,7 @@ public class PooledConnectionFactory implements 
ConnectionFactory, QueueConnecti
                             
connection.setBlockIfSessionPoolIsFullTimeout(getBlockIfSessionPoolIsFullTimeout());
                         }
                         
connection.setUseAnonymousProducers(isUseAnonymousProducers());
+                        
connection.setReconnectOnException(isReconnectOnException());
 
                         if (LOG.isTraceEnabled()) {
                             LOG.trace("Created new connection: {}", 
connection);
@@ -560,6 +562,23 @@ public class PooledConnectionFactory implements 
ConnectionFactory, QueueConnecti
     }
 
     /**
+     * @return true if the underlying connection will be renewed on 
JMSException, false otherwise
+     */
+    public boolean isReconnectOnException() {
+        return reconnectOnException;
+    }
+
+    /**
+     * Controls weather the underlying connection should be reset (and 
renewed) on JMSException
+     *
+     * @param reconnectOnException
+     *          Boolean value that configures whether reconnect on exception 
should happen
+     */
+    public void setReconnectOnException(boolean reconnectOnException) {
+        this.reconnectOnException = reconnectOnException;
+    }
+
+    /**
      * Called by any superclass that implements a JNDIReferencable or similar 
that needs to collect
      * the properties of this class for storage etc.
      *
@@ -577,5 +596,6 @@ public class PooledConnectionFactory implements 
ConnectionFactory, QueueConnecti
         props.setProperty("createConnectionOnStartup", 
Boolean.toString(isCreateConnectionOnStartup()));
         props.setProperty("useAnonymousProducers", 
Boolean.toString(isUseAnonymousProducers()));
         props.setProperty("blockIfSessionPoolIsFullTimeout", 
Long.toString(getBlockIfSessionPoolIsFullTimeout()));
+        props.setProperty("reconnectOnException", 
Boolean.toString(isReconnectOnException()));
     }
 }

Reply via email to