Author: philharveyonline
Date: Wed Jul 10 15:21:28 2013
New Revision: 1501794

URL: http://svn.apache.org/r1501794
Log:
NO-JIRA: fixed race condition in AmqpConnectionDriver that caused Driver.wakeup 
calls to be ignored.
Also added a simple stop mechanism to AmqpConnectionDriver.
Also made minor logging and other changes.

Added:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java
      - copied, changed from r1501793, 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/
    
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SimplePredicateTest.java
Modified:
    qpid/jms/trunk/pom.xml
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
    
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
    
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/Predicate.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SimplePredicate.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java
    qpid/jms/trunk/src/test/resources/logging.properties

Modified: qpid/jms/trunk/pom.xml
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/pom.xml?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- qpid/jms/trunk/pom.xml (original)
+++ qpid/jms/trunk/pom.xml Wed Jul 10 15:21:28 2013
@@ -51,6 +51,12 @@
       <version>${junit-version}</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <version>1.3</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java 
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java 
Wed Jul 10 15:21:28 2013
@@ -21,7 +21,6 @@
 package org.apache.qpid.jms.engine;
 
 import java.util.ArrayList;
-
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -35,7 +34,9 @@ import org.apache.qpid.proton.engine.Con
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.EngineFactory;
 import org.apache.qpid.proton.engine.Link;
+import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.message.MessageFactory;
 
@@ -170,12 +171,21 @@ public class AmqpConnection
         _password = password;
     }
 
