Author: kwall
Date: Sun Jul 26 19:39:00 2015
New Revision: 1692750

URL: http://svn.apache.org/r1692750
Log:
QPID-6655: [Java Broker] Refactor slow authentication detection to use a ticker

* Also added system tests around connection negotiation
* Prevented a decoding exception on the 0-10 path from leaving a connection open

Added:
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
 Sun Jul 26 19:39:00 2015
@@ -36,12 +36,17 @@ import javax.security.auth.Subject;
 
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.SettableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
+import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.AbstractConfiguredObject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Session;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.StateTransition;
@@ -54,12 +59,15 @@ import org.apache.qpid.server.util.Actio
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
 
 public abstract class AbstractAMQPConnection<C extends 
AbstractAMQPConnection<C>>
         extends AbstractConfiguredObject<C>
         implements ProtocolEngine, AMQPConnection<C>
 
 {
+    private static final Logger _logger = 
LoggerFactory.getLogger(AbstractAMQPConnection.class);
+
     private final Broker<?> _broker;
     private final NetworkConnection _network;
     private final AmqpPort<?> _port;
@@ -80,6 +88,8 @@ public abstract class AbstractAMQPConnec
     private final SettableFuture<Void> _transportClosedFuture = 
SettableFuture.create();
     private final SettableFuture<Void> _modelClosedFuture = 
SettableFuture.create();
     private final AtomicBoolean _modelClosing = new AtomicBoolean();
+    private volatile long _lastReadTime;
+    private volatile long _lastWriteTime;
 
     public AbstractAMQPConnection(Broker<?> broker,
                                   NetworkConnection network,
@@ -132,6 +142,16 @@ public abstract class AbstractAMQPConnec
         return attributes;
     }
 
+    @Override
+    protected void onOpen()
+    {
+        super.onOpen();
+        long maxAuthDelay = _port.getContextValue(Long.class, 
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
+        _aggregateTicker.addTicker(new SlowConnectionOpenTicker(maxAuthDelay));
+        _lastReadTime = _lastWriteTime = getCreatedTime();
+
+    }
+
     public final Broker<?> getBroker()
     {
         return _broker;
@@ -158,11 +178,33 @@ public abstract class AbstractAMQPConnec
         return _aggregateTicker;
     }
 
-    public long getLastIoTime()
+    public final long getLastIoTime()
     {
         return Math.max(getLastReadTime(), getLastWriteTime());
     }
 
+    @Override
+    public final long getLastReadTime()
+    {
+        return _lastReadTime;
+    }
+
+    public final void updateLastReadTime()
+    {
+        _lastReadTime = System.currentTimeMillis();
+    }
+
+    @Override
+    public final long getLastWriteTime()
+    {
+        return _lastWriteTime;
+    }
+
+    public final void updateLastWriteTime()
+    {
+        _lastWriteTime = System.currentTimeMillis();
+    }
+
     public final long getConnectionId()
     {
         return _connectionId;
@@ -352,8 +394,6 @@ public abstract class AbstractAMQPConnec
         getVirtualHost().registerConnection(this);
     }
 
-
-
     @Override
     public boolean isIncoming()
     {
@@ -413,11 +453,6 @@ public abstract class AbstractAMQPConnec
     }
 
     @Override
-    protected void onClose()
-    {
-    }
-
-    @Override
     public <C extends ConfiguredObject> ListenableFuture<C> 
addChildAsync(Class<C> childClass, Map<String, Object> attributes, 
ConfiguredObject... otherParents)
     {
         if(childClass == Session.class)
@@ -473,4 +508,45 @@ public abstract class AbstractAMQPConnec
     {
         _transportClosedFuture.set(null);
     }
+
+    protected abstract EventLogger getEventLogger();
+
+    private class SlowConnectionOpenTicker implements Ticker
+    {
+        private final long _allowedTime;
+
+        public SlowConnectionOpenTicker(long timeoutTime)
+        {
+            _allowedTime = timeoutTime;
+        }
+
+        @Override
+        public int getTimeToNextTick(final long currentTime)
+        {
+            final int timeToNextTick = (int) (getCreatedTime() + _allowedTime 
- currentTime);
+            return timeToNextTick;
+        }
+
+        @Override
+        public int tick(final long currentTime)
+        {
+            int nextTick = getTimeToNextTick(currentTime);
+            if(nextTick <= 0)
+            {
+                if (getAuthorizedPrincipal() == null)
+                {
+                    _logger.warn("Connection has taken more than {} ms to 
establish identity.  Closing as possible DoS.",
+                                 _allowedTime);
+                    getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+                    _network.close();
+                }
+                else
+                {
+                    _aggregateTicker.removeTicker(this);
+                }
+            }
+            return nextTick;
+        }
+    }
+
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
 Sun Jul 26 19:39:00 2015
@@ -421,11 +421,12 @@ public class MultiVersionProtocolEngine
                 //delegate. Also save most recent supported version and 
associated reply header bytes
                 for(int i = 0; newDelegate == null && i < _creators.length; 
i++)
                 {
-                    if(_supported.contains(_creators[i].getVersion()))
+                    final ProtocolEngineCreator creator = _creators[i];
+                    if(_supported.contains(creator.getVersion()))
                     {
-                        supportedReplyBytes = 
_creators[i].getHeaderIdentifier();
-                        supportedReplyVersion = _creators[i].getVersion();
-                        byte[] compareBytes = 
_creators[i].getHeaderIdentifier();
+                        supportedReplyBytes = creator.getHeaderIdentifier();
+                        supportedReplyVersion = creator.getVersion();
+                        byte[] compareBytes = creator.getHeaderIdentifier();
                         boolean equal = true;
                         for(int j = 0; equal && j<compareBytes.length; j++)
                         {
@@ -433,21 +434,21 @@ public class MultiVersionProtocolEngine
                         }
                         if(equal)
                         {
-                            newDelegate = 
_creators[i].newProtocolEngine(_broker,
-                                                                         
_network, _port, _transport, _id,
-                                                                         
_aggregateTicker);
-                            if(newDelegate == null && 
_creators[i].getSuggestedAlternativeHeader() != null)
+                            newDelegate = creator.newProtocolEngine(_broker,
+                                                                    _network, 
_port, _transport, _id,
+                                                                    
_aggregateTicker);
+                            if(newDelegate == null && 
creator.getSuggestedAlternativeHeader() != null)
                             {
-                                defaultSupportedReplyBytes = 
_creators[i].getSuggestedAlternativeHeader();
+                                defaultSupportedReplyBytes = 
creator.getSuggestedAlternativeHeader();
                             }
                         }
                     }
 
                     //If there is a configured default reply to an unsupported 
version initiation,
                     //then save the associated reply header bytes when we 
encounter them
-                    if(defaultSupportedReplyBytes == null && 
_defaultSupportedReply != null && _creators[i].getVersion() == 
_defaultSupportedReply)
+                    if(defaultSupportedReplyBytes == null && 
_defaultSupportedReply != null && creator.getVersion() == 
_defaultSupportedReply)
                     {
-                        defaultSupportedReplyBytes = 
_creators[i].getHeaderIdentifier();
+                        defaultSupportedReplyBytes = 
creator.getHeaderIdentifier();
                     }
                 }
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
 Sun Jul 26 19:39:00 2015
@@ -35,20 +35,23 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.store.StoreException;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
 import org.apache.qpid.server.transport.ProtocolEngine;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.logging.messages.ConnectionMessages;
 import org.apache.qpid.server.model.Consumer;
-import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.Constant;
@@ -64,11 +67,8 @@ public class AMQPConnection_0_10 extends
 
 
     private final NetworkConnection _network;
-    private ServerConnection _connection;
+    private final ServerConnection _connection;
 
-    private long _createTime = System.currentTimeMillis();
-    private volatile long _lastReadTime = _createTime;
-    private volatile long _lastWriteTime = _createTime;
     private volatile boolean _transportBlockedForWriting;
 
     private final AtomicReference<Thread> _messageAssignmentSuspended = new 
AtomicReference<>();
@@ -164,7 +164,7 @@ public class AMQPConnection_0_10 extends
             @Override
             public void send(ByteBuffer msg)
             {
-                _lastWriteTime = System.currentTimeMillis();
+                updateLastWriteTime();
                 sender.send(msg);
             }
 
@@ -183,18 +183,6 @@ public class AMQPConnection_0_10 extends
         };
     }
 
-    @Override
-    public long getLastReadTime()
-    {
-        return _lastReadTime;
-    }
-
-    @Override
-    public long getLastWriteTime()
-    {
-        return _lastWriteTime;
-    }
-
     public void received(final ByteBuffer buf)
     {
         Subject.doAs(_connection.getAuthorizedSubject(), new 
PrivilegedAction<Object>()
@@ -202,22 +190,28 @@ public class AMQPConnection_0_10 extends
             @Override
             public Object run()
             {
-                _lastReadTime = System.currentTimeMillis();
-                if (_connection.getAuthorizedPrincipal() == null &&
-                    (_lastReadTime - _createTime) > 
_connection.getPort().getContextValue(Long.class,
-                                                                               
           Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
+                updateLastReadTime();
+                try
                 {
-
-                    _logger.warn("Connection has taken more than "
-                                 + _connection.getPort()
-                                         .getContextValue(Long.class, 
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
-                                 + "ms to establish identity.  Closing as 
possible DoS.");
-                    
_connection.getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
-                    _network.close();
-
+                    _inputHandler.received(buf);
+                    _connection.receivedComplete();
+                }
+                catch (IllegalArgumentException | IllegalStateException e)
+                {
+                    throw new ConnectionScopedRuntimeException(e);
                 }
-                _inputHandler.received(buf);
-                _connection.receivedComplete();
+                catch (StoreException e)
+                {
+                    if (getVirtualHost().getState() == State.ACTIVE)
+                    {
+                        throw new ServerScopedRuntimeException(e);
+                    }
+                    else
+                    {
+                        throw new ConnectionScopedRuntimeException(e);
+                    }
+                }
+
                 return null;
             }
         });
@@ -337,7 +331,7 @@ public class AMQPConnection_0_10 extends
     public void closeSessionAsync(final AMQSessionModel<?> session,
                                   final AMQConstant cause, final String 
message)
     {
-        _connection.closeSessionAsync((ServerSession)session, cause, message);
+        _connection.closeSessionAsync((ServerSession) session, cause, message);
     }
 
     public void block()
@@ -365,14 +359,15 @@ public class AMQPConnection_0_10 extends
         _connection.unblock();
     }
 
-    public LogSubject getLogSubject()
+    public long getSessionCountLimit()
     {
-        return _connection.getLogSubject();
+        return _connection.getSessionCountLimit();
     }
 
-    public long getSessionCountLimit()
+    @Override
+    protected EventLogger getEventLogger()
     {
-        return _connection.getSessionCountLimit();
+        return _connection.getEventLogger();
     }
 
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
 Sun Jul 26 19:39:00 2015
@@ -332,7 +332,7 @@ public class AMQChannel
             @Override
             public long getActivityTime()
             {
-                return _connection.getLastReceivedTime();
+                return _connection.getLastReadTime();
             }
         });
         _txnStarts.incrementAndGet();
@@ -441,7 +441,7 @@ public class AMQChannel
                 final MessageMetaData messageMetaData =
                         new MessageMetaData(messagePublishInfo,
                                             contentHeader,
-                                            
getConnection().getLastReceivedTime());
+                                            getConnection().getLastReadTime());
 
                 final MessageHandle<MessageMetaData> handle = 
_messageStore.addMessage(messageMetaData);
                 int bodyCount = _currentMessage.getBodyCount();

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
 Sun Jul 26 19:39:00 2015
@@ -41,7 +41,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -74,7 +73,6 @@ import org.apache.qpid.server.message.In
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Consumer;
-import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.State;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
@@ -88,8 +86,6 @@ import org.apache.qpid.server.util.Conne
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.ByteBufferSender;
-import org.apache.qpid.transport.SenderClosedException;
-import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -119,7 +115,6 @@ public class AMQPConnection_0_8
 
     private static final long CLOSE_OK_TIMEOUT = 10000l;
 
-    private final long _creationTime;
     private final AtomicBoolean _stateChanged = new AtomicBoolean();
     private final AtomicReference<Action<ProtocolEngine>> _workListener = new 
AtomicReference<>();
 
@@ -139,7 +134,7 @@ public class AMQPConnection_0_8
      */
     private final Set<AMQChannel> _channelsForCurrentMessage = new HashSet<>();
 
-    private AMQDecoder _decoder;
+    private final AMQDecoder _decoder;
 
     private SaslServer _saslServer;
 
@@ -147,16 +142,14 @@ public class AMQPConnection_0_8
 
     private ProtocolVersion _protocolVersion = 
ProtocolVersion.getLatestSupportedVersion();
     private final MethodRegistry _methodRegistry = new 
MethodRegistry(_protocolVersion);
-    private final List<Action<? super AMQPConnection_0_8>> 
_connectionCloseTaskList =
-            new CopyOnWriteArrayList<>();
 
     private final Queue<Action<? super AMQPConnection_0_8>> _asyncTaskList =
             new ConcurrentLinkedQueue<>();
 
-    private Map<Integer, Long> _closingChannelsList = new 
ConcurrentHashMap<>();
+    private final Map<Integer, Long> _closingChannelsList = new 
ConcurrentHashMap<>();
     private ProtocolOutputConverter _protocolOutputConverter;
 
-    private Object _reference = new Object();
+    private final Object _reference = new Object();
 
     private LogSubject _logSubject;
 
@@ -167,12 +160,9 @@ public class AMQPConnection_0_8
     private final ByteBufferSender _sender;
 
     private volatile boolean _deferFlush;
-    private volatile long _lastReceivedTime = System.currentTimeMillis();
-    private volatile long _lastWriteTime = System.currentTimeMillis();
     private boolean _blocking;
 
     private volatile boolean _closeWhenNoRoute;
-    private boolean _authenticated;
     private boolean _compressionSupported;
     private int _messageCompressionThreshold;
     private int _currentClassId;
@@ -215,9 +205,6 @@ public class AMQPConnection_0_8
             }
         });
         _closeWhenNoRoute = getBroker().getConnection_closeWhenNoRoute();
