Author: rgodfrey
Date: Thu May  7 23:20:46 2015
New Revision: 1678273

URL: http://svn.apache.org/r1678273
Log:
QPID-6534 : [Java Client] Add a PooledConnectionFactory

Added:
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java
   (with props)
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java
   (with props)
    
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java
   (with props)
Modified:
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
    
qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=1678273&r1=1678272&r2=1678273&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java 
(original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnection.java 
Thu May  7 23:20:46 2015
@@ -22,6 +22,7 @@ package org.apache.qpid.client;
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
+import java.net.InetAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.nio.channels.UnresolvedAddressException;
@@ -33,6 +34,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -88,7 +90,7 @@ import org.apache.qpid.jms.Session;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.url.URLSyntaxException;
 
-public class AMQConnection extends Closeable implements Connection, 
QueueConnection, TopicConnection, Referenceable
+public class AMQConnection extends Closeable implements CommonConnection, 
Referenceable
 {
     private static final Logger _logger = 
LoggerFactory.getLogger(AMQConnection.class);
     private static final AtomicLong CONN_NUMBER_GENERATOR = new AtomicLong();
@@ -125,7 +127,7 @@ public class AMQConnection extends Close
     /** Maps from session id (Integer) to AMQSession instance */
     private final ChannelToSessionMap _sessions = new ChannelToSessionMap();
 
-    private final String _clientName;
+    private String _clientName;
 
     /** The user name to use for authentication */
     private String _username;
@@ -157,6 +159,8 @@ public class AMQConnection extends Close
      */
     private boolean _connected;
 
+    private boolean _connectionAttempted;
+
     /*
      * The connection meta data
      */
@@ -403,7 +407,6 @@ public class AMQConnection extends Close
         }
 
         _failoverPolicy = new FailoverPolicy(connectionURL, this);
-        BrokerDetails brokerDetails = 
_failoverPolicy.getCurrentBrokerDetails();
         if ("0-8".equals(amqpVersion))
         {
             _delegate = new AMQConnectionDelegate_8_0(this);
@@ -459,6 +462,31 @@ public class AMQConnection extends Close
         // We are not currently connected
         setConnected(false);
 
+        if(_clientName != null)
+        {
+            makeConnection();
+        }
+
+        _connectionMetaData = new QpidConnectionMetaData(this);
+    }
+
+    private void makeConnection() throws AMQException
+    {
+        _connectionAttempted = true;
+        if(_clientName == null)
+        {
+
+            try
+            {
+                InetAddress addr = InetAddress.getLocalHost();
+                _clientName =  addr.getHostName() + System.currentTimeMillis();
+            }
+            catch (UnknownHostException e)
+            {
+                _clientName = "UnknownHost" + UUID.randomUUID();
+            }
+        }
+        BrokerDetails brokerDetails = 
_failoverPolicy.getCurrentBrokerDetails();
         boolean retryAllowed = true;
         Exception connectionException = null;
         while (!isConnected() && retryAllowed && brokerDetails != null)
@@ -565,8 +593,6 @@ public class AMQConnection extends Close
 
         _sessions.setMaxChannelID(_delegate.getMaxChannelID());
         _sessions.setMinChannelID(_delegate.getMinChannelID());
-
-        _connectionMetaData = new QpidConnectionMetaData(this);
     }
 
     private void initDelegate(ProtocolVersion pe) throws AMQProtocolException
@@ -744,8 +770,21 @@ public class AMQConnection extends Close
     {
         synchronized (_sessionCreationLock)
         {
+
             checkNotClosed();
 
+            if(!_connectionAttempted)
+            {
+                try
+                {
+                    makeConnection();
+                }
+                catch (AMQException e)
+                {
+                    throw JMSExceptionHelper.chainJMSException(new 
JMSException("Unable to establish connection"),e);
+                }
+            }
+
             if(_delegate.isVirtualHostPropertiesSupported() && 
!_virtualHostPropertiesPopulated)
             {
                 retrieveVirtualHostPropertiesIfNecessary();
@@ -843,16 +882,26 @@ public class AMQConnection extends Close
     public void setClientID(String clientID) throws JMSException
     {
         checkNotClosed();
-        // in AMQP it is not possible to change the client ID. If one is not 
specified
-        // upon connection construction, an id is generated automatically. 
Therefore
-        // we can always throw an exception.
-        if 
(!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME))
-        {
-            throw new IllegalStateException("Client name cannot be changed 
after being set");
-        }
-        else
+        synchronized(_sessionCreationLock)
         {
-            _logger.info("Operation setClientID is ignored using ID: " + 
getClientID());
+            if(_connectionAttempted)
+            {
+                // in AMQP it is not possible to change the client ID. If one 
is not specified
+                // upon connection construction, an id is generated 
automatically. Therefore
+                // we can always throw an exception.
+                if 
(!Boolean.getBoolean(ClientProperties.IGNORE_SET_CLIENTID_PROP_NAME))
+                {
+                    throw new IllegalStateException("Client name cannot be 
changed after being set");
+                }
+                else
+                {
+                    _logger.info("Operation setClientID is ignored using ID: " 
+ getClientID());
+                }
+            }
+            else
+            {
+                _clientName = clientID;
+            }
         }
     }
 

Modified: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java?rev=1678273&r1=1678272&r2=1678273&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
 (original)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionFactory.java
 Thu May  7 23:20:46 2015
@@ -109,7 +109,7 @@ public class AMQConnectionFactory implem
         }
     }
 
-    public Connection createConnection() throws JMSException
+    public AMQConnection createConnection() throws JMSException
     {
         if(_connectionDetails == null)
         {
@@ -131,12 +131,12 @@ public class AMQConnectionFactory implem
         }
     }
 
-    public Connection createConnection(String userName, String password) 
throws JMSException
+    public AMQConnection createConnection(String userName, String password) 
throws JMSException
     {
         return createConnection(userName, password, null);
     }
     
-    public Connection createConnection(String userName, String password, 
String id) throws JMSException
+    public AMQConnection createConnection(String userName, String password, 
String id) throws JMSException
     {
         if (_connectionDetails != null)
         {
@@ -170,22 +170,22 @@ public class AMQConnectionFactory implem
 
     public QueueConnection createQueueConnection() throws JMSException
     {
-        return (QueueConnection) createConnection();
+        return createConnection();
     }
 
     public QueueConnection createQueueConnection(String username, String 
password) throws JMSException
     {
-        return (QueueConnection) createConnection(username, password);
+        return createConnection(username, password);
     }
 
     public TopicConnection createTopicConnection() throws JMSException
     {
-        return (TopicConnection) createConnection();
+        return createConnection();
     }
 
     public TopicConnection createTopicConnection(String username, String 
password) throws JMSException
     {
-        return (TopicConnection) createConnection(username, password);
+        return createConnection(username, password);
     }
 
 

Added: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java?rev=1678273&view=auto
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java
 (added)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java
 Thu May  7 23:20:46 2015
@@ -0,0 +1,31 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import javax.jms.QueueConnection;
+import javax.jms.TopicConnection;
+
+import org.apache.qpid.jms.Connection;
+
+public interface CommonConnection extends Connection, TopicConnection, 
QueueConnection
+{
+    boolean isClosed();
+}

Propchange: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/CommonConnection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java?rev=1678273&view=auto
==============================================================================
--- 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java
 (added)
+++ 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java
 Thu May  7 23:20:46 2015
@@ -0,0 +1,648 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.nio.charset.StandardCharsets;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.IllegalStateException;
+import javax.jms.Session;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.util.JMSExceptionHelper;
+import org.apache.qpid.jms.*;
+import org.apache.qpid.url.URLSyntaxException;
+
+public class PooledConnectionFactory implements ConnectionFactory, 
QueueConnectionFactory, TopicConnectionFactory
+{
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PooledConnectionFactory.class);
+
+    private static final ScheduledExecutorService SCHEDULER =
+            Executors.newSingleThreadScheduledExecutor(
+                    new ThreadFactory()
+                    {
+                        private ThreadGroup _group;
+                        {
+                            SecurityManager securityManager = 
System.getSecurityManager();
+                            _group = securityManager == null
+                                    ? Thread.currentThread().getThreadGroup()
+                                    : securityManager.getThreadGroup();
+
+                        }
+
+                        @Override
+                        public Thread newThread(Runnable runnable)
+                        {
+                            Thread thread = new Thread(_group,
+                                                       runnable,
+                                                       
PooledConnectionFactory.class.getSimpleName() + "-Reaper");
+                            if (!thread.isDaemon())
+                            {
+                                thread.setDaemon(true);
+                            }
+                            return thread;
+                        }
+
+                    });
+
+    private final AtomicInteger _maxPoolSize = new AtomicInteger(10);
+    private final AtomicLong _connectionTimeout = new AtomicLong(30000l);
+
+    private final AtomicReference<ConnectionURL> _connectionDetails = new 
AtomicReference<>();
+
+    transient private final byte[] _factoryId = new byte[16];
+    transient private final Map<ConnectionDetailsIdentifier, 
List<ConnectionHolder>> _pool = Collections.synchronizedMap(new 
HashMap<ConnectionDetailsIdentifier, List<ConnectionHolder>>());
+    transient private final Runnable _connectionReaper = new Runnable()
+                                                {
+                                                    @Override
+                                                    public void run()
+                                                    {
+                                                        
_reaperScheduled.set(false);
+                                                        
if(removeExpiredConnections())
+                                                        {
+                                                            scheduleReaper();
+                                                        }
+                                                    }
+                                                };
+
+
+    transient private final AtomicBoolean _reaperScheduled = new 
AtomicBoolean();
+
+    public PooledConnectionFactory()
+    {
+        final Random random = new Random();
+        random.nextBytes(_factoryId);
+
+    }
+
+    private void scheduleReaper()
+    {
+        if(_reaperScheduled.compareAndSet(false,true))
+        {
+            SCHEDULER.schedule(_connectionReaper, _connectionTimeout.get(), 
TimeUnit.MILLISECONDS);
+        }
+    }
+
+    private boolean removeExpiredConnections()
+    {
+        try
+        {
+            boolean scheduleAgain = false;
+            List<List<ConnectionHolder>> pooledConnections;
+            synchronized (_pool)
+            {
+                pooledConnections = new ArrayList<>(_pool.values());
+            }
+            if(!pooledConnections.isEmpty())
+            {
+                long now = System.currentTimeMillis();
+                for (List<ConnectionHolder> connections : pooledConnections)
+                {
+                    synchronized (connections)
+                    {
+                        removeExpiredConnections(connections, now);
+                        scheduleAgain = scheduleAgain || 
!connections.isEmpty();
+                    }
+
+                }
+            }
+            return scheduleAgain;
+        }
+        catch(RuntimeException e)
+        {
+            LOGGER.warn("Error encountered in " + 
PooledConnectionFactory.class.getSimpleName() + " reaper", e);
+            return true;
+        }
+    }
+
+    @Override
+    public QueueConnection createQueueConnection() throws JMSException
+    {
+        return getConnectionFromPool();
+    }
+
+    @Override
+    public QueueConnection createQueueConnection(final String userName, final 
String password) throws JMSException
+    {
+        return getConnectionFromPool(userName, password);
+    }
+
+    @Override
+    public TopicConnection createTopicConnection() throws JMSException
+    {
+        return getConnectionFromPool();
+    }
+
+    @Override
+    public TopicConnection createTopicConnection(final String userName, final 
String password) throws JMSException
+    {
+        return getConnectionFromPool(userName, password);
+    }
+
+    @Override
+    public Connection createConnection() throws JMSException
+    {
+        return getConnectionFromPool();
+    }
+
+    @Override
+    public Connection createConnection(final String userName, final String 
password) throws JMSException
+    {
+        return getConnectionFromPool(userName, password);
+    }
+
+
+    private CommonConnection getConnectionFromPool() throws JMSException
+    {
+        final ConnectionURL connectionDetails = getConnectionURLOrError();
+        ConnectionDetailsIdentifier identity = 
ConnectionDetailsIdentifier.newInstance(_factoryId,
+                                                                               
        connectionDetails.getURL(),
+                                                                               
        connectionDetails.getUsername(),
+                                                                               
        connectionDetails.getPassword());
+
+        return getConnectionFromPool(connectionDetails, identity);
+    }
+
+    private CommonConnection getConnectionFromPool(final ConnectionURL 
connectionDetails,
+                                                   final 
ConnectionDetailsIdentifier identity)
+            throws JMSException
+    {
+        CommonConnection underlying = null;
+
+        synchronized (_pool)
+        {
+            List<ConnectionHolder> pooledConnections = _pool.get(identity);
+            if(pooledConnections != null)
+            {
+                synchronized (pooledConnections)
+                {
+                    if (!pooledConnections.isEmpty())
+                    {
+                        ConnectionHolder holder = 
pooledConnections.remove(pooledConnections.size() - 1);
+                        underlying = holder._connection;
+                    }
+                }
+            }
+        }
+
+
+        try
+        {
+            if(underlying == null)
+            {
+                underlying = newConnectionInstance(connectionDetails);
+            }
+            return proxyConnection(underlying, identity);
+        }
+        catch (AMQException e)
+        {
+            throw JMSExceptionHelper.chainJMSException(new JMSException("Error 
creating connection: " + e.getMessage()),
+                                                       e);
+        }
+    }
+
+    protected CommonConnection newConnectionInstance(final ConnectionURL 
connectionDetails) throws AMQException
+    {
+        return new AMQConnection(connectionDetails);
+    }
+
+    private ConnectionURL getConnectionURLOrError() throws 
IllegalStateException
+    {
+        final ConnectionURL connectionDetails = _connectionDetails.get();
+        if(connectionDetails == null)
+        {
+            throw new IllegalStateException("Cannot create a connection when 
the connection URL has not yet been set");
+        }
+        return connectionDetails;
+    }
+
+    private CommonConnection getConnectionFromPool(String user, String 
password) throws JMSException
+    {
+        ConnectionURL connectionDetails = getConnectionURLOrError();
+        ConnectionDetailsIdentifier identity = 
ConnectionDetailsIdentifier.newInstance(_factoryId,
+                                                                               
        connectionDetails.getURL(),
+                                                                               
        user,
+                                                                               
        password);
+        try
+        {
+            connectionDetails = new 
AMQConnectionURL(connectionDetails.getURL());
+            connectionDetails.setUsername(user);
+            connectionDetails.setPassword(password);
+        }
+        catch (URLSyntaxException e)
+        {
+            throw JMSExceptionHelper.chainJMSException(new JMSException("Error 
creating connection: " + e.getMessage()),
+                                                       e);
+        }
+        return getConnectionFromPool(connectionDetails, identity);
+    }
+
+
+
+    private synchronized void returnToPool(final CommonConnection connection, 
final ConnectionDetailsIdentifier identityHash)
+            throws JMSException
+    {
+        if(!connection.isClosed())
+        {
+            connection.stop();
+            List<ConnectionHolder> connections;
+            synchronized (_pool)
+            {
+                connections = _pool.get(identityHash);
+                if(connections == null)
+                {
+                    connections = new ArrayList<>();
+                    _pool.put(identityHash, connections);
+                    scheduleReaper();
+                }
+            }
+            synchronized (connections)
+            {
+                if(connections.size()<_maxPoolSize.get())
+                {
+                    connections.add(new ConnectionHolder(connection, 
System.currentTimeMillis()));
+                }
+            }
+        }
+    }
+
+    private void removeExpiredConnections(final List<ConnectionHolder> 
connections,
+                                          final long now)
+    {
+        long expiryTime = now - _connectionTimeout.get();
+        Iterator<ConnectionHolder> iter = connections.iterator();
+        while(iter.hasNext())
+        {
+            ConnectionHolder ch = iter.next();
+            if(ch._lastUse < expiryTime)
+            {
+                iter.remove();
+                try
+                {
+                    ch._connection.close();
+                }
+                catch (JMSException | RuntimeException e )
+                {
+                    LOGGER.warn("Error when closing expired connection in 
pool", e);
+                }
+            }
+        }
+    }
+
+    public int getMaxPoolSize()
+    {
+        return _maxPoolSize.get();
+    }
+
+    public long getConnectionTimeout()
+    {
+        return _connectionTimeout.get();
+    }
+
+    public void setMaxPoolSize(int maxPoolSize)
+    {
+        _maxPoolSize.set(maxPoolSize);
+    }
+
+    public void setConnectionTimeout(long timeout)
+    {
+        _connectionTimeout.set(timeout);
+    }
+
+
+    public synchronized ConnectionURL getConnectionURL()
+    {
+        return _connectionDetails.get();
+    }
+
+    public synchronized String getConnectionURLString()
+    {
+        return _connectionDetails.toString();
+    }
+
+    //setter necessary to use instances created with the default constructor 
(which we can't remove)
+    public synchronized final void setConnectionURLString(String url) throws 
URLSyntaxException
+    {
+        final AMQConnectionURL connectionDetails = new AMQConnectionURL(url);
+        if(!_connectionDetails.compareAndSet(null, connectionDetails))
+        {
+            throw new IllegalArgumentException("Cannot change factory URL 
after it has already been set");
+        }
+    }
+
+    public Reference getReference() throws NamingException
+    {
+        return new Reference(
+                PooledConnectionFactory.class.getName(),
+                new StringRefAddr(PooledConnectionFactory.class.getName(), 
_connectionDetails.get().getURL()),
+                PooledConnectionFactory.class.getName(), null);          // 
factory location
+    }
+
+    private CommonConnection proxyConnection(CommonConnection underlying, 
ConnectionDetailsIdentifier identifier) throws JMSException
+    {
+
+        final ConnectionInvocationHandler invocationHandler = new 
ConnectionInvocationHandler(underlying, identifier);
+        return (CommonConnection) 
Proxy.newProxyInstance(getClass().getClassLoader(),
+                                                         new Class[] { 
CommonConnection.class },
+                                                         invocationHandler);
+    }
+
+    private <X extends Session> X proxySession(X underlying, 
ConnectionInvocationHandler connectionHandler)
+    {
+        List<Class<?>> interfaces = new ArrayList<>();
+        interfaces.add(Session.class);
+        if(underlying instanceof org.apache.qpid.jms.Session)
+        {
+            interfaces.add(org.apache.qpid.jms.Session.class);
+        }
+        if(underlying instanceof TopicSession)
+        {
+            interfaces.add(TopicSession.class);
+        }
+        if(underlying instanceof QueueSession)
+        {
+            interfaces.add(QueueSession.class);
+        }
+        return (X) Proxy.newProxyInstance(getClass().getClassLoader(),
+                                          interfaces.toArray(new 
Class[interfaces.size()]),
+                                          new 
SessionInvocationHandler<X>(underlying, connectionHandler));
+    }
+
+    private class ConnectionInvocationHandler  implements InvocationHandler, 
ExceptionListener
+    {
+        private final CommonConnection _underlyingConnection;
+        private final ConnectionDetailsIdentifier _identityHash;
+        private boolean _closed;
+        private volatile boolean _exceptionThrown;
+        private final List<Session> _openSessions = new ArrayList<>();
+        private volatile ExceptionListener _exceptionListener;
+
+        public ConnectionInvocationHandler(final CommonConnection underlying, 
ConnectionDetailsIdentifier identityHash) throws JMSException
+        {
+            _underlyingConnection = underlying;
+            _underlyingConnection.setExceptionListener(this);
+            _identityHash = identityHash;
+        }
+
+        @Override
+        public synchronized Object invoke(final Object proxy, final Method 
method, final Object[] args) throws Throwable
+        {
+            if(_closed)
+            {
+                throw new IllegalStateException("Connection is closed");
+            }
+            Method underlyingMethod = 
_underlyingConnection.getClass().getMethod(method.getName(), 
method.getParameterTypes());
+            if(method.getName().equals("getExceptionListener"))
+            {
+                return _exceptionListener;
+            }
+            else if(method.getName().equals("setExceptionListener") && 
method.getParameterTypes().length == 1 && 
method.getParameterTypes()[0].equals(ExceptionListener.class))
+            {
+                _exceptionListener = (ExceptionListener) args[0];
+                return null;
+            }
+            else if(method.getName().equals("close") && 
method.getParameterTypes().length == 0)
+            {
+                _closed = true;
+                _exceptionListener = null;
+                List<Session> openSessions = new ArrayList<>(_openSessions);
+                for(Session session : openSessions)
+                {
+                    try
+                    {
+                        session.close();
+                    }
+                    catch(JMSException | RuntimeException | Error e)
+                    {
+                        _exceptionThrown = true;
+                        throw e;
+                    }
+                }
+                _openSessions.clear();
+
+                if(!_exceptionThrown)
+                {
+                    returnToPool(_underlyingConnection, _identityHash);
+                }
+                else
+                {
+                    _underlyingConnection.close();
+                }
+
+                return null;
+            }
+            else
+            {
+                try
+                {
+                    Object returnVal = 
underlyingMethod.invoke(_underlyingConnection, args);
+
+                    if(returnVal instanceof Session)
+                    {
+                        returnVal = proxySession((Session)returnVal, this);
+                        _openSessions.add((Session)returnVal);
+                    }
+                    return returnVal;
+                }
+                catch (InvocationTargetException e)
+                {
+                    _exceptionThrown = true;
+                    Throwable thrown = e.getCause();
+                    throw thrown == null ? e : thrown;
+                }
+            }
+        }
+
+
+        @Override
+        public void onException(final JMSException exception)
+        {
+            _exceptionThrown = true;
+            ExceptionListener exceptionListener = _exceptionListener;
+            if(exceptionListener != null)
+            {
+                exceptionListener.onException(exception);
+            }
+        }
+
+        public synchronized void removeSession(final Session session)
+        {
+            _openSessions.remove(session);
+        }
+    }
+
+    private class SessionInvocationHandler<X extends Session> implements 
InvocationHandler
+    {
+        private final X _underlying;
+        private final ConnectionInvocationHandler _connectionHandler;
+
+        public SessionInvocationHandler(final X underlying,
+                                        final ConnectionInvocationHandler 
connectionHandler)
+        {
+            _underlying = underlying;
+            _connectionHandler = connectionHandler;
+        }
+
+        @Override
+        public Object invoke(final Object proxy, final Method method, final 
Object[] args) throws Throwable
+        {
+            Method underlyingMethod = 
_underlying.getClass().getMethod(method.getName(), method.getParameterTypes());
+            try
+            {
+                Object returnVal = underlyingMethod.invoke(_underlying, args);
+
+                if(method.getName().equals("close") && 
method.getParameterTypes().length == 0)
+                {
+                    _connectionHandler.removeSession((Session)proxy);
+                }
+
+                return returnVal;
+            }
+            catch (InvocationTargetException e)
+            {
+                _connectionHandler._exceptionThrown = true;
+                Throwable thrown = e.getCause();
+                throw thrown == null ? e : thrown;
+            }
+
+        }
+    }
+
+    private static class ConnectionDetailsIdentifier
+    {
+        private final byte[] _urlHash;
+        private final String _user;
+        private final byte[] _userPasswordHash;
+
+        private static ConnectionDetailsIdentifier newInstance(byte[] id, 
String url, final String user, String password)
+        {
+            try
+            {
+                MessageDigest digest = MessageDigest.getInstance("SHA-256");
+                digest.update(id);
+                digest.update(url.getBytes(StandardCharsets.UTF_8));
+                byte[] urlHash = digest.digest();
+
+                digest.update(id);
+                if(user != null)
+                {
+                    digest.update(user.getBytes(StandardCharsets.UTF_8));
+                }
+                if(password != null)
+                {
+                    digest.update(password.getBytes(StandardCharsets.UTF_8));
+                }
+                byte[] userPasswordHash = digest.digest();
+
+                return new ConnectionDetailsIdentifier(urlHash, user == null ? 
"" : user, userPasswordHash);
+
+            }
+            catch (NoSuchAlgorithmException e)
+            {
+                throw new RuntimeException("SHA-256 not found, however 
compliant Java implementations should always provide SHA-256", e);
+            }
+        }
+
+        private ConnectionDetailsIdentifier(final byte[] urlHash, final String 
user, final byte[] userPasswordHash)
+        {
+            _urlHash = urlHash;
+            _user = user;
+            _userPasswordHash = userPasswordHash;
+        }
+
+        @Override
+        public boolean equals(final Object o)
+        {
+            if (this == o)
+            {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass())
+            {
+                return false;
+            }
+
+            final ConnectionDetailsIdentifier that = 
(ConnectionDetailsIdentifier) o;
+
+            if (!Arrays.equals(_urlHash, that._urlHash))
+            {
+                return false;
+            }
+            if (!_user.equals(that._user))
+            {
+                return false;
+            }
+            return Arrays.equals(_userPasswordHash, that._userPasswordHash);
+
+        }
+
+        @Override
+        public int hashCode()
+        {
+            int result = Arrays.hashCode(_urlHash);
+            result = 31 * result + _user.hashCode();
+            result = 31 * result + Arrays.hashCode(_userPasswordHash);
+            return result;
+        }
+    }
+
+    private class ConnectionHolder
+    {
+        private final CommonConnection _connection;
+        private final long _lastUse;
+
+        public ConnectionHolder(final CommonConnection connection, final long 
lastUse)
+        {
+            _connection = connection;
+            _lastUse = lastUse;
+        }
+    }
+
+}

Propchange: 
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/PooledConnectionFactory.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java?rev=1678273&view=auto
==============================================================================
--- 
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java
 (added)
+++ 
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java
 Thu May  7 23:20:46 2015
@@ -0,0 +1,432 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.handler;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.CommonConnection;
+import org.apache.qpid.client.PooledConnectionFactory;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+public class PooledConnectionFactoryTest extends QpidTestCase
+{
+    private interface CommonConnectionCreator
+    {
+        CommonConnection newConnection(ConnectionURL connectionUrl);
+    }
+
+    private CommonConnectionCreator _connectionCreator;
+
+    private PooledConnectionFactory _connectionFactory;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _connectionCreator = mock(CommonConnectionCreator.class);
+
+        _connectionFactory = new PooledConnectionFactory()
+        {
+            @Override
+            protected CommonConnection newConnectionInstance(final 
ConnectionURL connectionDetails) throws AMQException
+            {
+                return _connectionCreator.newConnection(connectionDetails);
+            }
+        };
+    }
+
+
+    public void testConnectionCreatedWithUrlUserAndPassword() throws Exception
+    {
+        
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+        final ArgumentCaptor<ConnectionURL> connectionCaptor = 
ArgumentCaptor.forClass(ConnectionURL.class);
+        final List<CommonConnection> createdConnections = new ArrayList<>();
+
+        
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
 Answer<CommonConnection>()
+        {
+            @Override
+            public CommonConnection answer(final InvocationOnMock invocation) 
throws Throwable
+            {
+                ConnectionURL url = connectionCaptor.getValue();
+                assertEquals("user", url.getUsername());
+                assertEquals("pass", url.getPassword());
+
+                final CommonConnection connection = 
mock(CommonConnection.class);
+                when(connection.isClosed()).thenReturn(false);
+                createdConnections.add(connection);
+                return connection;
+            }
+        });
+
+        Connection conn = _connectionFactory.createConnection();
+        assertNotNull(conn);
+        assertEquals(1, createdConnections.size());
+        final String connToString = conn.toString();
+        assertEquals(createdConnections.get(0).toString(), connToString);
+
+        conn.close();
+
+        Connection conn2 = _connectionFactory.createConnection();
+        assertNotNull(conn2);
+        assertEquals(1, createdConnections.size());
+        assertEquals(createdConnections.get(0).toString(), conn2.toString());
+
+        assertEquals(connToString, conn2.toString());
+
+        try
+        {
+            conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            fail("Closed connection should not allow sessions to be created");
+        }
+        catch (IllegalStateException e)
+        {
+            // pass
+        }
+
+        Connection conn3 = _connectionFactory.createConnection();
+        assertNotNull(conn3);
+        assertEquals(2, createdConnections.size());
+        assertEquals(createdConnections.get(1).toString(), conn3.toString());
+        assertFalse(conn3.toString().equals(conn2.toString()));
+
+        conn2.close();
+        Connection conn4 = _connectionFactory.createConnection();
+        assertNotNull(conn4);
+        assertEquals(2, createdConnections.size());
+        assertEquals(createdConnections.get(0).toString(), conn4.toString());
+
+    }
+
+    public void testConnectionCreatedPassedUserAndPassword() throws Exception
+    {
+
+        
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+        final ArgumentCaptor<ConnectionURL> connectionCaptor = 
ArgumentCaptor.forClass(ConnectionURL.class);
+        final List<CommonConnection> createdConnections = new ArrayList<>();
+        final List<String> usernames = new ArrayList<>();
+        final List<String> passwords = new ArrayList<>();
+
+        
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
 Answer<CommonConnection>()
+        {
+            @Override
+            public CommonConnection answer(final InvocationOnMock invocation) 
throws Throwable
+            {
+                ConnectionURL url = connectionCaptor.getValue();
+                usernames.add(url.getUsername());
+                passwords.add(url.getPassword());
+
+                final CommonConnection connection = 
mock(CommonConnection.class);
+                when(connection.isClosed()).thenReturn(false);
+                createdConnections.add(connection);
+                return connection;
+            }
+        });
+
+        Connection conn = _connectionFactory.createConnection("user1", 
"pass1");
+        assertNotNull(conn);
+        assertEquals(1, createdConnections.size());
+        assertEquals("user1", usernames.get(0));
+        assertEquals("pass1", passwords.get(0));
+        conn.close();
+        Connection conn2 = _connectionFactory.createConnection("user2", 
"pass2");
+        assertNotNull(conn2);
+        assertEquals(2, createdConnections.size());
+        assertEquals("user2", usernames.get(1));
+        assertEquals("pass2", passwords.get(1));
+        conn2.close();
+        Connection conn3 = _connectionFactory.createConnection("user1", 
"pass1");
+        assertNotNull(conn3);
+        assertEquals(2, createdConnections.size());
+        assertEquals(createdConnections.get(0).toString(), conn3.toString());
+
+
+    }
+
+    public void testMaxPoolSize() throws Exception
+    {
+        
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+        _connectionFactory.setMaxPoolSize(3);
+        final ArgumentCaptor<ConnectionURL> connectionCaptor = 
ArgumentCaptor.forClass(ConnectionURL.class);
+        final List<CommonConnection> createdConnections = new ArrayList<>();
+
+        
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
 Answer<CommonConnection>()
+        {
+            @Override
+            public CommonConnection answer(final InvocationOnMock invocation) 
throws Throwable
+            {
+                ConnectionURL url = connectionCaptor.getValue();
+
+                final CommonConnection connection = 
mock(CommonConnection.class);
+                when(connection.isClosed()).thenReturn(false);
+                createdConnections.add(connection);
+                return connection;
+            }
+        });
+
+        Connection conn1 = _connectionFactory.createQueueConnection();
+        Connection conn2 = _connectionFactory.createQueueConnection();
+        Connection conn3 = _connectionFactory.createQueueConnection();
+        Connection conn4 = _connectionFactory.createQueueConnection();
+
+        assertEquals(4, createdConnections.size());
+
+        conn1.close();
+        conn2.close();
+        conn3.close();
+        conn4.close();
+
+        Connection conn5 = _connectionFactory.createTopicConnection();
+        assertEquals(4, createdConnections.size());
+        Connection conn6 = _connectionFactory.createTopicConnection();
+        assertEquals(4, createdConnections.size());
+        Connection conn7 = _connectionFactory.createTopicConnection();
+        assertEquals(4, createdConnections.size());
+        Connection conn8 = _connectionFactory.createTopicConnection();
+        assertEquals(5, createdConnections.size());
+
+    }
+
+    public void testConnectionTimeout() throws Exception
+    {
+        
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+        _connectionFactory.setMaxPoolSize(4);
+        _connectionFactory.setConnectionTimeout(100l);
+        final ArgumentCaptor<ConnectionURL> connectionCaptor = 
ArgumentCaptor.forClass(ConnectionURL.class);
+        final List<CommonConnection> createdConnections = new ArrayList<>();
+
+        
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
 Answer<CommonConnection>()
+        {
+            @Override
+            public CommonConnection answer(final InvocationOnMock invocation) 
throws Throwable
+            {
+                ConnectionURL url = connectionCaptor.getValue();
+
+                final CommonConnection connection = 
mock(CommonConnection.class);
+                when(connection.isClosed()).thenReturn(false);
+                createdConnections.add(connection);
+                return connection;
+            }
+        });
+
+        Connection conn1 = _connectionFactory.createQueueConnection();
+        Connection conn2 = _connectionFactory.createQueueConnection();
+        Connection conn3 = _connectionFactory.createQueueConnection();
+        Connection conn4 = _connectionFactory.createQueueConnection();
+
+        assertEquals(4, createdConnections.size());
+
+        conn1.close();
+        conn2.close();
+        conn3.close();
+        conn4.close();
+
+        Thread.sleep(500l);
+
+        Connection conn5 = _connectionFactory.createTopicConnection();
+        assertEquals(5, createdConnections.size());
+        Connection conn6 = _connectionFactory.createTopicConnection();
+        assertEquals(6, createdConnections.size());
+        Connection conn7 = _connectionFactory.createTopicConnection();
+        assertEquals(7, createdConnections.size());
+        Connection conn8 = _connectionFactory.createTopicConnection();
+        assertEquals(8, createdConnections.size());
+
+
+    }
+
+    public void testSessionsAreClosed() throws Exception
+    {
+        
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+        final ArgumentCaptor<ConnectionURL> connectionCaptor = 
ArgumentCaptor.forClass(ConnectionURL.class);
+        final List<CommonConnection> createdConnections = new ArrayList<>();
+        final Session session = mock(Session.class);
+        
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
 Answer<CommonConnection>()
+        {
+            @Override
+            public CommonConnection answer(final InvocationOnMock invocation) 
throws Throwable
+            {
+                ConnectionURL url = connectionCaptor.getValue();
+
+                final CommonConnection connection = 
mock(CommonConnection.class);
+                when(connection.isClosed()).thenReturn(false);
+                
when(connection.createSession(anyBoolean(),anyInt())).thenReturn(session);
+                createdConnections.add(connection);
+                return connection;
+            }
+        });
+
+        Connection conn = _connectionFactory.createConnection();
+        Session createdSession = conn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        conn.close();
+        verify(session, times(1)).close();
+
+    }
+
+    public void testConnectionWithExceptionNotPooled() throws Exception
+    {
+        
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+        final ArgumentCaptor<ConnectionURL> connectionCaptor = 
ArgumentCaptor.forClass(ConnectionURL.class);
+        final List<CommonConnection> createdConnections = new ArrayList<>();
+        
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
 Answer<CommonConnection>()
+        {
+            @Override
+            public CommonConnection answer(final InvocationOnMock invocation) 
throws Throwable
+            {
+                ConnectionURL url = connectionCaptor.getValue();
+
+                final CommonConnection connection = 
mock(CommonConnection.class);
+                when(connection.isClosed()).thenReturn(false);
+                
when(connection.createSession(anyBoolean(),anyInt())).thenThrow(new 
JMSException("foo"));
+                createdConnections.add(connection);
+                return connection;
+            }
+        });
+
+        Connection conn = _connectionFactory.createConnection();
+        try
+        {
+            Session createdSession = conn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            fail("Expected an exception");
+        }
+        catch(JMSException e)
+        {
+            assertEquals("foo", e.getMessage());
+        }
+        conn.close();
+        Connection conn2 = _connectionFactory.createConnection();
+        assertEquals(2, createdConnections.size());
+    }
+
+
+    public void testConnectionWithExceptionListenerExceptionNotPooled() throws 
Exception
+    {
+        
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+
+        final List<CommonConnection> createdConnections = new ArrayList<>();
+
+        
when(_connectionCreator.newConnection(any(ConnectionURL.class))).thenAnswer(new 
Answer<CommonConnection>()
+        {
+            @Override
+            public CommonConnection answer(final InvocationOnMock invocation) 
throws Throwable
+            {
+                final CommonConnection connection = 
mock(CommonConnection.class);
+                when(connection.isClosed()).thenReturn(false);
+                createdConnections.add(connection);
+                final ArgumentCaptor<ExceptionListener> listenerCaptor =
+                        ArgumentCaptor.forClass(ExceptionListener.class);
+
+                doAnswer(new Answer()
+                {
+                    @Override
+                    public Object answer(final InvocationOnMock invocation) 
throws Throwable
+                    {
+                        
when(connection.getExceptionListener()).thenReturn(listenerCaptor.getValue());
+                        return null;
+                    }
+                
}).when(connection).setExceptionListener(listenerCaptor.capture());
+
+                return connection;
+            }
+        });
+
+        Connection conn = _connectionFactory.createConnection();
+        assertEquals(1, createdConnections.size());
+
+        ExceptionListener listener = mock(ExceptionListener.class);
+        conn.setExceptionListener(listener);
+        assertEquals(listener, conn.getExceptionListener());
+        verify(listener, never()).onException(any(JMSException.class));
+        createdConnections.get(0).getExceptionListener().onException(new 
JMSException("bar"));
+        verify(listener, times(1)).onException(any(JMSException.class));
+
+        conn.close();
+        Connection conn2 = _connectionFactory.createConnection();
+        assertEquals(2, createdConnections.size());
+    }
+
+    public void testSessionCloseException() throws Exception
+    {
+        
_connectionFactory.setConnectionURLString("amqp://user:pass@/?brokerlist='tcp://localhost:5672'");
+
+        final ArgumentCaptor<ConnectionURL> connectionCaptor = 
ArgumentCaptor.forClass(ConnectionURL.class);
+        final List<CommonConnection> createdConnections = new ArrayList<>();
+        final Session session = mock(Session.class);
+        doThrow(new JMSException("foo")).when(session).close();
+        
when(_connectionCreator.newConnection(connectionCaptor.capture())).thenAnswer(new
 Answer<CommonConnection>()
+        {
+            @Override
+            public CommonConnection answer(final InvocationOnMock invocation) 
throws Throwable
+            {
+                ConnectionURL url = connectionCaptor.getValue();
+
+                final CommonConnection connection = 
mock(CommonConnection.class);
+                when(connection.isClosed()).thenReturn(false);
+                
when(connection.createSession(anyBoolean(),anyInt())).thenReturn(session);
+                createdConnections.add(connection);
+                return connection;
+            }
+        });
+
+        Connection conn = _connectionFactory.createConnection();
+            Session createdSession = conn.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+
+        try
+        {
+            conn.close();
+            fail("Expected an exception");
+        }
+        catch(JMSException e)
+        {
+            assertEquals("foo", e.getMessage());
+        }
+        Connection conn2 = _connectionFactory.createConnection();
+        assertEquals(2, createdConnections.size());
+    }
+
+
+}

Propchange: 
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/handler/PooledConnectionFactoryTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java?rev=1678273&r1=1678272&r2=1678273&view=diff
==============================================================================
--- 
qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java
 (original)
+++ 
qpid/java/trunk/client/src/test/java/org/apache/qpid/jms/FailoverPolicyTest.java
 Thu May  7 23:20:46 2015
@@ -236,7 +236,7 @@ public class FailoverPolicyTest extends
             }
 
             @Override
-            public javax.jms.Session createSession(boolean arg0, int arg1)
+            public Session createSession(boolean arg0, int arg1)
                     throws JMSException
             {
                 return null;



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to