Author: chirino
Date: Tue Nov 14 06:12:03 2006
New Revision: 474799

URL: http://svn.apache.org/viewvc?view=rev&rev=474799
Log:
http://issues.apache.org/activemq/browse/AMQ-1045

we now evict failed connections from a connection pool.
Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java?view=diff&rev=474799&r1=474798&r2=474799 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/ConnectionPool.java Tue Nov 14 06:12:03 2006 @@ -17,18 +17,19 @@ */ package org.apache.activemq.pool; -import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; - -import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.util.JMSExceptionSupport; -import org.apache.commons.pool.ObjectPoolFactory; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; import javax.jms.JMSException; import javax.jms.Session; -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.transport.TransportListener; +import org.apache.commons.pool.ObjectPoolFactory; + +import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean; /** * Holds a real JMS connection along with the session pools associated with it. 
@@ -36,13 +37,33 @@ * @version $Revision$ */ public class ConnectionPool { + private ActiveMQConnection connection; private Map cache; private AtomicBoolean started = new AtomicBoolean(false); + private int referenceCount; private ObjectPoolFactory poolFactory; + private long lastUsed; + private boolean hasFailed; + private int idleTimeout = 30*1000; public ConnectionPool(ActiveMQConnection connection, ObjectPoolFactory poolFactory) { this(connection, new HashMap(), poolFactory); + // Add a transport Listener so that we can notice if this connection should be expired due to + // a connection failure. + connection.addTransportListener(new TransportListener(){ + public void onCommand(Object command) { + } + public void onException(IOException error) { + synchronized(ConnectionPool.this) { + hasFailed = true; + } + } + public void transportInterupted() { + } + public void transportResumed() { + } + }); } public ConnectionPool(ActiveMQConnection connection, Map cache, ObjectPoolFactory poolFactory) { @@ -57,7 +78,7 @@ } } - public ActiveMQConnection getConnection() { + synchronized public ActiveMQConnection getConnection() { return connection; } @@ -71,20 +92,58 @@ return pool.borrowSession(); } - public void close() throws JMSException { - Iterator i = cache.values().iterator(); - while (i.hasNext()) { - SessionPool pool = (SessionPool) i.next(); - i.remove(); + synchronized public void close() { + if( connection!=null ) { + Iterator i = cache.values().iterator(); + while (i.hasNext()) { + SessionPool pool = (SessionPool) i.next(); + i.remove(); + try { + pool.close(); + } catch (Exception e) { + } + } try { - pool.close(); - } - catch (Exception e) { - throw JMSExceptionSupport.create(e); + connection.close(); + } catch (Exception e) { } - } - connection.close(); - connection = null; + connection = null; + } } + + synchronized public void incrementReferenceCount() { + referenceCount++; + } + + synchronized public void decrementReferenceCount() { + referenceCount--; + 
if( referenceCount == 0 ) { + lastUsed = System.currentTimeMillis(); + expiredCheck(); + } + } + + /** + * @return true if this connection has expired. + */ + synchronized public boolean expiredCheck() { + if( connection == null ) + return true; + if( hasFailed || idleTimeout> 0 && System.currentTimeMillis() > lastUsed+idleTimeout ) { + if( referenceCount == 0 ) { + close(); + } + return true; + } + return false; + } + + public int getIdleTimeout() { + return idleTimeout; + } + + public void setIdleTimeout(int idleTimeout) { + this.idleTimeout = idleTimeout; + } } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java?view=diff&rev=474799&r1=474798&r2=474799 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnection.java Tue Nov 14 06:12:03 2006 @@ -56,6 +56,7 @@ public PooledConnection(ConnectionPool pool) { this.pool = pool; + this.pool.incrementReferenceCount(); } /** @@ -66,7 +67,10 @@ } public void close() throws JMSException { - pool = null; + if( this.pool!=null ) { + this.pool.decrementReferenceCount(); + this.pool = null; + } } public void start() throws JMSException { @@ -133,7 +137,7 @@ // Implementation methods // ------------------------------------------------------------------------- - protected ActiveMQConnection getConnection() throws JMSException { + ActiveMQConnection getConnection() throws JMSException { assertNotClosed(); return pool.getConnection(); } Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java URL: 
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java?view=diff&rev=474799&r1=474798&r2=474799 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/pool/PooledConnectionFactory.java Tue Nov 14 06:12:03 2006 @@ -17,22 +17,20 @@ */ package org.apache.activemq.pool; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; + import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.Service; import org.apache.activemq.util.IOExceptionSupport; -import org.apache.activemq.util.ServiceStopper; import org.apache.commons.pool.ObjectPoolFactory; import org.apache.commons.pool.impl.GenericObjectPoolFactory; -import org.apache.commons.pool.impl.GenericObjectPool.Config; - -import javax.jms.Connection; -import javax.jms.ConnectionFactory; -import javax.jms.JMSException; - -import java.util.HashMap; -import java.util.Iterator; -import java.util.Map; /** * A JMS provider which pools Connection, Session and MessageProducer instances @@ -79,6 +77,13 @@ public synchronized Connection createConnection(String userName, String password) throws JMSException { ConnectionKey key = new ConnectionKey(userName, password); ConnectionPool connection = (ConnectionPool) cache.get(key); + + // Now.. we might get a connection, but it might be that we need to + // dump it.. 
+ if( connection!=null && connection.expiredCheck() ) { + connection=null; + } + if (connection == null) { ActiveMQConnection delegate = createConnection(key); connection = new ConnectionPool(delegate, getPoolFactory()); @@ -109,17 +114,10 @@ } public void stop() throws Exception { - ServiceStopper stopper = new ServiceStopper(); for (Iterator iter = cache.values().iterator(); iter.hasNext();) { ConnectionPool connection = (ConnectionPool) iter.next(); - try { - connection.close(); - } - catch (JMSException e) { - stopper.onException(this, e); - } + connection.close(); } - stopper.throwFirstException(); } public ObjectPoolFactory getPoolFactory() { Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java?view=auto&rev=474799 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java (added) +++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/pool/ConnectionFailureEvictsFromPool.java Tue Nov 14 06:12:03 2006 @@ -0,0 +1,79 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.pool; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.test.TestSupport; +import org.apache.activemq.transport.mock.MockTransport; + +public class ConnectionFailureEvictsFromPool extends TestSupport { + + private BrokerService broker; + private ActiveMQConnectionFactory factory; + private PooledConnectionFactory pooledFactory; + + protected void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + TransportConnector connector = broker.addConnector("tcp://localhost:0"); + broker.start(); + factory = new ActiveMQConnectionFactory("mock:"+connector.getConnectUri()); + pooledFactory = new PooledConnectionFactory(factory); + } + + public void testEviction() throws Exception { + Connection connection = pooledFactory.createConnection(); + sendMessage(connection); + createConnectionFailure(connection); + try { + sendMessage(connection); + fail("Expected Error"); + } catch ( JMSException e) { + } + + // If we get another connection now it should be a new connection that works. 
+ Connection connection2 = pooledFactory.createConnection(); + sendMessage(connection2); + } + + private void createConnectionFailure(Connection connection) throws Exception { + ActiveMQConnection c = ((PooledConnection)connection).getConnection(); + MockTransport t = (MockTransport) c.getTransportChannel().narrow(MockTransport.class); + t.stop(); + } + + private void sendMessage(Connection connection) throws JMSException { + Session session = connection.createSession(false, 0); + MessageProducer producer = session.createProducer(new ActiveMQQueue("FOO")); + producer.send(session.createTextMessage("Test")); + session.close(); + } + + protected void tearDown() throws Exception { + broker.stop(); + } +}