Author: chirino
Date: Tue Nov 14 06:12:03 2006
New Revision: 474799

URL: http://svn.apache.org/viewvc?view=rev&rev=474799
Log:
http://issues.apache.org/activemq/browse/AMQ-1045 we now evict failed 
connections from a connection pool.

Added:
    
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java
Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java?view=diff&rev=474799&r1=474798&r2=474799
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java
 Tue Nov 14 06:12:03 2006
@@ -17,18 +17,19 @@
  */
 package org.apache.activemq.pool;
 
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.util.JMSExceptionSupport;
-import org.apache.commons.pool.ObjectPoolFactory;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 
 import javax.jms.JMSException;
 import javax.jms.Session;
 
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.commons.pool.ObjectPoolFactory;
+
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Holds a real JMS connection along with the session pools associated with it.
@@ -36,13 +37,33 @@
  * @version $Revision$
  */
 public class ConnectionPool {
+       
     private ActiveMQConnection connection;
     private Map cache;
     private AtomicBoolean started = new AtomicBoolean(false);
+    private int referenceCount;
     private ObjectPoolFactory poolFactory;
+       private long lastUsed;
+       private boolean hasFailed;
+       private int idleTimeout = 30*1000;
 
     public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory 
poolFactory) {
         this(connection, new HashMap(), poolFactory);
+        // Add a transport Listener so that we can notice if this connection 
should be expired due to 
+        // a connection failure.
+        connection.addTransportListener(new TransportListener(){
+                       public void onCommand(Object command) {
+                       }
+                       public void onException(IOException error) {
+                               synchronized(ConnectionPool.this) {
+                                       hasFailed = true;
+                               }
+                       }
+                       public void transportInterupted() {
+                       }
+                       public void transportResumed() {
+                       }
+               });
     }
 
     public ConnectionPool(ActiveMQConnection connection, Map cache, 
ObjectPoolFactory poolFactory) {
@@ -57,7 +78,7 @@
         }
     }
 
-    public ActiveMQConnection getConnection() {
+    synchronized public ActiveMQConnection getConnection() {
         return connection;
     }
 
@@ -71,20 +92,58 @@
         return pool.borrowSession();
     }
 
-    public void close() throws JMSException {
-        Iterator i = cache.values().iterator();
-        while (i.hasNext()) {
-            SessionPool pool = (SessionPool) i.next();
-            i.remove();
+    synchronized public void close() {
+       if( connection!=null ) {
+               Iterator i = cache.values().iterator();
+               while (i.hasNext()) {
+                   SessionPool pool = (SessionPool) i.next();
+                   i.remove();
+                   try {
+                       pool.close();
+                   } catch (Exception e) {
+                   }
+               }
             try {
-                pool.close();
-            }
-            catch (Exception e) {
-                throw JMSExceptionSupport.create(e);
+               connection.close();
+            } catch (Exception e) {
             }
-        }
-        connection.close();
-        connection = null;
+               connection = null;
+       }
     }
+
+    synchronized public void incrementReferenceCount() {
+               referenceCount++;
+       }
+
+       synchronized public void decrementReferenceCount() {
+               referenceCount--;
+               if( referenceCount == 0 ) {
+                       lastUsed = System.currentTimeMillis();
+                       expiredCheck();
+               }
+       }
+
+       /**
+        * @return true if this connection has expired.
+        */
+       synchronized public boolean expiredCheck() {
+               if( connection == null )
+                       return true;
+               if( hasFailed || idleTimeout> 0 && System.currentTimeMillis() > 
lastUsed+idleTimeout ) {
+                       if( referenceCount == 0 ) {
+                               close();
+                       }
+                       return true;
+               }
+               return false;
+       }
+
+       public int getIdleTimeout() {
+               return idleTimeout;
+       }
+
+       public void setIdleTimeout(int idleTimeout) {
+               this.idleTimeout = idleTimeout;
+       }
 
 }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java?view=diff&rev=474799&r1=474798&r2=474799
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java
 Tue Nov 14 06:12:03 2006
