Author: rgodfrey
Date: Thu May 7 23:20:46 2015
New Revision: 1678273
URL: http://svn.apache.org/r1678273
Log:
QPID-6534 : [Java Client] Add a PooledConnectionFactory
Added:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java
(with props)
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java
(with props)
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java
(with props)
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1678273&r1=1678272&r2=1678273&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Thu May 7 23:20:46 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
+import java.net.InetAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.nio.channels.UnresolvedAddressException;
@@ -33,6 +34,7 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
@@ -88,7 +90,7 @@ import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.url.URLSyntaxException;
-public class AMQConnection extends Closeable implements Connection,
QueueConnection, TopicConnection, Referenceable
+public class AMQConnection extends Closeable implements CommonConnection,
Referenceable
{
private static final Logger _logger =
LoggerFactory.getLogger(AMQConnection.class);
private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
@@ -125,7 +127,7 @@ public class AMQConnection extends Close
/** Maps from session id (Integer) to AMQSession instance */
private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
- private final String _clientName;
+ private String _clientName;
/** The user name to use for authentication */
private String _username;
@@ -157,6 +159,8 @@ public class AMQConnection extends Close
*/
private boolean _connected;
+ private boolean _connectionAttempted;
+
/*
* The connection meta data
*/
@@ -403,7 +407,6 @@ public class AMQConnection extends Close
}
_failoverPolicy = new FailoverPolicy(connectionURL, this);
- BrokerDetails brokerDetails =
_failoverPolicy.getCurrentBrokerDetails();
if ("0-8".equals(amqpVersion))
{
_delegate = new AMQConnectionDelegate_8_0(this);
@@ -459,6 +462,31 @@ public class AMQConnection extends Close
// We are not currently connected
setConnected(false);
+ if(_clientName != null)
+ {
+ makeConnection();
+ }
+
+ _connectionMetaData = new QpidConnectionMetaData(this);
+ }
+
+ private void makeConnection() throws AMQException
+ {
+ _connectionAttempted = true;
+ if(_clientName == null)
+ {
+
+ try
+ {
+ InetAddress addr = InetAddress.getLocalHost();
+ _clientName = addr.getHostName() + System.currentTimeMillis();
+ }
+ catch (UnknownHostException e)
+ {
+ _clientName = "UnknownHost" + UUID.randomUUID();
+ }
+ }
+ BrokerDetails brokerDetails =
_failoverPolicy.getCurrentBrokerDetails();
boolean retryAllowed = true;
Exception connectionException = null;
while (!isConnected() && retryAllowed && brokerDetails != null)
@@ -565,8 +593,6 @@ public class AMQConnection extends Close
_sessions.setMaxChannelID(_delegate.getMaxChannelID());
_sessions.setMinChannelID(_delegate.getMinChannelID());
-
- _connectionMetaData = new QpidConnectionMetaData(this);
}
private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
@@ -744,8 +770,21 @@ public class AMQConnection extends Close
{
synchronized (_sessionCreationLock)
{
+
checkNotClosed();
+ if(!_connectionAttempted)
+ {
+ try
+ {
+ makeConnection();
+ }
+ catch (AMQException e)
+ {
+ throw JMSExceptionHelper.chainJMSException(new
JMSException("Unable to establish connection"),e);
+ }
+ }
+
if(_delegate.isVirtualHostPropertiesSupported() &&
!_virtualHostPropertiesPopulated)
{
retrieveVirtualHostPropertiesIfNecessary();
@@ -843,16 +882,26 @@ public class AMQConnection extends Close
public void setClientID(String clientID) throws JMSException
{
checkNotClosed();
- // in AMQP it is not possible to change the client ID. If one is not
specified
- // upon connection construction, an id is generated automatically.
Therefore
- // we can always throw an exception.
- if
(!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME))
- {
- throw new IllegalStateException("Client name cannot be changed
after being set");
- }
- else
+ synchronized(_sessionCreationLock)
{
- _logger.info("Operation setClientID is ignored using ID: " +
getClientID());
+ if(_connectionAttempted)
+ {
+ // in AMQP it is not possible to change the client ID. If one
is not specified
+ // upon connection construction, an id is generated
automatically. Therefore
+ // we can always throw an exception.
+ if
(!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME))
+ {
+ throw new IllegalStateException("Client name cannot be
changed after being set");
+ }
+ else
+ {
+ _logger.info("Operation setClientID is ignored using ID: "
+ getClientID());
+ }
+ }
+ else
+ {
+ _clientName = clientID;
+ }
}
}
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?rev=1678273&r1=1678272&r2=1678273&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
Thu May 7 23:20:46 2015
@@ -109,7 +109,7 @@ public class AMQConnectionFactory implem
}
}
- public Connection createConnection() throws JMSException
+ public AMQConnection createConnection() throws JMSException
{
if(_connectionDetails == null)
{
@@ -131,12 +131,12 @@ public class AMQConnectionFactory implem
}
}
- public Connection createConnection(String userName, String password)
throws JMSException
+ public AMQConnection createConnection(String userName, String password)
throws JMSException
{
return createConnection(userName, password, null);
}
- public Connection createConnection(String userName, String password,
String id) throws JMSException
+ public AMQConnection createConnection(String userName, String password,
String id) throws JMSException
{
if (_connectionDetails != null)
{
@@ -170,22 +170,22 @@ public class AMQConnectionFactory implem
public QueueConnection createQueueConnection() throws JMSException
{
- return (QueueConnection) createConnection();
+ return createConnection();
}
public QueueConnection createQueueConnection(String username, String
password) throws JMSException
{
- return (QueueConnection) createConnection(username, password);
+ return createConnection(username, password);
}
public TopicConnection createTopicConnection() throws JMSException
{
- return (TopicConnection) createConnection();
+ return createConnection();
}
public TopicConnection createTopicConnection(String username, String
password) throws JMSException
{
- return (TopicConnection) createConnection(username, password);
+ return createConnection(username, password);
}
Added:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java?rev=1678273&view=auto
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java
(added)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java
Thu May 7 23:20:46 2015
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.QueueConnection;
+import javax.jms.TopicConnection;
+
+import org.apache.qpid.jms.Connection;
+
+public interface CommonConnection extends Connection, TopicConnection,
QueueConnection
+{
+ boolean isClosed();
+}
Propchange:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java?rev=1678273&view=auto
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java
(added)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java
Thu May 7 23:20:46 2015
@@ -0,0 +1,648 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.IllegalStateException;
+import javax.jms.Session;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.util.JMSExceptionHelper;
+import org.apache.qpid.jms.*;
+import org.apache.qpid.url.URLSyntaxException;
+
+public class PooledConnectionFactory implements ConnectionFactory,
QueueConnectionFactory, TopicConnectionFactory
+{
+ private static final Logger LOGGER =
LoggerFactory.getLogger(PooledConnectionFactory.class);
+
+ private static final ScheduledExecutorService SCHEDULER =
+ Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactory()
+ {
+ private ThreadGroup _group;
+ {
+ SecurityManager securityManager =
System.getSecurityManager();
+ _group = securityManager == null
+ ? Thread.currentThread().getThreadGroup()
+ : securityManager.getThreadGroup();
+
+ }
+
+ @Override
+ public Thread newThread(Runnable runnable)
+ {
+ Thread thread = new Thread(_group,
+ runnable,
+
PooledConnectionFactory.class.getSimpleName() + "-Reaper");
+ if (!thread.isDaemon())
+ {
+ thread.setDaemon(true);
+ }
+ return thread;
+ }
+
+ });
+
+ private final AtomicInteger _maxPoolSize = new AtomicInteger(10);
+ private final AtomicLong _connectionTimeout = new AtomicLong(30000l);
+
+ private final AtomicReference<ConnectionURL> _connectionDetails = new
AtomicReference<>();
+
+ transient private final byte[] _factoryId = new byte[16];
+ transient private final Map<ConnectionDetailsIdentifier,
List<ConnectionHolder>> _pool = Collections.synchronizedMap(new
HashMap<ConnectionDetailsIdentifier, List<ConnectionHolder>>());
+ transient private final Runnable _connectionReaper = new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
_reaperScheduled.set(false);
+
if(removeExpiredConnections())
+ {
+ scheduleReaper();
+ }
+ }
+ };
+
+
+ transient private final AtomicBoolean _reaperScheduled = new
AtomicBoolean();
+
+ public PooledConnectionFactory()
+ {
+ final Random random = new Random();
+ random.nextBytes(_factoryId);
+
+ }
+
+ private void scheduleReaper()
+ {
+ if(_reaperScheduled.compareAndSet(false,true))
+ {
+ SCHEDULER.schedule(_connectionReaper, _connectionTimeout.get(),
TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private boolean removeExpiredConnections()
+ {
+ try
+ {
+ boolean scheduleAgain = false;
+ List<List<ConnectionHolder>> pooledConnections;
+ synchronized (_pool)
+ {
+ pooledConnections = new ArrayList<>(_pool.values());
+ }
+ if(!pooledConnections.isEmpty())
+ {
+ long now = System.currentTimeMillis();
+ for (List<ConnectionHolder> connections : pooledConnections)
+ {
+ synchronized (connections)
+ {
+ removeExpiredConnections(connections, now);
+ scheduleAgain = scheduleAgain ||
!connections.isEmpty();
+ }
+
+ }
+ }
+ return scheduleAgain;
+ }
+ catch(RuntimeException e)
+ {
+ LOGGER.warn("Error encountered in " +
PooledConnectionFactory.class.getSimpleName() + " reaper", e);
+ return true;
+ }
+ }
+
+ @Override
+ public QueueConnection createQueueConnection() throws JMSException
+ {
+ return getConnectionFromPool();
+ }
+
+ @Override
+ public QueueConnection createQueueConnection(final String userName, final
String password) throws JMSException
+ {
+ return getConnectionFromPool(userName, password);
+ }
+
+ @Override
+ public TopicConnection createTopicConnection() throws JMSException
+ {
+ return getConnectionFromPool();
+ }
+
+ @Override
+ public TopicConnection createTopicConnection(final String userName, final
String password) throws JMSException
+ {
+ return getConnectionFromPool(userName, password);
+ }
+
+ @Override
+ public Connection createConnection() throws JMSException
+ {
+ return getConnectionFromPool();
+ }
+
+ @Override
+ public Connection createConnection(final String userName, final String
password) throws JMSException
+ {
+ return getConnectionFromPool(userName, password);
+ }
+
+
+ private CommonConnection getConnectionFromPool() throws JMSException
+ {
+ final ConnectionURL connectionDetails = getConnectionURLOrError();
+ ConnectionDetailsIdentifier identity =
ConnectionDetailsIdentifier.newInstance(_factoryId,
+
connectionDetails.getURL(),
+
connectionDetails.getUsername(),
+
connectionDetails.getPassword());
+
+ return getConnectionFromPool(connectionDetails, identity);
+ }
+
+ private CommonConnection getConnectionFromPool(final ConnectionURL
connectionDetails,
+ final
ConnectionDetailsIdentifier identity)
+ throws JMSException
+ {
+ CommonConnection underlying = null;
+
+ synchronized (_pool)
+ {
+ List<ConnectionHolder> pooledConnections = _pool.get(identity);
+ if(pooledConnections != null)
+ {
+ synchronized (pooledConnections)
+ {
+ if (!pooledConnections.isEmpty())
+ {
+ ConnectionHolder holder =
pooledConnections.remove(pooledConnections.size() - 1);
+ underlying = holder._connection;
+ }
+ }
+ }
+ }
+
+
+ try
+ {
+ if(underlying == null)
+ {
+ underlying = newConnectionInstance(connectionDetails);
+ }
+ return proxyConnection(underlying, identity);
+ }
+ catch (AMQException e)
+ {
+ throw JMSExceptionHelper.chainJMSException(new JMSException("Error
creating connection: " + e.getMessage()),
+ e);
+ }
+ }
+
+ protected CommonConnection newConnectionInstance(final ConnectionURL
connectionDetails) throws AMQException
+ {
+ return new AMQConnection(connectionDetails);
+ }
+
+ private ConnectionURL getConnectionURLOrError() throws
IllegalStateException
+ {
+ final ConnectionURL connectionDetails = _connectionDetails.get();
+ if(connectionDetails == null)
+ {
+ throw new IllegalStateException("Cannot create a connection when
the connection URL has not yet been set");
+ }
+ return connectionDetails;
+ }
+
+ private CommonConnection getConnectionFromPool(String user, String
password) throws JMSException
+ {
+ ConnectionURL connectionDetails = getConnectionURLOrError();
+ ConnectionDetailsIdentifier identity =
ConnectionDetailsIdentifier.newInstance(_factoryId,
+
connectionDetails.getURL(),
+
user,
+
password);
+ try
+ {
+ connectionDetails = new
AMQConnectionURL(connectionDetails.getURL());
+ connectionDetails.setUsername(user);
+ connectionDetails.setPassword(password);
+ }
+ catch (URLSyntaxException e)
+ {
+ throw JMSExceptionHelper.chainJMSException(new JMSException("Error
creating connection: " + e.getMessage()),
+ e);
+ }
+ return getConnectionFromPool(connectionDetails, identity);
+ }
+
+
+
+ private synchronized void returnToPool(final CommonConnection connection,
final ConnectionDetailsIdentifier identityHash)
+ throws JMSException
+ {
+ if(!connection.isClosed())
+ {
+ connection.stop();
+ List<ConnectionHolder> connections;
+ synchronized (_pool)
+ {
+ connections = _pool.get(identityHash);
+ if(connections == null)
+ {
+ connections = new ArrayList<>();
+ _pool.put(identityHash, connections);
+ scheduleReaper();
+ }
+ }
+ synchronized (connections)
+ {
+ if(connections.size()<_maxPoolSize.get())
+ {
+ connections.add(new ConnectionHolder(connection,
System.currentTimeMillis()));
+ }
+ }
+ }
+ }
+
+ private void removeExpiredConnections(final List<ConnectionHolder>
connections,
+ final long now)
+ {
+ long expiryTime = now - _connectionTimeout.get();
+ Iterator<ConnectionHolder> iter = connections.iterator();
+ while(iter.hasNext())
+ {
+ ConnectionHolder ch = iter.next();
+ if(ch._lastUse < expiryTime)
+ {
+ iter.remove();
+ try
+ {
+ ch._connection.close();
+ }
+ catch (JMSException | RuntimeException e )
+ {
+ LOGGER.warn("Error when closing expired connection in
pool", e);
+ }
+ }
+ }
+ }
+
+ public int getMaxPoolSize()
+ {
+ return _maxPoolSize.get();
+ }
+
+ public long getConnectionTimeout()
+ {
+ return _connectionTimeout.get();
+ }
+
+ public void setMaxPoolSize(int maxPoolSize)
+ {
+ _maxPoolSize.set(maxPoolSize);
+ }
+
+ public void setConnectionTimeout(long timeout)
+ {
+ _connectionTimeout.set(timeout);
+ }
+
+
+ public synchronized ConnectionURL getConnectionURL()
+ {
+ return _connectionDetails.get();
+ }
+
+ public synchronized String getConnectionURLString()
+ {
+ return _connectionDetails.toString();
+ }
+
+ //setter necessary to use instances created with the default constructor
(which we can't remove)
+ public synchronized final void setConnectionURLString(String url) throws
URLSyntaxException
+ {
+ final AMQConnectionURL connectionDetails = new AMQConnectionURL(url);
+ if(!_connectionDetails.compareAndSet(null, connectionDetails))
+ {
+ throw new IllegalArgumentException("Cannot change factory URL
after it has already been set");
+ }
+ }
+
+ public Reference getReference() throws NamingException
+ {
+ return new Reference(
+ PooledConnectionFactory.class.getName(),
+ new StringRefAddr(PooledConnectionFactory.class.getName(),
_connectionDetails.get().getURL()),
+ PooledConnectionFactory.class.getName(), null); //
factory location
+ }
+
+ private CommonConnection proxyConnection(CommonConnection underlying,
ConnectionDetailsIdentifier identifier) throws JMSException
+ {
+
+ final ConnectionInvocationHandler invocationHandler = new
ConnectionInvocationHandler(underlying, identifier);
+ return (CommonConnection)
Proxy.newProxyInstance(getClass().getClassLoader(),
+ new Class[] {
CommonConnection.class },
+ invocationHandler);
+ }
+
+ private <X extends Session> X proxySession(X underlying,
ConnectionInvocationHandler connectionHandler)
+ {
+ List<Class<?>> interfaces = new ArrayList<>();
+ interfaces.add(Session.class);
+ if(underlying instanceof org.apache.qpid.jms.Session)
+ {
+ interfaces.add(org.apache.qpid.jms.Session.class);
+ }
+ if(underlying instanceof TopicSession)
+ {
+ interfaces.add(TopicSession.class);
+ }
+ if(underlying instanceof QueueSession)
+ {
+ interfaces.add(QueueSession.class);
+ }
+ return (X) Proxy.newProxyInstance(getClass().getClassLoader(),
+ interfaces.toArray(new
Class[interfaces.size()]),
+ new
SessionInvocationHandler<X>(underlying, connectionHandler));
+ }
+
+ private class ConnectionInvocationHandler implements InvocationHandler,
ExceptionListener
+ {
+ private final CommonConnection _underlyingConnection;
+ private final ConnectionDetailsIdentifier _identityHash;
+ private boolean _closed;
+ private volatile boolean _exceptionThrown;
+ private final List<Session> _openSessions = new ArrayList<>();
+ private volatile ExceptionListener _exceptionListener;
+
+ public ConnectionInvocationHandler(final CommonConnection underlying,
ConnectionDetailsIdentifier identityHash) throws JMSException
+ {
+ _underlyingConnection = underlying;
+ _underlyingConnection.setExceptionListener(this);
+ _identityHash = identityHash;
+ }
+
+ @Override
+ public synchronized Object invoke(final Object proxy, final Method
method, final Object[] args) throws Throwable
+ {
+ if(_closed)
+ {
+ throw new IllegalStateException("Connection is closed");
+ }
+ Method underlyingMethod =
_underlyingConnection.getClass().getMethod(method.getName(),
method.getParameterTypes());
+ if(method.getName().equals("getExceptionListener"))
+ {
+ return _exceptionListener;
+ }
+ else if(method.getName().equals("setExceptionListener") &&
method.getParameterTypes().length == 1 &&
method.getParameterTypes()[0].equals(ExceptionListener.class))
+ {
+ _exceptionListener = (ExceptionListener) args[0];
+ return null;
+ }
+ else if(method.getName().equals("close") &&
method.getParameterTypes().length == 0)
+ {
+ _closed = true;
+ _exceptionListener = null;
+ List<Session> openSessions = new ArrayList<>(_openSessions);
+ for(Session session : openSessions)
+ {
+ try
+ {
+ session.close();
+ }
+ catch(JMSException | RuntimeException | Error e)
+ {
+ _exceptionThrown = true;
+ throw e;
+ }
+ }
+ _openSessions.clear();
+
+ if(!_exceptionThrown)
+ {
+ returnToPool(_underlyingConnection, _identityHash);
+ }
+ else
+ {
+ _underlyingConnection.close();
+ }
+
+ return null;
+ }
+ else
+ {
+ try
+ {
+ Object returnVal =
underlyingMethod.invoke(_underlyingConnection, args);
+
+ if(returnVal instanceof Session)
+ {
+ returnVal = proxySession((Session)returnVal, this);
+ _openSessions.add((Session)returnVal);
+ }
+ return returnVal;
+ }
+ catch (InvocationTargetException e)
+ {
+ _exceptionThrown = true;
+ Throwable thrown = e.getCause();
+ throw thrown == null ? e : thrown;
+ }
+ }
+ }
+
+
+ @Override
+ public void onException(final JMSException exception)
+ {
+ _exceptionThrown = true;
+ ExceptionListener exceptionListener = _exceptionListener;
+ if(exceptionListener != null)
+ {
+ exceptionListener.onException(exception);
+ }
+ }
+
+ public synchronized void removeSession(final Session session)
+ {
+ _openSessions.remove(session);
+ }
+ }
+
+ private class SessionInvocationHandler<X extends Session> implements
InvocationHandler
+ {
+ private final X _underlying;
+ private final ConnectionInvocationHandler _connectionHandler;
+
+ public SessionInvocationHandler(final X underlying,
+ final ConnectionInvocationHandler
connectionHandler)
+ {
+ _underlying = underlying;
+ _connectionHandler = connectionHandler;
+ }
+
+ @Override
+ public Object invoke(final Object proxy, final Method method, final
Object[] args) throws Throwable
+ {
+ Method underlyingMethod =
_underlying.getClass().getMethod(method.getName(), method.getParameterTypes());
+ try
+ {
+ Object returnVal = underlyingMethod.invoke(_underlying, args);
+
+ if(method.getName().equals("close") &&
method.getParameterTypes().length == 0)
+ {
+ _connectionHandler.removeSession((Session)proxy);
+ }
+
+ return returnVal;
+ }
+ catch (InvocationTargetException e)
+ {
+ _connectionHandler._exceptionThrown = true;
+ Throwable thrown = e.getCause();
+ throw thrown == null ? e : thrown;
+ }
+
+ }
+ }
+
+ private static class ConnectionDetailsIdentifier
+ {
+ private final byte[] _urlHash;
+ private final String _user;
+ private final byte[] _userPasswordHash;
+
+ private static ConnectionDetailsIdentifier newInstance(byte[] id,
String url, final String user, String password)
+ {
+ try
+ {
+ MessageDigest digest = MessageDigest.getInstance("SHA-256");
+ digest.update(id);
+ digest.update(url.getBytes(StandardCharsets.UTF_8));
+ byte[] urlHash = digest.digest();
+
+ digest.update(id);
+ if(user != null)
+ {
+ digest.update(user.getBytes(StandardCharsets.UTF_8));
+ }
+ if(password != null)
+ {
+ digest.update(password.getBytes(StandardCharsets.UTF_8));
+ }
+ byte[] userPasswordHash = digest.digest();
+
+ return new ConnectionDetailsIdentifier(urlHash, user == null ?
"" : user, userPasswordHash);
+
+ }
+ catch (NoSuchAlgorithmException e)
+ {
+ throw new RuntimeException("SHA-256 not found, however
compliant Java implementations should always provide SHA-256", e);
+ }
+ }
+
+ private ConnectionDetailsIdentifier(final byte[] urlHash, final String
user, final byte[] userPasswordHash)
+ {
+ _urlHash = urlHash;
+ _user = user;
+ _userPasswordHash = userPasswordHash;
+ }
+
+ @Override
+ public boolean equals(final Object o)
+ {
+ if (this == o)
+ {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass())
+ {
+ return false;
+ }
+
+ final ConnectionDetailsIdentifier that =
(ConnectionDetailsIdentifier) o;
+
+ if (!Arrays.equals(_urlHash, that._urlHash))
+ {
+ return false;
+ }
+ if (!_user.equals(that._user))
+ {
+ return false;
+ }
+ return Arrays.equals(_userPasswordHash, that._userPasswordHash);
+
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int result = Arrays.hashCode(_urlHash);
+ result = 31 * result + _user.hashCode();
+ result = 31 * result + Arrays.hashCode(_userPasswordHash);
+ return result;
+ }
+ }
+
+ private class ConnectionHolder
+ {
+ private final CommonConnection _connection;
+ private final long _lastUse;
+
+ public ConnectionHolder(final CommonConnection connection, final long
lastUse)
+ {
+ _connection = connection;
+ _lastUse = lastUse;
+ }
+ }
+
+}
Propchange:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java?rev=1678273&view=auto
==============================================================================
---
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java
(added)
+++
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java
Thu May 7 23:20:46 2015
@@ -0,0 +1,432 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.CommonConnection;
+import org.apache.qpid.client.PooledConnectionFactory;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class PooledConnectionFactoryTest extends QpidTestCase
+{
+ private interface CommonConnectionCreator
+ {
+ CommonConnection newConnection(ConnectionURL connectionUrl);
+ }
+
+ private CommonConnectionCreator _connectionCreator;
+
+ private PooledConnectionFactory _connectionFactory;
+
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _connectionCreator = mock(CommonConnectionCreator.class);
+
+ _connectionFactory = new PooledConnectionFactory()
+ {
+ @Override
+ protected CommonConnection newConnectionInstance(final
ConnectionURL connectionDetails) throws AMQException
+ {
+ return _connectionCreator.newConnection(connectionDetails);
+ }
+ };
+ }
+
+
+ public void testConnectionCreatedWithUrlUserAndPassword() throws Exception
+ {
+
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+ final ArgumentCaptor<ConnectionURL> connectionCaptor =
ArgumentCaptor.forClass(ConnectionURL.class);
+ final List<CommonConnection> createdConnections = new ArrayList<>();
+
+
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
Answer<CommonConnection>()
+ {
+ @Override
+ public CommonConnection answer(final InvocationOnMock invocation)
throws Throwable
+ {
+ ConnectionURL url = connectionCaptor.getValue();
+ assertEquals("user", url.getUsername());
+ assertEquals("pass", url.getPassword());
+
+ final CommonConnection connection =
mock(CommonConnection.class);
+ when(connection.isClosed()).thenReturn(false);
+ createdConnections.add(connection);
+ return connection;
+ }
+ });
+
+ Connection conn = _connectionFactory.createConnection();
+ assertNotNull(conn);
+ assertEquals(1, createdConnections.size());
+ final String connToString = conn.toString();
+ assertEquals(createdConnections.get(0).toString(), connToString);
+
+ conn.close();
+
+ Connection conn2 = _connectionFactory.createConnection();
+ assertNotNull(conn2);
+ assertEquals(1, createdConnections.size());
+ assertEquals(createdConnections.get(0).toString(), conn2.toString());
+
+ assertEquals(connToString, conn2.toString());
+
+ try
+ {
+ conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ fail("Closed connection should not allow sessions to be created");
+ }
+ catch (IllegalStateException e)
+ {
+ // pass
+ }
+
+ Connection conn3 = _connectionFactory.createConnection();
+ assertNotNull(conn3);
+ assertEquals(2, createdConnections.size());
+ assertEquals(createdConnections.get(1).toString(), conn3.toString());
+ assertFalse(conn3.toString().equals(conn2.toString()));
+
+ conn2.close();
+ Connection conn4 = _connectionFactory.createConnection();
+ assertNotNull(conn4);
+ assertEquals(2, createdConnections.size());
+ assertEquals(createdConnections.get(0).toString(), conn4.toString());
+
+ }
+
+ public void testConnectionCreatedPassedUserAndPassword() throws Exception
+ {
+
+
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+ final ArgumentCaptor<ConnectionURL> connectionCaptor =
ArgumentCaptor.forClass(ConnectionURL.class);
+ final List<CommonConnection> createdConnections = new ArrayList<>();
+ final List<String> usernames = new ArrayList<>();
+ final List<String> passwords = new ArrayList<>();
+
+
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
Answer<CommonConnection>()
+ {
+ @Override
+ public CommonConnection answer(final InvocationOnMock invocation)
throws Throwable
+ {
+ ConnectionURL url = connectionCaptor.getValue();
+ usernames.add(url.getUsername());
+ passwords.add(url.getPassword());
+
+ final CommonConnection connection =
mock(CommonConnection.class);
+ when(connection.isClosed()).thenReturn(false);
+ createdConnections.add(connection);
+ return connection;
+ }
+ });
+
+ Connection conn = _connectionFactory.createConnection("user1",
"pass1");
+ assertNotNull(conn);
+ assertEquals(1, createdConnections.size());
+ assertEquals("user1", usernames.get(0));
+ assertEquals("pass1", passwords.get(0));
+ conn.close();
+ Connection conn2 = _connectionFactory.createConnection("user2",
"pass2");
+ assertNotNull(conn2);
+ assertEquals(2, createdConnections.size());
+ assertEquals("user2", usernames.get(1));
+ assertEquals("pass2", passwords.get(1));
+ conn2.close();
+ Connection conn3 = _connectionFactory.createConnection("user1",
"pass1");
+ assertNotNull(conn3);
+ assertEquals(2, createdConnections.size());
+ assertEquals(createdConnections.get(0).toString(), conn3.toString());
+
+
+ }
+
+ public void testMaxPoolSize() throws Exception
+ {
+
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+ _connectionFactory.setMaxPoolSize(3);
+ final ArgumentCaptor<ConnectionURL> connectionCaptor =
ArgumentCaptor.forClass(ConnectionURL.class);
+ final List<CommonConnection> createdConnections = new ArrayList<>();
+
+
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
Answer<CommonConnection>()
+ {
+ @Override
+ public CommonConnection answer(final InvocationOnMock invocation)
throws Throwable
+ {
+ ConnectionURL url = connectionCaptor.getValue();
+
+ final CommonConnection connection =
mock(CommonConnection.class);
+ when(connection.isClosed()).thenReturn(false);
+ createdConnections.add(connection);
+ return connection;
+ }
+ });
+
+ Connection conn1 = _connectionFactory.createQueueConnection();
+ Connection conn2 = _connectionFactory.createQueueConnection();
+ Connection conn3 = _connectionFactory.createQueueConnection();
+ Connection conn4 = _connectionFactory.createQueueConnection();
+
+ assertEquals(4, createdConnections.size());
+
+ conn1.close();
+ conn2.close();
+ conn3.close();
+ conn4.close();
+
+ Connection conn5 = _connectionFactory.createTopicConnection();
+ assertEquals(4, createdConnections.size());
+ Connection conn6 = _connectionFactory.createTopicConnection();
+ assertEquals(4, createdConnections.size());
+ Connection conn7 = _connectionFactory.createTopicConnection();
+ assertEquals(4, createdConnections.size());
+ Connection conn8 = _connectionFactory.createTopicConnection();
+ assertEquals(5, createdConnections.size());
+
+ }
+
+ public void testConnectionTimeout() throws Exception
+ {
+
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+ _connectionFactory.setMaxPoolSize(4);
+ _connectionFactory.setConnectionTimeout(100l);
+ final ArgumentCaptor<ConnectionURL> connectionCaptor =
ArgumentCaptor.forClass(ConnectionURL.class);
+ final List<CommonConnection> createdConnections = new ArrayList<>();
+
+
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
Answer<CommonConnection>()
+ {
+ @Override
+ public CommonConnection answer(final InvocationOnMock invocation)
throws Throwable
+ {
+ ConnectionURL url = connectionCaptor.getValue();
+
+ final CommonConnection connection =
mock(CommonConnection.class);
+ when(connection.isClosed()).thenReturn(false);
+ createdConnections.add(connection);
+ return connection;
+ }
+ });
+
+ Connection conn1 = _connectionFactory.createQueueConnection();
+ Connection conn2 = _connectionFactory.createQueueConnection();
+ Connection conn3 = _connectionFactory.createQueueConnection();
+ Connection conn4 = _connectionFactory.createQueueConnection();
+
+ assertEquals(4, createdConnections.size());
+
+ conn1.close();
+ conn2.close();
+ conn3.close();
+ conn4.close();
+
+ Thread.sleep(500l);
+
+ Connection conn5 = _connectionFactory.createTopicConnection();
+ assertEquals(5, createdConnections.size());
+ Connection conn6 = _connectionFactory.createTopicConnection();
+ assertEquals(6, createdConnections.size());
+ Connection conn7 = _connectionFactory.createTopicConnection();
+ assertEquals(7, createdConnections.size());
+ Connection conn8 = _connectionFactory.createTopicConnection();
+ assertEquals(8, createdConnections.size());
+
+
+ }
+
+ public void testSessionsAreClosed() throws Exception
+ {
+
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+ final ArgumentCaptor<ConnectionURL> connectionCaptor =
ArgumentCaptor.forClass(ConnectionURL.class);
+ final List<CommonConnection> createdConnections = new ArrayList<>();
+ final Session session = mock(Session.class);
+
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
Answer<CommonConnection>()
+ {
+ @Override
+ public CommonConnection answer(final InvocationOnMock invocation)
throws Throwable
+ {
+ ConnectionURL url = connectionCaptor.getValue();
+
+ final CommonConnection connection =
mock(CommonConnection.class);
+ when(connection.isClosed()).thenReturn(false);
+
when(connection.createSession(anyBoolean(),anyInt())).thenReturn(session);
+ createdConnections.add(connection);
+ return connection;
+ }
+ });
+
+ Connection conn = _connectionFactory.createConnection();
+ Session createdSession = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ conn.close();
+ verify(session, times(1)).close();
+
+ }
+
+ public void testConnectionWithExceptionNotPooled() throws Exception
+ {
+
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+ final ArgumentCaptor<ConnectionURL> connectionCaptor =
ArgumentCaptor.forClass(ConnectionURL.class);
+ final List<CommonConnection> createdConnections = new ArrayList<>();
+
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
Answer<CommonConnection>()
+ {
+ @Override
+ public CommonConnection answer(final InvocationOnMock invocation)
throws Throwable
+ {
+ ConnectionURL url = connectionCaptor.getValue();
+
+ final CommonConnection connection =
mock(CommonConnection.class);
+ when(connection.isClosed()).thenReturn(false);
+
when(connection.createSession(anyBoolean(),anyInt())).thenThrow(new
JMSException("foo"));
+ createdConnections.add(connection);
+ return connection;
+ }
+ });
+
+ Connection conn = _connectionFactory.createConnection();
+ try
+ {
+ Session createdSession = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+ fail("Expected an exception");
+ }
+ catch(JMSException e)
+ {
+ assertEquals("foo", e.getMessage());
+ }
+ conn.close();
+ Connection conn2 = _connectionFactory.createConnection();
+ assertEquals(2, createdConnections.size());
+ }
+
+
+ public void testConnectionWithExceptionListenerExceptionNotPooled() throws
Exception
+ {
+
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+
+ final List<CommonConnection> createdConnections = new ArrayList<>();
+
+
when(_connectionCreator.newConnection(any(ConnectionURL.class))).thenAnswer(new
Answer<CommonConnection>()
+ {
+ @Override
+ public CommonConnection answer(final InvocationOnMock invocation)
throws Throwable
+ {
+ final CommonConnection connection =
mock(CommonConnection.class);
+ when(connection.isClosed()).thenReturn(false);
+ createdConnections.add(connection);
+ final ArgumentCaptor<ExceptionListener> listenerCaptor =
+ ArgumentCaptor.forClass(ExceptionListener.class);
+
+ doAnswer(new Answer()
+ {
+ @Override
+ public Object answer(final InvocationOnMock invocation)
throws Throwable
+ {
+
when(connection.getExceptionListener()).thenReturn(listenerCaptor.getValue());
+ return null;
+ }
+
}).when(connection).setExceptionListener(listenerCaptor.capture());
+
+ return connection;
+ }
+ });
+
+ Connection conn = _connectionFactory.createConnection();
+ assertEquals(1, createdConnections.size());
+
+ ExceptionListener listener = mock(ExceptionListener.class);
+ conn.setExceptionListener(listener);
+ assertEquals(listener, conn.getExceptionListener());
+ verify(listener, never()).onException(any(JMSException.class));
+ createdConnections.get(0).getExceptionListener().onException(new
JMSException("bar"));
+ verify(listener, times(1)).onException(any(JMSException.class));
+
+ conn.close();
+ Connection conn2 = _connectionFactory.createConnection();
+ assertEquals(2, createdConnections.size());
+ }
+
+ public void testSessionCloseException() throws Exception
+ {
+
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+ final ArgumentCaptor<ConnectionURL> connectionCaptor =
ArgumentCaptor.forClass(ConnectionURL.class);
+ final List<CommonConnection> createdConnections = new ArrayList<>();
+ final Session session = mock(Session.class);
+ doThrow(new JMSException("foo")).when(session).close();
+
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
Answer<CommonConnection>()
+ {
+ @Override
+ public CommonConnection answer(final InvocationOnMock invocation)
throws Throwable
+ {
+ ConnectionURL url = connectionCaptor.getValue();
+
+ final CommonConnection connection =
mock(CommonConnection.class);
+ when(connection.isClosed()).thenReturn(false);
+
when(connection.createSession(anyBoolean(),anyInt())).thenReturn(session);
+ createdConnections.add(connection);
+ return connection;
+ }
+ });
+
+ Connection conn = _connectionFactory.createConnection();
+ Session createdSession = conn.createSession(false,
Session.AUTO_ACKNOWLEDGE);
+
+ try
+ {
+ conn.close();
+ fail("Expected an exception");
+ }
+ catch(JMSException e)
+ {
+ assertEquals("foo", e.getMessage());
+ }
+ Connection conn2 = _connectionFactory.createConnection();
+ assertEquals(2, createdConnections.size());
+ }
+
+
+}
Propchange:
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java?rev=1678273&r1=1678272&r2=1678273&view=diff
==============================================================================
---
qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java
(original)
+++
qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java
Thu May 7 23:20:46 2015
@@ -236,7 +236,7 @@ public class FailoverPolicyTest extends
}
@Override
- public javax.jms.Session createSession(boolean arg0, int arg1)
+ public Session createSession(boolean arg0, int arg1)
throws JMSException
{
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]