+    /**
+     * For all the "pending" AmqpXXX objects, update their state to 
reflect the remote state of their
+     * Proton counterparts, and remove them from the pending set.
+     *
+     * The "pending" AmqpXXX objects are the ones whose local 
modifications are expected to cause a
+     * remote state change, e.g. newly created sessions.
+     *
+     * @return true if any AmqpXXX objects were updated by this method
+     */
     public synchronized boolean process()
     {
         boolean updated = false;
         //Connection
-        EndpointState remoteState = _connection.getRemoteState();
-        if (!_connected && (remoteState == EndpointState.UNINITIALIZED))
+        EndpointState connectionRemoteState = _connection.getRemoteState();
+        if (!_connected && (connectionRemoteState == 
EndpointState.UNINITIALIZED))
         {
             if(_sasl != null)
             {
@@ -183,14 +193,14 @@ public class AmqpConnection
             }
         }
 
-        if (!_connected && (remoteState == EndpointState.ACTIVE))
+        if (!_connected && (connectionRemoteState == EndpointState.ACTIVE))
         {
             _logger.log(Level.FINEST, "Set connected to true");
             updated = true;
             _connected = true;
         }
 
-        if(_connected && (_connection.getLocalState() == EndpointState.CLOSED 
&& remoteState == EndpointState.CLOSED))
+        if(_connected && (_connection.getLocalState() == EndpointState.CLOSED 
&& connectionRemoteState == EndpointState.CLOSED))
         {
             _closed = true;
             _connected = false;
@@ -232,13 +242,29 @@ public class AmqpConnection
         while(pendingLinks.hasNext())
         {
             l = pendingLinks.next();
-            if(l.getRemoteState() != EndpointState.UNINITIALIZED)
+
+            EndpointState linkRemoteState = l.getRemoteState();
+            if(linkRemoteState == EndpointState.ACTIVE || linkRemoteState == 
EndpointState.CLOSED)
             {
                 AmqpLink amqpLink = (AmqpLink) l.getContext();
-                amqpLink.setEstablished();
+                if(linkRemoteState == EndpointState.ACTIVE && getRemoteNode(l) 
!= null)
+                {
+                    amqpLink.setEstablished();
+                }
+                else
+                {
+                    amqpLink.setLinkError();
+                    amqpLink.setClosed();
+                }
+
                 pendingLinks.remove();
                 updated = true;
             }
+            else
+            {
+                // do nothing
+            }
+
         }
 
         Iterator<Link> pendingCloseLinks = _pendingCloseLinks.iterator();
@@ -258,6 +284,22 @@ public class AmqpConnection
         return updated;
     }
 
+    private Object getRemoteNode(Link link)
+    {
+        if(link instanceof Sender)
+        {
+            return  link.getRemoteTarget();
+        }
+        else if(link instanceof Receiver)
+        {
+            return link.getRemoteSource();
+        }
+        else
+        {
+            throw new IllegalArgumentException(String.format("%s is not a %s 
or a %s", link, Sender.class, Receiver.class));
+        }
+    }
+
     private boolean processSasl()
     {
         boolean updated = false;

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java
 (original)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java
 Wed Jul 10 15:21:28 2013
@@ -33,20 +33,48 @@ import org.apache.qpid.proton.driver.Dri
 import org.apache.qpid.proton.driver.DriverFactory;
 import org.apache.qpid.proton.engine.Sasl;
 
+/**
+ * <p>
+ * Asynchronously processes local and remote updates to the AmqpXXX objects on
+ * the registered {@link AmqpConnection}s. Specifically:</p>
+ * <p>
+ * - Once notified that a connection has local updates via {@link 
#setLocallyUpdated(AmqpConnection)},
+ * these updates are written to the network.
+ * </p>
+ * <p>
+ * - Reads any remote updates from the network and updates the local AmqpXXX 
objects accordingly.
+ * </p>
+ * <p>
+ * Thread-safe.
+ * </p>
+ *
+ * TODO verify that multiple connections are handled properly
+ */
 public class AmqpConnectionDriver
 {
-    private static Logger _logger = 
Logger.getLogger("qpid.jms-client.connection.driver");
-    private static final ProtonFactoryLoader<DriverFactory> 
driverFactoryLoader = new 
ProtonFactoryLoader<DriverFactory>(DriverFactory.class);
-    private DriverFactory _driverFactory;
-    private Driver _driver;
+    private static Logger _logger = 
Logger.getLogger(AmqpConnectionDriver.class.getName());
 
-    private final ConcurrentHashMap<AmqpConnection,Boolean> _updated = new 
ConcurrentHashMap<AmqpConnection,Boolean>();
+    private final Driver _driver;
+
+    private final ConcurrentHashMap<AmqpConnection,Boolean> 
_locallyUpdatedConnections =
+            new ConcurrentHashMap<AmqpConnection,Boolean>();
+
+    private DriverRunnable _driverRunnable;
+    private Thread _driverThread;
+
+    public enum AmqpDriverState
+    {
+        UNINIT,
+        OPEN,
+        STOPPED,
+        ERROR;
+    }
 
     public AmqpConnectionDriver() throws IOException
     {
-        _driverFactory = defaultDriverFactory();
-        _driver = _driverFactory.createDriver();
-    }   
+        DriverFactory driverFactory = new 
ProtonFactoryLoader<DriverFactory>(DriverFactory.class).loadFactory();
+        _driver = driverFactory.createDriver();
+    }
 
     public void registerConnection(AmqpConnection amqpConnection)
     {
@@ -79,63 +107,127 @@ public class AmqpConnectionDriver
 
         amqpConnection.setSasl(sasl);
 
-        new Thread(new DriverRunnable(amqpConnection)).start();
+        _driverRunnable = new DriverRunnable();
+        _driverThread = new Thread(_driverRunnable); // TODO set a sensible 
thread name
+        _driverThread.start();
     }
 
-    private static DriverFactory defaultDriverFactory()
+    public void stop() throws InterruptedException
     {
-        return driverFactoryLoader.loadFactory();
+        _driverRunnable.requestStop();
+        _driverThread.join(AmqpConnection.TIMEOUT);
+        AmqpDriverState state = _driverRunnable.getState();
+        if(state != AmqpDriverState.STOPPED)
+        {
+            throw new IllegalStateException("After trying to stop, Driver is 
in state " + state);
+        }
     }
 
-    public class DriverRunnable implements Runnable
+    private class DriverRunnable implements Runnable
     {
-        //TODO: delete
-        private AmqpConnection _connection;
-
-        public DriverRunnable(AmqpConnection connection)
-        {
-            _connection = connection;
-        }
+        private volatile AmqpDriverState _state = AmqpDriverState.UNINIT;
+        private volatile boolean _stopRequested;
 
         @Override
         public void run()
         {
-            while(true)
+            _state = AmqpDriverState.OPEN;
+            while(!_stopRequested)
             {
-                Connector<?> connector;
                 try
                 {
+                    // Process connectors with local updates
                     for (Connector<?> c : _driver.connectors())
                     {
                         AmqpConnection amqpConnection = (AmqpConnection) 
(c.getContext());
-                        if(isUpdated(amqpConnection))
-                        {                            
+                        if(getAndClearLocallyUpdated(amqpConnection))
+                        {
                             processConnector(c);
                         }
                     }
 
+                    // Process connectors with in-bound data
+                    // (may incidentally also process connectors with 
out-bound data whose
+                    // sockets have just become writeable)
+                    Connector<?> connector;
                     while((connector = _driver.connector()) != null)
                     {
                         processConnector(connector);
                     }
-                    _logger.log(Level.FINEST, "Waiting");
-                    _driver.doWait(AmqpConnection.TIMEOUT);
-                    _logger.log(Level.FINEST, "Stopped Waiting");
+
+                    waitForLocalOrRemoteUpdates();
                 }
                 catch (IOException e)
                 {
-                    // TODO
-                    e.printStackTrace();
+                    // TODO proper error handling
+                    _logger.log(Level.SEVERE, "Driver error", e);
+                    _state = AmqpDriverState.ERROR;
                     break;
                 }
             }
+
+            closeAndDestroyDriver();
         }
 
-        private boolean isUpdated(AmqpConnection amqpConnection)
+        private void closeAndDestroyDriver()
         {
-            return _updated.remove(amqpConnection) != null;
+            try
+            {
+                for (Connector<?> c : _driver.connectors())
+                {
+                    c.close();
+                }
+
+                _driver.destroy();
+
+                if(_state != AmqpDriverState.ERROR)
+                {
+                    _state = AmqpDriverState.STOPPED;
+                }
+                if(_logger.isLoggable(Level.FINE))
+                {
+                    _logger.fine(this + " closed and destroyed driver");
+                }
+            }
+            catch(Exception e)
+            {
+                 // TODO proper error handling
+                _logger.log(Level.SEVERE, "Driver error", e);
+                _state = AmqpDriverState.ERROR;
+            }
         }
 
+        private void waitForLocalOrRemoteUpdates()
+        {
+            // We're careful below about whether we call Driver.doWait().
+            // Any prior setLocallyUpdated() calls would have set the Driver's 
wake-up status,
+            // but this may have been cleared by the Driver.connector() call 
above.
+            // Therefore, we guard the doWait() call with a check for pending 
local updates.
+
+            if(!_stopRequested && _locallyUpdatedConnections.isEmpty())
+            {
+                _driver.doWait(AmqpConnection.TIMEOUT);
+            }
+        }
+
+        public void requestStop()
+        {
+            _stopRequested = true;
+        }
+
+        public AmqpDriverState getState()
+        {
+            return _state;
+        }
+
+        private boolean getAndClearLocallyUpdated(AmqpConnection 
amqpConnection)
+        {
+            return _locallyUpdatedConnections.remove(amqpConnection) != null;
+        }
+
+        /**
+         * Handle the connector's inbound data and send its outbound data
+         */
         public void processConnector(Connector<?> connector) throws IOException
         {
             AmqpConnection amqpConnection = (AmqpConnection) 
(connector.getContext());
@@ -150,6 +242,7 @@ public class AmqpConnectionDriver
                 if(amqpConnection.isClosed())
                 {
                     connector.destroy();
+                    _state = AmqpDriverState.STOPPED;
                 }
 
                 amqpConnection.notifyAll();
@@ -157,13 +250,14 @@ public class AmqpConnectionDriver
         }
     }
 
-    public void wakeup()
+    /**
+     * Indicate that at least one AmqpXXX object on the supplied connection
+     * has been locally updated, so the driver knows it needs to send updates
+     * to the peer.
+     */
+    public void setLocallyUpdated(AmqpConnection amqpConnection)
     {
+        _locallyUpdatedConnections.put(amqpConnection, Boolean.TRUE);
         _driver.wakeup();
     }
-
-    public void updated(AmqpConnection amqpConnection)
-    {
-        _updated.put(amqpConnection, Boolean.TRUE);
-    }
 }
\ No newline at end of file

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java 
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java Wed 
Jul 10 15:21:28 2013
@@ -33,8 +33,10 @@ public abstract class AmqpLink
     private final AmqpSession _amqpSession;
     private final Link _protonLink;
     private boolean _established;
+    private boolean _linkError;
     private boolean _closed;
 
+
     public AmqpLink(AmqpSession amqpSession, Link protonLink)
     {
         _amqpSession = amqpSession;
@@ -52,6 +54,16 @@ public abstract class AmqpLink
         _established = true;
     }
 
+    public boolean getLinkError()
+    {
+        return _linkError;
+    }
+
+    public void setLinkError()
+    {
+        _linkError = true;
+    }
+
     AmqpConnection getAmqpConnection()
     {
         return _amqpConnection;
@@ -83,4 +95,5 @@ public abstract class AmqpLink
     {
         return _closed;
     }
+
 }

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
 (original)
+++ 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
 Wed Jul 10 15:21:28 2013
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.jms.engine;
 
+//TODO make me (or wrap me in) a JMSException
 public class ConnectionException extends Exception
 {
     private static final long serialVersionUID = 419676688719664719L;

Copied: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java 
(from r1501793, 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java)
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java&r1=1501793&r2=1501794&rev=1501794&view=diff
==============================================================================
--- 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java
 (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java 
Wed Jul 10 15:21:28 2013
@@ -20,11 +20,12 @@
  */
 package org.apache.qpid.jms.engine;
 
-public class ConnectionException extends Exception
+// TODO make me (or wrap me in) a JMSException
+public class LinkException extends Exception
 {
     private static final long serialVersionUID = 419676688719664719L;
 
-    public ConnectionException(String msg)
+    public LinkException(String msg)
     {
         super(msg);
     }

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java 
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java 
Wed Jul 10 15:21:28 2013
@@ -21,6 +21,8 @@
 package org.apache.qpid.jms.impl;
 
 import java.io.IOException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
 
 import org.apache.qpid.jms.engine.AmqpConnection;
 import org.apache.qpid.jms.engine.AmqpConnectionDriver;
@@ -30,7 +32,11 @@ import org.apache.qpid.proton.TimeoutExc
 
 public class ConnectionImpl
 {
+    private static final Logger _logger = 
Logger.getLogger(ConnectionImpl.class.getName());
+
     private AmqpConnection _amqpConnection;
+
+    /** The driver dedicated to this connection */
     private AmqpConnectionDriver _amqpConnectionDriver;
     private ConnectionLock _connectionLock;
 
@@ -54,10 +60,10 @@ public class ConnectionImpl
         _connectionLock = new ConnectionLock(this);
         _connectionLock.setConnectionStateChangeListener(new 
ConnectionStateChangeListener()
         {
+            @Override
             public void stateChanged(ConnectionImpl connection)
             {
-                
connection._amqpConnectionDriver.updated(connection._amqpConnection);           
     
-                connection._amqpConnectionDriver.wakeup();
+                
connection._amqpConnectionDriver.setLocallyUpdated(connection._amqpConnection);
             }
         });
     }
@@ -74,6 +80,12 @@ public class ConnectionImpl
         {
             while (first || (!done && wait))
             {
+                if(_logger.isLoggable(Level.FINER))
+                {
+                    _logger.log(Level.FINER,
+                            "About to waitUntil {0}. first={1}, done={2}, 
wait={3}",
+                            new Object[] {condition, first, done, wait});
+                }
                 if (wait && !done && !first)
                 {
                     _amqpConnection.wait(timeoutMillis < 0 ? 0 : deadline - 
System.currentTimeMillis());
@@ -83,9 +95,16 @@ public class ConnectionImpl
                 done = done || condition.test();
                 first = false;
             }
+            if(_logger.isLoggable(Level.FINER))
+            {
+                _logger.log(Level.FINER,
+                        "Finished waitUntil {0}. first={1}, done={2}, 
wait={3}",
+                        new Object[] {condition, first, done, wait});
+            }
+
             if (!done)
             {
-                throw new TimeoutException(timeoutMillis, 
condition.toString());
+                throw new TimeoutException(timeoutMillis, 
condition.getCurrentState());
             }
         }
     }
@@ -97,6 +116,7 @@ public class ConnectionImpl
         {
             waitUntil(new SimplePredicate("Connection established or failed", 
_amqpConnection)
             {
+                @Override
                 public boolean test()
                 {
                     return _amqpConnection.isConnected() || 
_amqpConnection.isAuthenticationError() || 
_amqpConnection.getConnectionError().getCondition() != null;
@@ -131,16 +151,16 @@ public class ConnectionImpl
         {
             _amqpConnection.close();
             stateChanged();
-            while(!_amqpConnection.isClosed())
+            waitUntil(new SimplePredicate("Connection is closed", 
_amqpConnection)
             {
-                waitUntil(new SimplePredicate("Connection is closed", 
_amqpConnection)
+                @Override
+                public boolean test()
                 {
-                    public boolean test()
-                    {
-                        return _amqpConnection.isClosed();
-                    }
-                }, AmqpConnection.TIMEOUT);
-            }
+                    return _amqpConnection.isClosed();
+                }
+            }, AmqpConnection.TIMEOUT);
+
+            _amqpConnectionDriver.stop();
 
             if(_amqpConnection.getConnectionError().getCondition() != null)
             {

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java 
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java Wed Jul 
10 15:21:28 2013
@@ -23,6 +23,7 @@ package org.apache.qpid.jms.impl;
 import org.apache.qpid.jms.engine.AmqpConnection;
 import org.apache.qpid.jms.engine.AmqpLink;
 import org.apache.qpid.jms.engine.ConnectionException;
+import org.apache.qpid.jms.engine.LinkException;
 import org.apache.qpid.proton.TimeoutException;
 
 public class LinkImpl
@@ -38,16 +39,20 @@ public class LinkImpl
         _amqpLink = amqpLink;
     }
 
-    public void establish() throws TimeoutException, InterruptedException
+    public void establish() throws TimeoutException, InterruptedException, 
LinkException
     {
-        _connectionImpl.waitUntil(new SimplePredicate("Link is closed", 
_amqpLink)
+        _connectionImpl.waitUntil(new SimplePredicate("Link is established or 
failed", _amqpLink)
         {
             @Override
             public boolean test()
             {
-                return _amqpLink.isEstablished();
+                return _amqpLink.isEstablished() || _amqpLink.getLinkError();
             }
         }, AmqpConnection.TIMEOUT);
+        if(!_amqpLink.isEstablished())
+        {
+            throw new LinkException("Failed to establish link " + _amqpLink); 
// TODO make message less verbose
+        }
     }
 
     public void close() throws TimeoutException, InterruptedException, 
ConnectionException

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/Predicate.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/Predicate.java?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/Predicate.java 
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/Predicate.java Wed 
Jul 10 15:21:28 2013
@@ -22,11 +22,16 @@ package org.apache.qpid.jms.impl;
 
 /**
  * A simple predicate.
- *
- * Used for general purpose logic so should provide a useful toString() 
implementation
- * for logging purposes.
  */
 interface Predicate
 {
+    /**
+     * Returns whether the predicate is true.
+     */
     boolean test();
+
+    /**
+     * Returns the current state, primarily for logging purposes if something 
goes wrong
+     */
+    String getCurrentState();
 }
\ No newline at end of file

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java 
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Wed 
Jul 10 15:21:28 2013
@@ -25,6 +25,7 @@ import org.apache.qpid.jms.engine.AmqpRe
 import org.apache.qpid.jms.engine.AmqpSender;
 import org.apache.qpid.jms.engine.AmqpSession;
 import org.apache.qpid.jms.engine.ConnectionException;
+import org.apache.qpid.jms.engine.LinkException;
 import org.apache.qpid.proton.TimeoutException;
 
 public class SessionImpl
@@ -40,7 +41,7 @@ public class SessionImpl
 
     public void establish() throws TimeoutException, InterruptedException
     {
-        _connectionImpl.waitUntil(new SimplePredicate("Session established")
+        _connectionImpl.waitUntil(new SimplePredicate("Session established", 
_amqpSession)
         {
             @Override
             public boolean test()
@@ -85,7 +86,7 @@ public class SessionImpl
         return _connectionImpl;
     }
 
-    public SenderImpl createSender(String name, String address) throws 
TimeoutException, InterruptedException
+    public SenderImpl createSender(String name, String address) throws 
TimeoutException, InterruptedException, LinkException
     {
         _connectionImpl.lock();
         try
@@ -102,7 +103,7 @@ public class SessionImpl
         }
     }
 
-    public ReceiverImpl createReceiver(String name, String address) throws 
TimeoutException, InterruptedException
+    public ReceiverImpl createReceiver(String name, String address) throws 
TimeoutException, InterruptedException, LinkException
     {
         _connectionImpl.lock();
         try

Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SimplePredicate.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SimplePredicate.java?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SimplePredicate.java 
(original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SimplePredicate.java 
Wed Jul 10 15:21:28 2013
@@ -41,16 +41,20 @@ abstract class SimplePredicate implement
         _predicatedObjects = predicatedObjects;
     }
 
-    /** intended to be overridden to provide more useful information */
-    protected Object getCurrentState()
+    @Override
+    public String getCurrentState()
     {
-        return getPredicatedObjects();
+        StringBuilder builder = new StringBuilder("CurrentState [")
+            .append(this)
+            .append(getPredicatedObjects())
+            .append("]");
+        return builder.toString();
     }
 
     /**
      * Returns the predicated objects as a list, or null if none exist
      */
-    protected Object getPredicatedObjects()
+    private Object getPredicatedObjects()
     {
         if(_predicatedObjects == null)
         {
@@ -65,9 +69,9 @@ abstract class SimplePredicate implement
     @Override
     public String toString()
     {
-        StringBuilder builder = new StringBuilder();
-        builder.append("SimplePredicate [_description=").append(_description)
-            .append(", currentState=").append(getCurrentState()).append("]");
+        StringBuilder builder = new StringBuilder(super.toString())
+            .append(" [_description=").append(_description)
+            .append("]");
         return builder.toString();
     }
 }

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java Wed Jul 10 
15:21:28 2013
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.jms;
 
+import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.qpid.jms.impl.ConnectionImpl;
@@ -43,48 +44,54 @@ import org.junit.Test;
 public class JmsTest
 {
     //TODO: use another logger
-    private static Logger _logger = Logger.getLogger("qpid.jms-client.connection");
+    private static Logger _logger = Logger.getLogger(JmsTest.class.getName());
 
     private final MessageFactory _messageFactory = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class).loadFactory();
 
     @Test
     public void test() throws Exception
     {
-        System.out.println("PHDEBUG " + System.getProperty("java.util.logging.config.file"));
-        ConnectionImpl connection =  new ConnectionImpl("clientName", "localhost", 5672, "guest", "guest");
-        connection.connect();
-
-        SessionImpl session = connection.createSession();
-        session.establish();
-
-        SenderImpl sender = session.createSender("1","queue");
-        sender.establish();
-
-        Message message = _messageFactory.createMessage();
-        AmqpValue body = new AmqpValue("Hello World!");
-        message.setBody(body);
-        sender.sendMessage(message);
-
-        sender.close();
-
-        ReceiverImpl receiver = session.createReceiver("1", "queue");
-        receiver.credit(5);
-        receiver.establish();
-        ReceivedMessageImpl receivedMessage = receiver.receive(5000);
-        receivedMessage.accept(true);
-
-        _logger.info("=========================");
-        _logger.info(receivedMessage.getMessage().getBody().toString());
-        _logger.info("=========================");
-
-        receiver.close();
-
-        session.close();
-
-        _logger.info("About to close " + connection);
-
-        connection.close();
-        _logger.info("Closed connection.");
+        try
+        {
+            ConnectionImpl connection =  new ConnectionImpl("clientName", "localhost", 5672, "guest", "guest");
+            connection.connect();
+
+            SessionImpl session = connection.createSession();
+            session.establish();
+
+            SenderImpl sender = session.createSender("1","queue");
+            sender.establish();
+
+            Message message = _messageFactory.createMessage();
+            AmqpValue body = new AmqpValue("Hello World!");
+            message.setBody(body);
+            sender.sendMessage(message);
+
+            sender.close();
+
+            ReceiverImpl receiver = session.createReceiver("1", "queue");
+            receiver.credit(5);
+            receiver.establish();
+            ReceivedMessageImpl receivedMessage = receiver.receive(5000);
+            receivedMessage.accept(true);
+
+            _logger.info("Received message:");
+            _logger.info("=========================");
+            _logger.info(receivedMessage.getMessage().getBody().toString());
+            _logger.info("=========================");
+
+            receiver.close();
+
+            session.close();
+
+            connection.close();
+        }
+        catch (Exception e)
+        {
+            // log the error so its timestamp is recorded - useful for timeout-type errors
+            _logger.log(Level.SEVERE, "Test failed", e);
+            throw e;
+        }
     }
 
 }

Added: 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SimplePredicateTest.java
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SimplePredicateTest.java?rev=1501794&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SimplePredicateTest.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SimplePredicateTest.java Wed Jul 10 15:21:28 2013
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.impl;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.not;
+import static org.junit.Assert.assertThat;
+
+import org.junit.Test;
+
+public class SimplePredicateTest
+{
+    private static final String DESCRIPTION = "description1";
+    private static final String PREDICATED_OBJECT = "predicatedObject1";
+
+    @Test
+    public void testGetCurrentStateWithDefaultPredicate()
+    {
+        SimplePredicate predicate = new SimplePredicate()
+        {
+            @Override
+            public boolean test()
+            {
+                return true;
+            }
+        };
+        String currentState = predicate.getCurrentState();
+        assertThat(currentState, containsString(predicate.getClass().getName()));
+    }
+
+    @Test
+    public void testGetCurrentStateWithAPredicatedObject()
+    {
+        SimplePredicate predicate = new SimplePredicate(DESCRIPTION, PREDICATED_OBJECT)
+        {
+            @Override
+            public boolean test()
+            {
+                return true;
+            }
+        };
+
+        String currentState = predicate.getCurrentState();
+        assertThat(currentState, containsString(DESCRIPTION));
+        assertThat(currentState, containsString(PREDICATED_OBJECT));
+    }
+
+    @Test
+    public void testToStringWithDefaultPredicate()
+    {
+        SimplePredicate predicate = new SimplePredicate()
+        {
+            @Override
+            public boolean test()
+            {
+                return true;
+            }
+        };
+
+        assertThat(predicate.toString(), containsString(predicate.getClass().getName()));
+    }
+
+    @Test
+    public void testToStringWithAPredicatedObject()
+    {
+        SimplePredicate predicate = new SimplePredicate(DESCRIPTION, PREDICATED_OBJECT)
+        {
+            @Override
+            public boolean test()
+            {
+                return true;
+            }
+        };
+
+        assertThat(predicate.toString(), containsString(DESCRIPTION));
+        assertThat(
+                "toString should not return the predicated object because " +
+                "this is too verbose for the usual toString use cases",
+                predicate.toString(),
+                not(containsString(PREDICATED_OBJECT)));
+    }
+}

Modified: qpid/jms/trunk/src/test/resources/logging.properties
URL: 
http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/resources/logging.properties?rev=1501794&r1=1501793&r2=1501794&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/resources/logging.properties (original)
+++ qpid/jms/trunk/src/test/resources/logging.properties Wed Jul 10 15:21:28 2013
@@ -27,7 +27,3 @@ java.util.logging.ConsoleHandler.formatt
 java.util.logging.SimpleFormatter.format = %1$tF %1$tT.%tL %4$s %3$s %5$s%n
 
 .level = INFO
-
-# TODO-PH remove
-org.apache.qpid.proton.logging.LoggingProtocolTracer.sent.level = ALL
-org.apache.qpid.proton.logging.LoggingProtocolTracer.received.level = ALL



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to