-
-        _creationTime = System.currentTimeMillis();
-
     }
 
     @Override
@@ -300,69 +287,27 @@ public class AMQPConnection_0_8
             @Override
             public Void run()
             {
-
-                final long arrivalTime = System.currentTimeMillis();
-                if (!_authenticated &&
-                    (arrivalTime - _creationTime) > 
getPort().getContextValue(Long.class,
-                                                                              
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY))
-                {
-                    _logger.warn("Connection has taken more than "
-                                 + getPort().getContextValue(Long.class, 
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)
-                                 + "ms to establish identity.  Closing as 
possible DoS.");
-                    getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
-                    closeNetworkConnection();
-                }
-                _lastReceivedTime = arrivalTime;
+                updateLastReadTime();
 
                 try
                 {
                     _decoder.decodeBuffer(msg);
                     receivedCompleteAllChannels();
                 }
-                catch (ConnectionScopedRuntimeException e)
+                catch (TransportException | AMQFrameDecodingException | 
IOException e)
                 {
                     _logger.error("Unexpected exception", e);
-                    closeNetworkConnection();
-                }
-                catch (AMQProtocolVersionException e)
-                {
-                    _logger.error("Unexpected protocol version", e);
-                    closeNetworkConnection();
-                }
-                catch (SenderClosedException e)
-                {
-                    _logger.debug("Sender was closed abruptly, closing 
network.", e);
-                    closeNetworkConnection();
-                }
-                catch (SenderException e)
-                {
-                    _logger.info("Unexpected exception on send, closing 
network.", e);
-                    closeNetworkConnection();
-                }
-                catch (TransportException e)
-                {
-                    _logger.error("Unexpected transport exception", e);
-                    closeNetworkConnection();
-                }
-                catch (AMQFrameDecodingException e)
-                {
-                    _logger.error("Frame decoding", e);
-                    closeNetworkConnection();
-                }
-                catch (IOException e)
-                {
-                    _logger.error("I/O Exception", e);
-                    closeNetworkConnection();
+                    throw new ConnectionScopedRuntimeException(e);
                 }
                 catch (StoreException e)
                 {
                     if (_virtualHost.getState() == State.ACTIVE)
                     {
-                        throw e;
+                        throw new ServerScopedRuntimeException(e);
                     }
                     else
                     {
-                        _logger.error("Store Exception ignored as virtual host 
no longer active", e);
+                        throw new ConnectionScopedRuntimeException(e);
                     }
                 }
                 return null;
@@ -496,8 +441,7 @@ public class AMQPConnection_0_8
         }
 
 
-        final long time = System.currentTimeMillis();
-        _lastWriteTime = time;
+        updateLastWriteTime();
 
         if(!_deferFlush)
         {
@@ -868,7 +812,6 @@ public class AMQPConnection_0_8
             throw new IllegalArgumentException("authorizedSubject cannot be 
null");
         }
 
-        _authenticated = true;
         getSubject().getPrincipals().addAll(authorizedSubject.getPrincipals());
         
getSubject().getPrivateCredentials().addAll(authorizedSubject.getPrivateCredentials());
         
getSubject().getPublicCredentials().addAll(authorizedSubject.getPublicCredentials());
@@ -953,11 +896,6 @@ public class AMQPConnection_0_8
         writeFrame(HeartbeatBody.FRAME);
     }
 
-    public long getLastReceivedTime()
-    {
-        return _lastReceivedTime;
-    }
-
     public long getSessionCountLimit()
     {
         return getMaximumNumberOfChannels();
@@ -1514,18 +1452,6 @@ public class AMQPConnection_0_8
         return _reference;
     }
 
-    @Override
-    public long getLastReadTime()
-    {
-        return _lastReceivedTime;
-    }
-
-    @Override
-    public long getLastWriteTime()
-    {
-        return _lastWriteTime;
-    }
-
     public boolean isCloseWhenNoRoute()
     {
         return _closeWhenNoRoute;

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
 Sun Jul 26 19:39:00 2015
@@ -61,6 +61,7 @@ public class AMQProtocolEngineTest exten
         when(_port.getChildExecutor()).thenReturn(childExecutor);
         when(_port.getCategoryClass()).thenReturn(Port.class);
         when(_port.getModel()).thenReturn(BrokerModel.getInstance());
+        when(_port.getContextValue(Long.class, 
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)).thenReturn(2500l);
 
         _network = mock(NetworkConnection.class);
         _transport = Transport.TCP;

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
 Sun Jul 26 19:39:00 2015
@@ -53,7 +53,7 @@ import org.apache.qpid.amqp_1_0.type.Sym
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.common.ServerPropertyNames;
 import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.logging.EventLogger;
 import org.apache.qpid.server.model.VirtualHost;
 import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.transport.AbstractAMQPConnection;
@@ -72,6 +72,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.transport.NetworkConnectionScheduler;
 import org.apache.qpid.server.transport.NonBlockingConnection;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -82,11 +83,9 @@ public class AMQPConnection_1_0 extends
 
     public static Logger LOGGER = 
LoggerFactory.getLogger(AMQPConnection_1_0.class);
 
-    public static final long CLOSE_REPONSE_TIMEOUT = 10000l;
+    public static final long CLOSE_RESPONSE_TIMEOUT = 10000l;
+    private final Broker<?> _broker;
 
-    private volatile long _lastReadTime;
-    private volatile long _lastWriteTime;
-    private long _createTime = System.currentTimeMillis();
     private ConnectionEndpoint _endpoint;
     private final AtomicBoolean _stateChanged = new AtomicBoolean();
     private final AtomicReference<Action<ProtocolEngine>> _workListener = new 
AtomicReference<>();
@@ -154,6 +153,7 @@ public class AMQPConnection_1_0 extends
                               final boolean useSASL)
     {
         super(broker, network, port, transport, id, aggregateTicker);
+        _broker = broker;
         _connection = createConnection(broker, network, port, transport, id, 
useSASL);
 
         _connection.setAmqpConnection(this);
@@ -346,7 +346,7 @@ public class AMQPConnection_1_0 extends
     {
         try
         {
-            _lastReadTime = System.currentTimeMillis();
+            updateLastReadTime();
             if(RAW_LOGGER.isDebugEnabled())
             {
                 ByteBuffer dup = msg.duplicate();
@@ -495,12 +495,6 @@ public class AMQPConnection_1_0 extends
     }
 
 
-    public long getCreateTime()
-    {
-        return _createTime;
-    }
-
-
     public boolean canSend()
     {
         return true;
@@ -519,7 +513,7 @@ public class AMQPConnection_1_0 extends
 
         synchronized (_sendLock)
         {
-            _lastWriteTime = System.currentTimeMillis();
+            updateLastWriteTime();
             if (FRAME_LOGGER.isDebugEnabled())
             {
                 FRAME_LOGGER.debug("SEND["
@@ -573,21 +567,11 @@ public class AMQPConnection_1_0 extends
 
     public void close()
     {
-        getAggregateTicker().addTicker(new 
ConnectionClosingTicker(System.currentTimeMillis()+ CLOSE_REPONSE_TIMEOUT,
+        getAggregateTicker().addTicker(new 
ConnectionClosingTicker(System.currentTimeMillis()+ CLOSE_RESPONSE_TIMEOUT,
                                                                    
getNetwork()));
 
     }
 
-    public long getLastReadTime()
-    {
-        return _lastReadTime;
-    }
-
-    public long getLastWriteTime()
-    {
-        return _lastWriteTime;
-    }
-
     @Override
     public boolean isTransportBlockedForWriting()
     {
@@ -684,14 +668,22 @@ public class AMQPConnection_1_0 extends
         _connection.unblock();
     }
 
-    public LogSubject getLogSubject()
-    {
-        return _connection.getLogSubject();
-    }
-
     public long getSessionCountLimit()
     {
         return _connection.getSessionCountLimit();
     }
 
+    /**
+     * Selects the event logger for this connection: the logger of the virtual
+     * host reported by the underlying connection when there is one, otherwise
+     * the logger of the broker supplied at construction time.
+     *
+     * @return the virtual host's event logger, or the broker's when the
+     *         connection reports no virtual host
+     */
+    @Override
+    protected EventLogger getEventLogger()
+    {
+        final VirtualHostImpl host = _connection.getVirtualHost();
+        if (host == null)
+        {
+            // No virtual host on the connection: use the broker-wide logger.
+            return _broker.getEventLogger();
+        }
+        return host.getEventLogger();
+    }
 }

Modified: 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java?rev=1692750&r1=1692749&r2=1692750&view=diff
==============================================================================
--- 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
 (original)
+++ 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/test/unit/client/connection/BrokerClosesClientConnectionTest.java
 Sun Jul 26 19:39:00 2015
@@ -42,8 +42,6 @@ import javax.jms.Session;
 /**
  * Tests the behaviour of the client when the Broker terminates client 
connection
  * by the Broker being shutdown gracefully or otherwise.
- *
- * @see ManagedConnectionMBeanTest
  */
 public class BrokerClosesClientConnectionTest extends QpidBrokerTestCase
 {

Added: 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1692750&view=auto
==============================================================================
--- 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
 (added)
+++ 
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
 Sun Jul 26 19:39:00 2015
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.qpid.framing.ByteArrayDataInput;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.protocol.v0_10.ServerDisassembler;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.transport.network.Frame;
+
+/**
+ * System tests that drive the broker's AMQP port with a raw {@link Socket}
+ * and check how it reacts to missing, wrong or incomplete protocol
+ * negotiation.
+ */
+public class ProtocolNegotiationTest extends QpidBrokerTestCase
+{
+    /** Read timeout (milliseconds) applied to every test socket. */
+    private static final int SO_TIMEOUT = 5000;
+    /** Number of bytes read back from the broker after sending a protocol header. */
+    public static final int AMQP_HEADER_LEN = 8;
+    /** Protocol version corresponding to {@code getBrokerProtocol()}, computed in {@link #setUp()}. */
+    private ProtocolVersion _expectedProtocolInit;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        _expectedProtocolInit = convertProtocolToProtocolVersion(getBrokerProtocol());
+    }
+
+    /**
+     * Sends bytes that are not an AMQP header and expects the broker to answer
+     * with a protocol header for the expected version and then end the stream.
+     */
+    public void testWrongProtocolHeaderSent_BrokerRespondsWithSupportedProtocol() throws Exception
+    {
+        try(Socket socket = new Socket())
+        {
+            connectToBroker(socket);
+
+            // Explicit charset so the bytes on the wire do not depend on the platform default.
+            socket.getOutputStream().write("NOTANAMQPHEADER".getBytes("US-ASCII"));
+
+            final InputStream inputStream = socket.getInputStream();
+            final byte[] receivedHeader = new byte[AMQP_HEADER_LEN];
+            final int len = readFully(inputStream, receivedHeader);
+            assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len);
+            assertEquals("Expected end-of-stream from socket signifying socket closed)",
+                         -1,
+                         inputStream.read());
+
+            ProtocolInitiation protocolInitiation = new ProtocolInitiation(new ByteArrayDataInput(receivedHeader));
+
+            assertEquals("Unexpected protocol initialisation", _expectedProtocolInit, protocolInitiation.checkVersion());
+        }
+    }
+
+
+    /**
+     * Connects, sends nothing, and expects end-of-stream before the socket
+     * read timeout ({@link #SO_TIMEOUT}) expires.
+     */
+    public void testNoProtocolHeaderSent_BrokerClosesConnection() throws Exception
+    {
+        try(Socket socket = new Socket())
+        {
+            connectToBroker(socket);
+
+            int c = 0;
+            try
+            {
+                c = socket.getInputStream().read();
+                _logger.debug("Read {}", c);
+
+            }
+            catch(SocketTimeoutException ste)
+            {
+                fail("Broker did not close connection with no activity within expected timeout");
+            }
+
+            assertEquals("Expected end-of-stream from socket signifying socket closed)", -1, c);
+        }
+    }
+
+    /**
+     * Completes the header exchange, then only sends heartbeats and expects a
+     * heartbeat write to fail within three seconds.
+     */
+    public void testNoConnectionOpenSent_BrokerClosesConnection() throws Exception
+    {
+        // NOTE(review): presumably the broker picks this property up for new
+        // connections even though setUp() has already run - confirm.
+        setSystemProperty(Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY, "1000");
+
+        try(Socket socket = new Socket())
+        {
+            connectToBroker(socket);
+            final DataOutputStream dataOutputStream = exchangeProtocolHeaders(socket);
+
+            // Send heartbeat frames to simulate a client that, although active, fails to
+            // authenticate within the allowed period
+
+            long timeout = System.currentTimeMillis() + 3000;
+            boolean brokenPipe = false;
+            while(timeout > System.currentTimeMillis())
+            {
+                // No ';' after the condition: the flag must only be set when a
+                // heartbeat write actually fails.
+                if (!writeHeartbeat(dataOutputStream))
+                {
+                    brokenPipe = true;
+                    break;
+                }
+            }
+            assertTrue("Expected pipe to become broken within "
+                       + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY + " timeout", brokenPipe);
+        }
+    }
+
+    /**
+     * Completes the header exchange and then writes bytes that are not an
+     * AMQP frame.
+     */
+    public void testIllegalFrameSent_BrokerClosesConnection() throws Exception
+    {
+        try(Socket socket = new Socket())
+        {
+            connectToBroker(socket);
+            final DataOutputStream dataOutputStream = exchangeProtocolHeaders(socket);
+
+            // NOTE(review): nothing here asserts that the broker closes the
+            // connection afterwards, despite the test name - TODO add a check
+            // once the expected broker response is confirmed.
+            dataOutputStream.write("NOTANAMPQFRAME".getBytes("US-ASCII"));
+        }
+    }
+
+    /**
+     * Applies the read timeout, connects the socket to the broker port on
+     * localhost and asserts that the socket reports itself connected.
+     */
+    private void connectToBroker(final Socket socket) throws Exception
+    {
+        socket.setSoTimeout(SO_TIMEOUT);
+
+        final InetSocketAddress inetSocketAddress = new InetSocketAddress("localhost", getPort());
+        _logger.debug("Making connection to {}", inetSocketAddress);
+
+        socket.connect(inetSocketAddress);
+
+        assertTrue("Expected socket to be connected", socket.isConnected());
+    }
+
+    /**
+     * Writes the protocol header for the broker's protocol and reads
+     * {@link #AMQP_HEADER_LEN} bytes of the broker's reply.
+     *
+     * @return the stream wrapping the socket's output, for further writes
+     */
+    private DataOutputStream exchangeProtocolHeaders(final Socket socket) throws Exception
+    {
+        final ProtocolVersion protocolVersion = convertProtocolToProtocolVersion(getBrokerProtocol());
+        final ProtocolInitiation pi = new ProtocolInitiation(protocolVersion);
+
+        final DataOutputStream dataOutputStream = new DataOutputStream(socket.getOutputStream());
+
+        // write header
+        pi.writePayload(dataOutputStream);
+
+        // read header
+        final byte[] receivedHeader = new byte[AMQP_HEADER_LEN];
+        final int len = readFully(socket.getInputStream(), receivedHeader);
+        assertEquals("Unexpected number of bytes available from socket", receivedHeader.length, len);
+
+        return dataOutputStream;
+    }
+
+    /**
+     * Reads until the buffer is full or end-of-stream is reached. A single
+     * {@code read(byte[])} may legitimately return fewer bytes than requested.
+     *
+     * @return the number of bytes actually stored in {@code buffer}
+     */
+    private int readFully(final InputStream inputStream, final byte[] buffer) throws IOException
+    {
+        int total = 0;
+        while (total < buffer.length)
+        {
+            final int count = inputStream.read(buffer, total, buffer.length - total);
+            if (count == -1)
+            {
+                break;
+            }
+            total += count;
+        }
+        return total;
+    }
+
+    /**
+     * Writes one heartbeat in the encoding selected by {@code isBroker010()}.
+     *
+     * @return {@code false} if the write raised a {@link SocketException},
+     *         {@code true} otherwise
+     * @throws IOException if the non-0-10 write fails for another I/O reason
+     */
+    private boolean writeHeartbeat(final DataOutputStream dataOutputStream)
+            throws IOException
+    {
+        final AtomicBoolean success = new AtomicBoolean(true);
+        if (isBroker010())
+        {
+            ConnectionHeartbeat heartbeat = new ConnectionHeartbeat();
+            ServerDisassembler serverDisassembler = new ServerDisassembler(new ByteBufferSender()
+            {
+                @Override
+                public void send(final ByteBuffer msg)
+                {
+                    try
+                    {
+                        // arrayOffset() keeps the index correct for buffers that are
+                        // slices of a larger backing array.
+                        dataOutputStream.write(msg.array(), msg.arrayOffset() + msg.position(), msg.remaining());
+                    }
+                    catch (SocketException se)
+                    {
+                        // The write failed at socket level: report it to the caller.
+                        success.set(false);
+                    }
+                    catch(IOException e)
+                    {
+                        throw new RuntimeException("Unexpected IOException", e);
+                    }
+                }
+
+                @Override
+                public void flush()
+                {
+                }
+
+                @Override
+                public void close()
+                {
+                }
+            }, Frame.HEADER_SIZE + 1);
+            serverDisassembler.command(null, heartbeat);
+        }
+        else
+        {
+            try
+            {
+                HeartbeatBody.FRAME.writePayload(dataOutputStream);
+            }
+            catch (SocketException se)
+            {
+                success.set(false);
+            }
+        }
+
+        return success.get();
+    }
+
+    /**
+     * Maps a broker {@link Protocol} to the matching {@link ProtocolVersion}.
+     *
+     * @throws IllegalArgumentException for any protocol other than AMQP 0-8,
+     *         0-9, 0-9-1 and 0-10
+     */
+    private ProtocolVersion convertProtocolToProtocolVersion(final Protocol p)
+    {
+        switch(p)
+        {
+            case AMQP_0_10:
+                return ProtocolVersion.v0_10;
+            case AMQP_0_9_1:
+                return ProtocolVersion.v0_91;
+            case AMQP_0_9:
+                return ProtocolVersion.v0_9;
+            case AMQP_0_8:
+                return ProtocolVersion.v0_8;
+            default:
+                throw new IllegalArgumentException("Unexpected " + p.name());
+        }
+    }
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to