@@ -56,6 +56,7 @@
 
     public PooledConnection(ConnectionPool pool) {
         this.pool = pool;
+        this.pool.incrementReferenceCount();
     }
 
     /**
@@ -66,7 +67,10 @@
     }
 
     public void close() throws JMSException {
-        pool = null;
+       if( this.pool!=null ) {
+               this.pool.decrementReferenceCount();
+               this.pool = null;
+       }
     }
 
     public void start() throws JMSException {
@@ -133,7 +137,7 @@
     // Implementation methods
     // 
-------------------------------------------------------------------------
 
-    protected ActiveMQConnection getConnection() throws JMSException {
+    ActiveMQConnection getConnection() throws JMSException {
         assertNotClosed();
         return pool.getConnection();
     }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java?view=diff&rev=474799&r1=474798&r2=474799
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java
 Tue Nov 14 06:12:03 2006
@@ -17,22 +17,20 @@
  */
 package org.apache.activemq.pool;
 
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.Service;
 import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.pool.ObjectPoolFactory;
 import org.apache.commons.pool.impl.GenericObjectPoolFactory;
-import org.apache.commons.pool.impl.GenericObjectPool.Config;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
 
 /**
  * A JMS provider which pools Connection, Session and MessageProducer instances
@@ -79,6 +77,13 @@
     public synchronized Connection createConnection(String userName, String 
password) throws JMSException {
         ConnectionKey key = new ConnectionKey(userName, password);
         ConnectionPool connection = (ConnectionPool) cache.get(key);
+        
+        // Now.. we might get a connection, but it might be that we need to 
+        // dump it..
+        if( connection!=null && connection.expiredCheck() ) {
+               connection=null;
+        }
+        
         if (connection == null) {
             ActiveMQConnection delegate = createConnection(key);
             connection = new ConnectionPool(delegate, getPoolFactory());
@@ -109,17 +114,10 @@
     }
 
     public void stop() throws Exception {
-        ServiceStopper stopper = new ServiceStopper();
         for (Iterator iter = cache.values().iterator(); iter.hasNext();) {
             ConnectionPool connection = (ConnectionPool) iter.next();
-            try {
-                connection.close();
-            }
-            catch (JMSException e) {
-                stopper.onException(this, e);
-            }
+            connection.close();
         }
-        stopper.throwFirstException();
     }
 
     public ObjectPoolFactory getPoolFactory() {

Added: 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java
URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java?view=auto&rev=474799
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java
 (added)
+++ 
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java
 Tue Nov 14 06:12:03 2006
@@ -0,0 +1,79 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.pool;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.test.TestSupport;
+import org.apache.activemq.transport.mock.MockTransport;
+
+public class ConnectionFailureEvictsFromPool extends TestSupport {
+       
+       private BrokerService broker;
+       private ActiveMQConnectionFactory factory;
+       private PooledConnectionFactory pooledFactory;
+
+       protected void setUp() throws Exception {
+               broker = new BrokerService();
+               broker.setPersistent(false);
+               TransportConnector connector = 
broker.addConnector("tcp://localhost:0");
+               broker.start();
+               factory = new 
ActiveMQConnectionFactory("mock:"+connector.getConnectUri());
+               pooledFactory = new PooledConnectionFactory(factory);
+       }
+       
+       public void testEviction() throws Exception {
+               Connection connection = pooledFactory.createConnection();
+               sendMessage(connection);
+               createConnectionFailure(connection);
+               try {
+                       sendMessage(connection);
+                       fail("Expected Error");
+               } catch ( JMSException e) {
+               }
+               
+               // If we get another connection now it should be a new 
connection that works.
+               Connection connection2 = pooledFactory.createConnection();
+               sendMessage(connection2);               
+       }
+       
+       private void createConnectionFailure(Connection connection) throws 
Exception {
+               ActiveMQConnection c = 
((PooledConnection)connection).getConnection();
+               MockTransport t = (MockTransport) 
c.getTransportChannel().narrow(MockTransport.class);
+               t.stop();
+       }
+
+       private void sendMessage(Connection connection) throws JMSException {
+               Session session = connection.createSession(false, 0);
+               MessageProducer producer = session.createProducer(new 
ActiveMQQueue("FOO"));
+               producer.send(session.createTextMessage("Test"));
+               session.close();
+       }
+
+       protected void tearDown() throws Exception {
+               broker.stop();
+       }
+}


Reply via email to