Author: philharveyonline Date: Wed Jul 10 15:21:28 2013 New Revision: 1501794
URL: http://svn.apache.org/r1501794 Log: NO-JIRA: fixed race condition in AmqpConnectionDriver that caused Driver.wakeup calls to be ignored. Also added simple stop mechanism to AmqpConnectionDriver Also minor logging etc changes Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java - copied, changed from r1501793, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SimplePredicateTest.java Modified: qpid/jms/trunk/pom.xml qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/Predicate.java qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SimplePredicate.java qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java qpid/jms/trunk/src/test/resources/logging.properties Modified: qpid/jms/trunk/pom.xml URL: http://svn.apache.org/viewvc/qpid/jms/trunk/pom.xml?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/pom.xml (original) +++ qpid/jms/trunk/pom.xml Wed Jul 10 15:21:28 2013 @@ -51,6 +51,12 @@ <version>${junit-version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.hamcrest</groupId> + <artifactId>hamcrest-all</artifactId> + <version>1.3</version> + <scope>test</scope> + </dependency> </dependencies> <build> Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java (original) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnection.java Wed Jul 10 15:21:28 2013 @@ -21,7 +21,6 @@ package org.apache.qpid.jms.engine; import java.util.ArrayList; - import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -35,7 +34,9 @@ import org.apache.qpid.proton.engine.Con import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.EngineFactory; import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.message.MessageFactory; @@ -170,12 +171,21 @@ public class AmqpConnection _password = password; } + /** + * For all the "pending" AmqpXXX objects, update their state to reflect the remote state of their + * Proton counterparts, and remove them from the pending set. + * + * The "pending" AmqpXXX objects are the ones whose local modifications are expected to cause a + * remote state change, e.g. newly created sessions. 
+ * + * @return true if any AmqpXXX objects were updated by this method + */ public synchronized boolean process() { boolean updated = false; //Connection - EndpointState remoteState = _connection.getRemoteState(); - if (!_connected && (remoteState == EndpointState.UNINITIALIZED)) + EndpointState connectionRemoteState = _connection.getRemoteState(); + if (!_connected && (connectionRemoteState == EndpointState.UNINITIALIZED)) { if(_sasl != null) { @@ -183,14 +193,14 @@ public class AmqpConnection } } - if (!_connected && (remoteState == EndpointState.ACTIVE)) + if (!_connected && (connectionRemoteState == EndpointState.ACTIVE)) { _logger.log(Level.FINEST, "Set connected to true"); updated = true; _connected = true; } - if(_connected && (_connection.getLocalState() == EndpointState.CLOSED && remoteState == EndpointState.CLOSED)) + if(_connected && (_connection.getLocalState() == EndpointState.CLOSED && connectionRemoteState == EndpointState.CLOSED)) { _closed = true; _connected = false; @@ -232,13 +242,29 @@ public class AmqpConnection while(pendingLinks.hasNext()) { l = pendingLinks.next(); - if(l.getRemoteState() != EndpointState.UNINITIALIZED) + + EndpointState linkRemoteState = l.getRemoteState(); + if(linkRemoteState == EndpointState.ACTIVE || linkRemoteState == EndpointState.CLOSED) { AmqpLink amqpLink = (AmqpLink) l.getContext(); - amqpLink.setEstablished(); + if(linkRemoteState == EndpointState.ACTIVE && getRemoteNode(l) != null) + { + amqpLink.setEstablished(); + } + else + { + amqpLink.setLinkError(); + amqpLink.setClosed(); + } + pendingLinks.remove(); updated = true; } + else + { + // do nothing + } + } Iterator<Link> pendingCloseLinks = _pendingCloseLinks.iterator(); @@ -258,6 +284,22 @@ public class AmqpConnection return updated; } + private Object getRemoteNode(Link link) + { + if(link instanceof Sender) + { + return link.getRemoteTarget(); + } + else if(link instanceof Receiver) + { + return link.getRemoteSource(); + } + else + { + throw new 
IllegalArgumentException(String.format("%s is not a %s or a %s", link, Sender.class, Receiver.class)); + } + } + private boolean processSasl() { boolean updated = false; Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java (original) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpConnectionDriver.java Wed Jul 10 15:21:28 2013 @@ -33,20 +33,48 @@ import org.apache.qpid.proton.driver.Dri import org.apache.qpid.proton.driver.DriverFactory; import org.apache.qpid.proton.engine.Sasl; +/** + * <p> + * Asynchronously processes local and remote updates to the AmqpXXX objects on + * the registered {@link AmqpConnection}s. Specifically:</p> + * <p> + * - Once notified that a connection has local updates via {@link #setLocallyUpdated(AmqpConnection)}, + * these updates are written to the network. + * </p> + * <p> + * - Reads any remote updates from the network and updates the local AmqpXXX objects accordingly. + * </p> + * <p> + * Thread-safe. 
+ * </p> + * + * TODO verify that multiple connections are handled properly + */ public class AmqpConnectionDriver { - private static Logger _logger = Logger.getLogger("qpid.jms-client.connection.driver"); - private static final ProtonFactoryLoader<DriverFactory> driverFactoryLoader = new ProtonFactoryLoader<DriverFactory>(DriverFactory.class); - private DriverFactory _driverFactory; - private Driver _driver; + private static Logger _logger = Logger.getLogger(AmqpConnectionDriver.class.getName()); - private final ConcurrentHashMap<AmqpConnection,Boolean> _updated = new ConcurrentHashMap<AmqpConnection,Boolean>(); + private final Driver _driver; + + private final ConcurrentHashMap<AmqpConnection,Boolean> _locallyUpdatedConnections = + new ConcurrentHashMap<AmqpConnection,Boolean>(); + + private DriverRunnable _driverRunnable; + private Thread _driverThread; + + public enum AmqpDriverState + { + UNINIT, + OPEN, + STOPPED, + ERROR; + } public AmqpConnectionDriver() throws IOException { - _driverFactory = defaultDriverFactory(); - _driver = _driverFactory.createDriver(); - } + DriverFactory driverFactory = new ProtonFactoryLoader<DriverFactory>(DriverFactory.class).loadFactory(); + _driver = driverFactory.createDriver(); + } public void registerConnection(AmqpConnection amqpConnection) { @@ -79,63 +107,127 @@ public class AmqpConnectionDriver amqpConnection.setSasl(sasl); - new Thread(new DriverRunnable(amqpConnection)).start(); + _driverRunnable = new DriverRunnable(); + _driverThread = new Thread(_driverRunnable); // TODO set a sensible thread name + _driverThread.start(); } - private static DriverFactory defaultDriverFactory() + public void stop() throws InterruptedException { - return driverFactoryLoader.loadFactory(); + _driverRunnable.requestStop(); + _driverThread.join(AmqpConnection.TIMEOUT); + AmqpDriverState state = _driverRunnable.getState(); + if(state != AmqpDriverState.STOPPED) + { + throw new IllegalStateException("After trying to stop, Driver is in 
state " + state); + } } - public class DriverRunnable implements Runnable + private class DriverRunnable implements Runnable { - //TODO: delete - private AmqpConnection _connection; - - public DriverRunnable(AmqpConnection connection) - { - _connection = connection; - } + private volatile AmqpDriverState _state = AmqpDriverState.UNINIT; + private volatile boolean _stopRequested; @Override public void run() { - while(true) + _state = AmqpDriverState.OPEN; + while(!_stopRequested) { - Connector<?> connector; try { + // Process connectors with local updates for (Connector<?> c : _driver.connectors()) { AmqpConnection amqpConnection = (AmqpConnection) (c.getContext()); - if(isUpdated(amqpConnection)) - { + if(getAndClearLocallyUpdated(amqpConnection)) + { processConnector(c); } } + // Process connectors with in-bound data + // (may incidentally also process connectors with out-bound data whose + // sockets have just become writeable) + Connector<?> connector; while((connector = _driver.connector()) != null) { processConnector(connector); } - _logger.log(Level.FINEST, "Waiting"); - _driver.doWait(AmqpConnection.TIMEOUT); - _logger.log(Level.FINEST, "Stopped Waiting"); + + waitForLocalOrRemoteUpdates(); } catch (IOException e) { - // TODO - e.printStackTrace(); + // TODO proper error handling + _logger.log(Level.SEVERE, "Driver error", e); + _state = AmqpDriverState.ERROR; break; } } + + closeAndDestroyDriver(); } - private boolean isUpdated(AmqpConnection amqpConnection) + private void closeAndDestroyDriver() { - return _updated.remove(amqpConnection) != null; + try + { + for (Connector<?> c : _driver.connectors()) + { + c.close(); + } + + _driver.destroy(); + + if(_state != AmqpDriverState.ERROR) + { + _state = AmqpDriverState.STOPPED; + } + if(_logger.isLoggable(Level.FINE)) + { + _logger.fine(this + " closed and destroyed driver"); + } + } + catch(Exception e) + { + // TODO proper error handling + _logger.log(Level.SEVERE, "Driver error", e); + _state = 
AmqpDriverState.ERROR; + } } + private void waitForLocalOrRemoteUpdates() + { + // We're careful below whether we call Driver.doWait(). + // Any prior setLocallyUpdated() calls would have set the Driver's wake-up status, + // but this may have been cleared by the Driver.connector() call above. + // Therefore, we guard the doWait() call with a check for pending local updates. + + if(!_stopRequested && _locallyUpdatedConnections.isEmpty()) + { + _driver.doWait(AmqpConnection.TIMEOUT); + } + } + + public void requestStop() + { + _stopRequested = true; + } + + public AmqpDriverState getState() + { + return _state; + } + + private boolean getAndClearLocallyUpdated(AmqpConnection amqpConnection) + { + return _locallyUpdatedConnections.remove(amqpConnection) != null; + } + + /** + * Handle the connector's inbound data and send its outbound data + */ public void processConnector(Connector<?> connector) throws IOException { AmqpConnection amqpConnection = (AmqpConnection) (connector.getContext()); @@ -150,6 +242,7 @@ public class AmqpConnectionDriver if(amqpConnection.isClosed()) { connector.destroy(); + _state = AmqpDriverState.STOPPED; } amqpConnection.notifyAll(); @@ -157,13 +250,14 @@ public class AmqpConnectionDriver } } - public void wakeup() + /** + * Indicate that at least one AmqpXXX object on the supplied connection + * has been locally updated, so the driver knows it needs to send updates + * to the peer. 
+ */ + public void setLocallyUpdated(AmqpConnection amqpConnection) { + _locallyUpdatedConnections.put(amqpConnection, Boolean.TRUE); _driver.wakeup(); } - - public void updated(AmqpConnection amqpConnection) - { - _updated.put(amqpConnection, Boolean.TRUE); - } } \ No newline at end of file Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java (original) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java Wed Jul 10 15:21:28 2013 @@ -33,8 +33,10 @@ public abstract class AmqpLink private final AmqpSession _amqpSession; private final Link _protonLink; private boolean _established; + private boolean _linkError; private boolean _closed; + public AmqpLink(AmqpSession amqpSession, Link protonLink) { _amqpSession = amqpSession; @@ -52,6 +54,16 @@ public abstract class AmqpLink _established = true; } + public boolean getLinkError() + { + return _linkError; + } + + public void setLinkError() + { + _linkError = true; + } + AmqpConnection getAmqpConnection() { return _amqpConnection; @@ -83,4 +95,5 @@ public abstract class AmqpLink { return _closed; } + } Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java (original) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java Wed Jul 10 15:21:28 2013 @@ -20,6 +20,7 @@ */ package 
org.apache.qpid.jms.engine; +//TODO make me (or wrap me in) a JMSException public class ConnectionException extends Exception { private static final long serialVersionUID = 419676688719664719L; Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java (from r1501793, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java) URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java&r1=1501793&r2=1501794&rev=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/ConnectionException.java (original) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/LinkException.java Wed Jul 10 15:21:28 2013 @@ -20,11 +20,12 @@ */ package org.apache.qpid.jms.engine; -public class ConnectionException extends Exception +// TODO make me (or wrap me in) a JMSException +public class LinkException extends Exception { private static final long serialVersionUID = 419676688719664719L; - public ConnectionException(String msg) + public LinkException(String msg) { super(msg); } Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java (original) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ConnectionImpl.java Wed Jul 10 15:21:28 2013 @@ -21,6 +21,8 @@ package org.apache.qpid.jms.impl; import java.io.IOException; +import java.util.logging.Level; +import java.util.logging.Logger; import 
org.apache.qpid.jms.engine.AmqpConnection; import org.apache.qpid.jms.engine.AmqpConnectionDriver; @@ -30,7 +32,11 @@ import org.apache.qpid.proton.TimeoutExc public class ConnectionImpl { + private static final Logger _logger = Logger.getLogger(ConnectionImpl.class.getName()); + private AmqpConnection _amqpConnection; + + /** The driver dedicated to this connection */ private AmqpConnectionDriver _amqpConnectionDriver; private ConnectionLock _connectionLock; @@ -54,10 +60,10 @@ public class ConnectionImpl _connectionLock = new ConnectionLock(this); _connectionLock.setConnectionStateChangeListener(new ConnectionStateChangeListener() { + @Override public void stateChanged(ConnectionImpl connection) { - connection._amqpConnectionDriver.updated(connection._amqpConnection); - connection._amqpConnectionDriver.wakeup(); + connection._amqpConnectionDriver.setLocallyUpdated(connection._amqpConnection); } }); } @@ -74,6 +80,12 @@ public class ConnectionImpl { while (first || (!done && wait)) { + if(_logger.isLoggable(Level.FINER)) + { + _logger.log(Level.FINER, + "About to waitUntil {0}. first={1}, done={2}, wait={3}", + new Object[] {condition, first, done, wait}); + } if (wait && !done && !first) { _amqpConnection.wait(timeoutMillis < 0 ? 0 : deadline - System.currentTimeMillis()); @@ -83,9 +95,16 @@ public class ConnectionImpl done = done || condition.test(); first = false; } + if(_logger.isLoggable(Level.FINER)) + { + _logger.log(Level.FINER, + "Finished waitUntil {0}. 
first={1}, done={2}, wait={3}", + new Object[] {condition, first, done, wait}); + } + if (!done) { - throw new TimeoutException(timeoutMillis, condition.toString()); + throw new TimeoutException(timeoutMillis, condition.getCurrentState()); } } } @@ -97,6 +116,7 @@ public class ConnectionImpl { waitUntil(new SimplePredicate("Connection established or failed", _amqpConnection) { + @Override public boolean test() { return _amqpConnection.isConnected() || _amqpConnection.isAuthenticationError() || _amqpConnection.getConnectionError().getCondition() != null; @@ -131,16 +151,16 @@ public class ConnectionImpl { _amqpConnection.close(); stateChanged(); - while(!_amqpConnection.isClosed()) + waitUntil(new SimplePredicate("Connection is closed", _amqpConnection) { - waitUntil(new SimplePredicate("Connection is closed", _amqpConnection) + @Override + public boolean test() { - public boolean test() - { - return _amqpConnection.isClosed(); - } - }, AmqpConnection.TIMEOUT); - } + return _amqpConnection.isClosed(); + } + }, AmqpConnection.TIMEOUT); + + _amqpConnectionDriver.stop(); if(_amqpConnection.getConnectionError().getCondition() != null) { Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java (original) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/LinkImpl.java Wed Jul 10 15:21:28 2013 @@ -23,6 +23,7 @@ package org.apache.qpid.jms.impl; import org.apache.qpid.jms.engine.AmqpConnection; import org.apache.qpid.jms.engine.AmqpLink; import org.apache.qpid.jms.engine.ConnectionException; +import org.apache.qpid.jms.engine.LinkException; import org.apache.qpid.proton.TimeoutException; public class LinkImpl @@ -38,16 +39,20 @@ public class LinkImpl 
_amqpLink = amqpLink; } - public void establish() throws TimeoutException, InterruptedException + public void establish() throws TimeoutException, InterruptedException, LinkException { - _connectionImpl.waitUntil(new SimplePredicate("Link is closed", _amqpLink) + _connectionImpl.waitUntil(new SimplePredicate("Link is established or failed", _amqpLink) { @Override public boolean test() { - return _amqpLink.isEstablished(); + return _amqpLink.isEstablished() || _amqpLink.getLinkError(); } }, AmqpConnection.TIMEOUT); + if(!_amqpLink.isEstablished()) + { + throw new LinkException("Failed to establish link " + _amqpLink); // TODO make message less verbose + } } public void close() throws TimeoutException, InterruptedException, ConnectionException Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/Predicate.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/Predicate.java?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/Predicate.java (original) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/Predicate.java Wed Jul 10 15:21:28 2013 @@ -22,11 +22,16 @@ package org.apache.qpid.jms.impl; /** * A simple predicate. - * - * Used for general purpose logic so should provide a useful toString() implementation - * for logging purposes. */ interface Predicate { + /** + * Returns whether the predicate is true. 
+ */ boolean test(); + + /** + * Returns the current state, primarily for logging purposes if something goes wrong + */ + String getCurrentState(); } \ No newline at end of file Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Wed Jul 10 15:21:28 2013 @@ -25,6 +25,7 @@ import org.apache.qpid.jms.engine.AmqpRe import org.apache.qpid.jms.engine.AmqpSender; import org.apache.qpid.jms.engine.AmqpSession; import org.apache.qpid.jms.engine.ConnectionException; +import org.apache.qpid.jms.engine.LinkException; import org.apache.qpid.proton.TimeoutException; public class SessionImpl @@ -40,7 +41,7 @@ public class SessionImpl public void establish() throws TimeoutException, InterruptedException { - _connectionImpl.waitUntil(new SimplePredicate("Session established") + _connectionImpl.waitUntil(new SimplePredicate("Session established", _amqpSession) { @Override public boolean test() @@ -85,7 +86,7 @@ public class SessionImpl return _connectionImpl; } - public SenderImpl createSender(String name, String address) throws TimeoutException, InterruptedException + public SenderImpl createSender(String name, String address) throws TimeoutException, InterruptedException, LinkException { _connectionImpl.lock(); try @@ -102,7 +103,7 @@ public class SessionImpl } } - public ReceiverImpl createReceiver(String name, String address) throws TimeoutException, InterruptedException + public ReceiverImpl createReceiver(String name, String address) throws TimeoutException, InterruptedException, LinkException { _connectionImpl.lock(); try Modified: 
qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SimplePredicate.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SimplePredicate.java?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SimplePredicate.java (original) +++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SimplePredicate.java Wed Jul 10 15:21:28 2013 @@ -41,16 +41,20 @@ abstract class SimplePredicate implement _predicatedObjects = predicatedObjects; } - /** intended to be overridden to provide more useful information */ - protected Object getCurrentState() + @Override + public String getCurrentState() { - return getPredicatedObjects(); + StringBuilder builder = new StringBuilder("CurrentState [") + .append(this) + .append(getPredicatedObjects()) + .append("]"); + return builder.toString(); } /** * Returns the predicated objects as a list, or null if none exist */ - protected Object getPredicatedObjects() + private Object getPredicatedObjects() { if(_predicatedObjects == null) { @@ -65,9 +69,9 @@ abstract class SimplePredicate implement @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("SimplePredicate [_description=").append(_description) - .append(", currentState=").append(getCurrentState()).append("]"); + StringBuilder builder = new StringBuilder(super.toString()) + .append(" [_description=").append(_description) + .append("]"); return builder.toString(); } } Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java (original) +++ 
qpid/jms/trunk/src/test/java/org/apache/qpid/jms/JmsTest.java Wed Jul 10 15:21:28 2013 @@ -20,6 +20,7 @@ */ package org.apache.qpid.jms; +import java.util.logging.Level; import java.util.logging.Logger; import org.apache.qpid.jms.impl.ConnectionImpl; @@ -43,48 +44,54 @@ import org.junit.Test; public class JmsTest { //TODO: use another logger - private static Logger _logger = Logger.getLogger("qpid.jms-client.connection"); + private static Logger _logger = Logger.getLogger(JmsTest.class.getName()); private final MessageFactory _messageFactory = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class).loadFactory(); @Test public void test() throws Exception { - System.out.println("PHDEBUG " + System.getProperty("java.util.logging.config.file")); - ConnectionImpl connection = new ConnectionImpl("clientName", "localhost", 5672, "guest", "guest"); - connection.connect(); - - SessionImpl session = connection.createSession(); - session.establish(); - - SenderImpl sender = session.createSender("1","queue"); - sender.establish(); - - Message message = _messageFactory.createMessage(); - AmqpValue body = new AmqpValue("Hello World!"); - message.setBody(body); - sender.sendMessage(message); - - sender.close(); - - ReceiverImpl receiver = session.createReceiver("1", "queue"); - receiver.credit(5); - receiver.establish(); - ReceivedMessageImpl receivedMessage = receiver.receive(5000); - receivedMessage.accept(true); - - _logger.info("========================="); - _logger.info(receivedMessage.getMessage().getBody().toString()); - _logger.info("========================="); - - receiver.close(); - - session.close(); - - _logger.info("About to close " + connection); - - connection.close(); - _logger.info("Closed connection."); + try + { + ConnectionImpl connection = new ConnectionImpl("clientName", "localhost", 5672, "guest", "guest"); + connection.connect(); + + SessionImpl session = connection.createSession(); + session.establish(); + + SenderImpl sender = 
session.createSender("1","queue"); + sender.establish(); + + Message message = _messageFactory.createMessage(); + AmqpValue body = new AmqpValue("Hello World!"); + message.setBody(body); + sender.sendMessage(message); + + sender.close(); + + ReceiverImpl receiver = session.createReceiver("1", "queue"); + receiver.credit(5); + receiver.establish(); + ReceivedMessageImpl receivedMessage = receiver.receive(5000); + receivedMessage.accept(true); + + _logger.info("Received message:"); + _logger.info("========================="); + _logger.info(receivedMessage.getMessage().getBody().toString()); + _logger.info("========================="); + + receiver.close(); + + session.close(); + + connection.close(); + } + catch (Exception e) + { + // log the error so its timestamp is recorded - useful for timeout-type errors + _logger.log(Level.SEVERE, "Test failed", e); + throw e; + } } } Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SimplePredicateTest.java URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SimplePredicateTest.java?rev=1501794&view=auto ============================================================================== --- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SimplePredicateTest.java (added) +++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SimplePredicateTest.java Wed Jul 10 15:21:28 2013 @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.jms.impl; + +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThat; + +import org.junit.Test; + +public class SimplePredicateTest +{ + private static final String DESCRIPTION = "description1"; + private static final String PREDICATED_OBJECT = "predicatedObject1"; + + @Test + public void testGetCurrentStateWithDefaultPredicate() + { + SimplePredicate predicate = new SimplePredicate() + { + @Override + public boolean test() + { + return true; + } + }; + String currentState = predicate.getCurrentState(); + assertThat(currentState, containsString(predicate.getClass().getName())); + } + + @Test + public void testGetCurrentStateWithAPredicatedObject() + { + SimplePredicate predicate = new SimplePredicate(DESCRIPTION, PREDICATED_OBJECT) + { + @Override + public boolean test() + { + return true; + } + }; + + String currentState = predicate.getCurrentState(); + assertThat(currentState, containsString(DESCRIPTION)); + assertThat(currentState, containsString(PREDICATED_OBJECT)); + } + + @Test + public void testToStringWithDefaultPredicate() + { + SimplePredicate predicate = new SimplePredicate() + { + @Override + public boolean test() + { + return true; + } + }; + + assertThat(predicate.toString(), containsString(predicate.getClass().getName())); + } + + @Test + public void testToStringWithAPredicatedObject() + { + SimplePredicate predicate = new SimplePredicate(DESCRIPTION, PREDICATED_OBJECT) + { + @Override + public boolean test() + { + return 
true; + } + }; + + assertThat(predicate.toString(), containsString(DESCRIPTION)); + assertThat( + "toString should not return the predicated object because " + + "this is too verbose for the usual toString use cases", + predicate.toString(), + not(containsString(PREDICATED_OBJECT))); + } +} Modified: qpid/jms/trunk/src/test/resources/logging.properties URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/resources/logging.properties?rev=1501794&r1=1501793&r2=1501794&view=diff ============================================================================== --- qpid/jms/trunk/src/test/resources/logging.properties (original) +++ qpid/jms/trunk/src/test/resources/logging.properties Wed Jul 10 15:21:28 2013 @@ -27,7 +27,3 @@ java.util.logging.ConsoleHandler.formatt java.util.logging.SimpleFormatter.format = %1$tF %1$tT.%tL %4$s %3$s %5$s%n .level = INFO - -# TODO-PH remove -org.apache.qpid.proton.logging.LoggingProtocolTracer.sent.level = ALL -org.apache.qpid.proton.logging.LoggingProtocolTracer.received.level = ALL --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
