Author: rgodfrey
Date: Fri Jun  5 13:54:57 2015
New Revision: 1683757

URL: http://svn.apache.org/r1683757
Log:
QPID-6573 : Add broker connection close guard to protect against a client that 
does not respond to the connection close command in a timely manner

Added:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
   (with props)
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java
   (with props)
Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/plugin/ProtocolEngineCreator.java
 Fri Jun  5 13:54:57 2015
@@ -24,6 +24,7 @@ import org.apache.qpid.server.model.Brok
 import org.apache.qpid.server.model.Protocol;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 public interface ProtocolEngineCreator extends Pluggable
@@ -34,6 +35,6 @@ public interface ProtocolEngineCreator e
                                            NetworkConnection network,
                                            AmqpPort<?> port,
                                            Transport transport,
-                                           long id);
+                                           long id, final AggregateTicker aggregateTicker);
 }
 

Added: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java?rev=1683757&view=auto
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
 (added)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
 Fri Jun  5 13:54:57 2015
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol;
+
+import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.transport.network.Ticker;
+
+public class ConnectionClosingTicker implements Ticker // Ticker that closes a NetworkConnection when tick() is invoked at or after a fixed deadline
+{
+    private final long _timeoutTime; // deadline, on the same clock as the currentTime arguments it is subtracted from
+    private final NetworkConnection _network; // connection that tick() closes once the deadline is reached
+
+    public ConnectionClosingTicker(final long timeoutTime, final 
NetworkConnection network)
+    {
+        _timeoutTime = timeoutTime;
+        _network = network;
+    }
+
+    @Override
+    public int getTimeToNextTick(final long currentTime) // time remaining until the deadline; zero or negative once it has been reached
+    {
+        return (int) (_timeoutTime - currentTime); // NOTE(review): narrowing long-to-int cast - a difference outside the int range would wrap; confirm deadlines stay close to currentTime
+    }
+
+    @Override
+    public int tick(final long currentTime) // closes the connection when no time remains; returns the remaining time either way
+    {
+        int nextTick = getTimeToNextTick(currentTime);
+        if(nextTick <= 0) // deadline reached or passed
+        {
+            _network.close(); // NOTE(review): runs on every tick() at or after the deadline - presumably close() tolerates repeated calls; confirm
+        }
+        return nextTick;
+    }
+}

Propchange: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
 Fri Jun  5 13:54:57 2015
@@ -24,7 +24,6 @@ package org.apache.qpid.server.protocol;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
-import java.security.Principal;
 import java.security.cert.Certificate;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -44,6 +43,7 @@ import org.apache.qpid.server.plugin.Pro
 import org.apache.qpid.server.security.ManagedPeerCertificateTrustStore;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 public class MultiVersionProtocolEngine implements ServerProtocolEngine
@@ -65,6 +65,7 @@ public class MultiVersionProtocolEngine
 
     private volatile ServerProtocolEngine _delegate = new 
SelfDelegateProtocolEngine();
     private final AtomicReference<Action<ServerProtocolEngine>> _workListener 
= new AtomicReference<>();
+    private final AggregateTicker _aggregateTicker = new AggregateTicker();
 
     public MultiVersionProtocolEngine(final Broker<?> broker,
                                       final Set<Protocol> supported,
@@ -250,6 +251,12 @@ public class MultiVersionProtocolEngine
         _delegate.clearWork();
     }
 
+    @Override
+    public AggregateTicker getAggregateTicker()
+    {
+        return _aggregateTicker;
+    }
+
     private class ClosedDelegateProtocolEngine implements ServerProtocolEngine
     {
 
@@ -384,6 +391,12 @@ public class MultiVersionProtocolEngine
         public void setTransportBlockedForWriting(final boolean blocked)
         {
         }
+
+        @Override
+        public AggregateTicker getAggregateTicker()
+        {
+            return _aggregateTicker;
+        }
     }
 
     private class SelfDelegateProtocolEngine implements ServerProtocolEngine
@@ -448,6 +461,12 @@ public class MultiVersionProtocolEngine
         }
 
         @Override
+        public AggregateTicker getAggregateTicker()
+        {
+            return _aggregateTicker;
+        }
+
+        @Override
         public void clearWork()
         {
             _hasWork.set(false);
@@ -499,8 +518,8 @@ public class MultiVersionProtocolEngine
                         if(equal)
                         {
                             newDelegate = 
_creators[i].newProtocolEngine(_broker,
-                                                                         
_network, _port, _transport, _id
-                                                                        );
+                                                                         
_network, _port, _transport, _id,
+                                                                         
_aggregateTicker);
                         }
                     }
 
@@ -566,6 +585,8 @@ public class MultiVersionProtocolEngine
 
             }
 
+
+
         }
 
         public long getConnectionId()

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/protocol/ServerProtocolEngine.java
 Fri Jun  5 13:54:57 2015
@@ -24,6 +24,7 @@ import javax.security.auth.Subject;
 
 import org.apache.qpid.protocol.ProtocolEngine;
 import org.apache.qpid.server.util.Action;
+import org.apache.qpid.transport.network.AggregateTicker;
 
 public interface ServerProtocolEngine extends ProtocolEngine
 {
@@ -51,4 +52,6 @@ public interface ServerProtocolEngine ex
     void notifyWork();
 
     void setWorkListener(Action<ServerProtocolEngine> listener);
+
+    AggregateTicker getAggregateTicker();
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
 Fri Jun  5 13:54:57 2015
@@ -83,8 +83,7 @@ class QueueConsumerImpl
     }
 
     private final ConsumerTarget _target;
-    private final StateChangeListener<ConsumerTarget, ConsumerTarget.State>
-            _listener;
+    private final StateChangeListener<ConsumerTarget, ConsumerTarget.State> 
_listener;
     private volatile QueueContext _queueContext;
     private StateChangeListener<? super QueueConsumerImpl, State> 
_stateListener = new StateChangeListener<QueueConsumerImpl, State>()
     {

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 Fri Jun  5 13:54:57 2015
@@ -59,7 +59,6 @@ public class NonBlockingConnection imple
     private static final int NUMBER_OF_BYTES_FOR_TLS_CHECK = 6;
 
     private final SocketChannel _socketChannel;
-    private final Ticker _ticker;
     private final Object _peerPrincipalLock = new Object();
     private final SelectorThread _selector;
     private final ConcurrentLinkedQueue<ByteBuffer> _buffers = new 
ConcurrentLinkedQueue<>();
@@ -103,7 +102,6 @@ public class NonBlockingConnection imple
                                  final SelectorThread selectorThread)
     {
         _socketChannel = socketChannel;
-        _ticker = ticker;
         _selector = selectorThread;
 
         _protocolEngine = delegate;
@@ -152,7 +150,7 @@ public class NonBlockingConnection imple
 
     Ticker getTicker()
     {
-        return _ticker;
+        return _protocolEngine.getAggregateTicker();
     }
 
     SocketChannel getSocketChannel()
@@ -282,10 +280,10 @@ public class NonBlockingConnection imple
                 _workDone = false;
 
                 long currentTime = System.currentTimeMillis();
-                int tick = _ticker.getTimeToNextTick(currentTime);
+                int tick = getTicker().getTimeToNextTick(currentTime);
                 if (tick <= 0)
                 {
-                    _ticker.tick(currentTime);
+                    getTicker().tick(currentTime);
                 }
 
                 _protocolEngine.setMessageAssignmentSuspended(true);

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 Fri Jun  5 13:54:57 2015
@@ -39,6 +39,7 @@ import org.apache.qpid.server.protocol.M
 import org.apache.qpid.server.protocol.ServerProtocolEngine;
 import org.apache.qpid.transport.NetworkTransportConfiguration;
 import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.TransportEncryption;
 import org.apache.qpid.transport.network.io.AbstractNetworkTransport;
 import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
@@ -161,13 +162,16 @@ public class NonBlockingNetworkTransport
 
                 socketChannel.configureBlocking(false);
 
-                final IdleTimeoutTicker ticker = new IdleTimeoutTicker(engine, 
_timeout);
+                AggregateTicker aggregateTicker = engine.getAggregateTicker();
+
+                final IdleTimeoutTicker idleTimeoutTicker = new 
IdleTimeoutTicker(engine, _timeout);
+                aggregateTicker.addTicker(idleTimeoutTicker);
 
                 NonBlockingConnection connection =
                         new NonBlockingConnection(socketChannel,
                                                   engine,
                                                   receiveBufferSize,
-                                                  ticker,
+                                                  idleTimeoutTicker,
                                                   _encryptionSet,
                                                   _sslContext,
                                                   _config.wantClientAuth(),
@@ -188,7 +192,7 @@ public class NonBlockingNetworkTransport
                 engine.setNetworkConnection(connection, 
connection.getSender());
                 connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
 
-                ticker.setConnection(connection);
+                idleTimeoutTicker.setConnection(connection);
 
                 connection.start();
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngineCreator_0_10.java
 Fri Jun  5 13:54:57 2015
@@ -32,6 +32,7 @@ import org.apache.qpid.server.plugin.Plu
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
 import org.apache.qpid.server.security.SubjectCreator;
 import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 @PluggableService
@@ -69,7 +70,7 @@ public class ProtocolEngineCreator_0_10
                                                   NetworkConnection network,
                                                   AmqpPort<?> port,
                                                   Transport transport,
-                                                  long id)
+                                                  long id, final 
AggregateTicker aggregateTicker)
     {
         String fqdn = null;
         SocketAddress address = network.getLocalAddress();
@@ -86,7 +87,7 @@ public class ProtocolEngineCreator_0_10
         conn.setRemoteAddress(network.getRemoteAddress());
         conn.setLocalAddress(network.getLocalAddress());
 
-        ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, 
network);
+        ProtocolEngine_0_10 protocolEngine = new ProtocolEngine_0_10(conn, 
network, aggregateTicker);
         conn.setProtocolEngine(protocolEngine);
 
         return protocolEngine;

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ProtocolEngine_0_10.java
 Fri Jun  5 13:54:57 2015
@@ -41,6 +41,7 @@ import org.apache.qpid.server.protocol.A
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.Constant;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.Assembler;
 import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.network.NetworkConnection;
@@ -50,6 +51,7 @@ public class ProtocolEngine_0_10  extend
 {
     public static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
     private static final Logger _logger = 
LoggerFactory.getLogger(ProtocolEngine_0_10.class);
+    private final AggregateTicker _aggregateTicker;
 
 
     private NetworkConnection _network;
@@ -69,10 +71,11 @@ public class ProtocolEngine_0_10  extend
 
 
     public ProtocolEngine_0_10(ServerConnection conn,
-                               NetworkConnection network)
+                               NetworkConnection network, final 
AggregateTicker aggregateTicker)
     {
         super(new ServerAssembler(conn));
         _connection = conn;
+        _aggregateTicker = aggregateTicker;
 
         if(network != null)
         {
@@ -81,6 +84,12 @@ public class ProtocolEngine_0_10  extend
     }
 
     @Override
+    public AggregateTicker getAggregateTicker()
+    {
+        return _aggregateTicker;
+    }
+
+    @Override
     public boolean isMessageAssignmentSuspended()
     {
         Thread lock = _messageAssignmentSuspended.get();

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 Fri Jun  5 13:54:57 2015
@@ -40,6 +40,7 @@ import java.util.concurrent.atomic.Atomi
 import javax.security.auth.Subject;
 
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
 import org.apache.qpid.server.logging.EventLogger;
@@ -64,6 +65,7 @@ import org.apache.qpid.transport.Connect
 import org.apache.qpid.transport.ExecutionErrorCode;
 import org.apache.qpid.transport.ExecutionException;
 import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Option;
 import org.apache.qpid.transport.ProtocolEvent;
 import org.apache.qpid.transport.Session;
 
@@ -71,6 +73,7 @@ public class ServerConnection extends Co
                                                             LogSubject, 
AuthorizationHolder
 {
 
+    public static final long CLOSE_OK_TIMEOUT = 10000l;
     private final Broker<?> _broker;
     private AtomicBoolean _logClosed = new AtomicBoolean(false);
 
@@ -175,7 +178,10 @@ public class ServerConnection extends Co
                 
_virtualHost.getConnectionRegistry().deregisterConnection(this);
             }
         }
-
+        if(state == State.CLOSING)
+        {
+            getProtocolEngine().getAggregateTicker().addTicker(new ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, getNetworkConnection()));
+        }
         if (state == State.CLOSED)
         {
             logClosed();
@@ -416,6 +422,14 @@ public class ServerConnection extends Co
         });
     }
 
+    @Override
+    protected void sendConnectionClose(final ConnectionCloseCode replyCode,
+                                       final String replyText,
+                                       final Option... _options)
+    {
+        super.sendConnectionClose(replyCode, replyText, _options);
+    }
+
     protected void performDeleteTasks()
     {
         for(Action<? super ServerConnection> task : _connectionCloseTaskList)

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 Fri Jun  5 13:54:57 2015
@@ -198,8 +198,8 @@ public class ServerConnectionDelegate ex
                 final String redirectHost = vhost.getRedirectHost(port);
                 if(redirectHost == null)
                 {
-                    sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
-                                                     "Virtual host '" + vhostName + "' is not active"));
+                    sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED,
+                                                     "Virtual host '" + vhostName + "' is not active");
                 }
                 else
                 {
@@ -214,14 +214,14 @@ public class ServerConnectionDelegate ex
                 if(!vhost.authoriseCreateConnection(sconn))
                 {
                     sconn.setState(Connection.State.CLOSING);
-                    sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized"));
+                    sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, "Connection not authorized");
                     return;
                 }
             }
             catch (AccessControlException e)
             {
                 sconn.setState(Connection.State.CLOSING);
-                sconn.invoke(new ConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage()));
+                sconn.sendConnectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
                 return;
             }
 
@@ -231,8 +231,8 @@ public class ServerConnectionDelegate ex
         else
         {
             sconn.setState(Connection.State.CLOSING);
-            sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH,
-                                             "Unknown virtualhost '" + vhostName + "'"));
+            sconn.sendConnectionClose(ConnectionCloseCode.INVALID_PATH,
+                                             "Unknown virtualhost '" + vhostName + "'");
         }
 
     }

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
 Fri Jun  5 13:54:57 2015
@@ -59,6 +59,7 @@ import org.apache.qpid.common.ServerProp
 import org.apache.qpid.framing.*;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -91,6 +92,7 @@ import org.apache.qpid.transport.ByteBuf
 import org.apache.qpid.transport.SenderClosedException;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.TransportException;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 public class AMQProtocolEngine implements ServerProtocolEngine,
@@ -99,6 +101,7 @@ public class AMQProtocolEngine implement
 {
 
 
+    private final AggregateTicker _aggregateTicker;
 
     enum ConnectionState
     {
@@ -118,6 +121,9 @@ public class AMQProtocolEngine implement
     private static final String BROKER_DEBUG_BINARY_DATA_LENGTH = 
"broker.debug.binaryDataLength";
     private static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
     private static final long AWAIT_CLOSED_TIMEOUT = 60000;
+
+    private static final long CLOSE_OK_TIMEOUT = 10000l;
+
     private final AmqpPort<?> _port;
     private final long _creationTime;
     private final AtomicBoolean _stateChanged = new AtomicBoolean();
@@ -206,48 +212,16 @@ public class AMQProtocolEngine implement
 
     private final AtomicReference<Thread> _messageAssignmentSuspended = new 
AtomicReference<>();
 
-
-    @Override
-    public boolean isMessageAssignmentSuspended()
-    {
-        Thread lock = _messageAssignmentSuspended.get();
-        return lock != null && _messageAssignmentSuspended.get() != 
Thread.currentThread();
-    }
-
-    @Override
-    public void setMessageAssignmentSuspended(final boolean 
messageAssignmentSuspended)
-    {
-        _messageAssignmentSuspended.set(messageAssignmentSuspended ? 
Thread.currentThread() : null);
-        for(AMQSessionModel<?,?> session : getSessionModels())
-        {
-            for (Consumer<?> consumer : session.getConsumers())
-            {
-                ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
-                if (!messageAssignmentSuspended)
-                {
-                    consumerImpl.getTarget().notifyCurrentState();
-                }
-                else
-                {
-                    // ensure that by the time the method returns, no consumer 
can be in the process of
-                    // delivering a message.
-                    consumerImpl.getSendLock();
-                    consumerImpl.releaseSendLock();
-                }
-            }
-        }
-    }
-
-
     public AMQProtocolEngine(Broker<?> broker,
                              final NetworkConnection network,
                              final long connectionId,
                              AmqpPort<?> port,
-                             Transport transport)
+                             Transport transport, final AggregateTicker 
aggregateTicker)
     {
         _broker = broker;
         _port = port;
         _transport = transport;
+        _aggregateTicker = aggregateTicker;
         _maxNoOfChannels = broker.getConnection_sessionCountLimit();
         _decoder = new BrokerDecoder(this);
         _connectionId = connectionId;
@@ -283,6 +257,44 @@ public class AMQProtocolEngine implement
         _creationTime = System.currentTimeMillis();
     }
 
+
+    @Override
+    public AggregateTicker getAggregateTicker()
+    {
+        return _aggregateTicker;
+    }
+
+    @Override
+    public boolean isMessageAssignmentSuspended()
+    {
+        Thread lock = _messageAssignmentSuspended.get();
+        return lock != null && _messageAssignmentSuspended.get() != 
Thread.currentThread();
+    }
+
+    @Override
+    public void setMessageAssignmentSuspended(final boolean 
messageAssignmentSuspended)
+    {
+        _messageAssignmentSuspended.set(messageAssignmentSuspended ? 
Thread.currentThread() : null);
+        for(AMQSessionModel<?,?> session : getSessionModels())
+        {
+            for (Consumer<?> consumer : session.getConsumers())
+            {
+                ConsumerImpl consumerImpl = (ConsumerImpl) consumer;
+                if (!messageAssignmentSuspended)
+                {
+                    consumerImpl.getTarget().notifyCurrentState();
+                }
+                else
+                {
+                    // ensure that by the time the method returns, no consumer 
can be in the process of
+                    // delivering a message.
+                    consumerImpl.getSendLock();
+                    consumerImpl.releaseSendLock();
+                }
+            }
+        }
+    }
+
     private <T> T runAsSubject(PrivilegedAction<T> action)
     {
         return Subject.doAs(getAuthorizedSubject(), action);
@@ -913,7 +925,10 @@ public class AMQProtocolEngine implement
                 }
                 finally
                 {
-                    closeNetworkConnection();
+                    final long timeoutTime = System.currentTimeMillis() + 
CLOSE_OK_TIMEOUT;
+                    final NetworkConnection network = _network;
+                    _aggregateTicker.addTicker(new 
ConnectionClosingTicker(timeoutTime, network));
+
                 }
             }
         }

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_8.java
 Fri Jun  5 13:54:57 2015
@@ -27,6 +27,7 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 @PluggableService
@@ -62,9 +63,9 @@ public class ProtocolEngineCreator_0_8 i
                                                   NetworkConnection network,
                                                   AmqpPort<?> port,
                                                   Transport transport,
-                                                  long id)
+                                                  long id, final 
AggregateTicker aggregateTicker)
     {
-        return new AMQProtocolEngine(broker, network, id, port, transport);
+        return new AMQProtocolEngine(broker, network, id, port, transport, 
aggregateTicker);
     }
 
     private static ProtocolEngineCreator INSTANCE = new 
ProtocolEngineCreator_0_8();

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9.java
 Fri Jun  5 13:54:57 2015
@@ -27,6 +27,7 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 @PluggableService
@@ -62,9 +63,9 @@ public class ProtocolEngineCreator_0_9 i
                                                   NetworkConnection network,
                                                   AmqpPort<?> port,
                                                   Transport transport,
-                                                  long id)
+                                                  long id, final 
AggregateTicker aggregateTicker)
     {
-        return new AMQProtocolEngine(broker, network, id, port, transport);
+        return new AMQProtocolEngine(broker, network, id, port, transport, 
aggregateTicker);
     }
 
     private static ProtocolEngineCreator INSTANCE = new 
ProtocolEngineCreator_0_9();

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolEngineCreator_0_9_1.java
 Fri Jun  5 13:54:57 2015
@@ -27,6 +27,7 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 @PluggableService
@@ -63,9 +64,9 @@ public class ProtocolEngineCreator_0_9_1
                                                   NetworkConnection network,
                                                   AmqpPort<?> port,
                                                   Transport transport,
-                                                  long id)
+                                                  long id, final 
AggregateTicker aggregateTicker)
     {
-        return new AMQProtocolEngine(broker, network, id, port, transport);
+        return new AMQProtocolEngine(broker, network, id, port, transport, 
aggregateTicker);
     }
 
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngineTest.java
 Fri Jun  5 13:54:57 2015
@@ -36,6 +36,7 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.util.BrokerTestHelper;
 import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 public class AMQProtocolEngineTest extends QpidTestCase
@@ -73,7 +74,7 @@ public class AMQProtocolEngineTest exten
 
     public void testSetClientPropertiesForNoRouteProvidedAsString()
     {
-        AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, 
_port, _transport);
+        AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, 
_port, _transport, new AggregateTicker());
         assertTrue("Unexpected closeWhenNoRoute before client properties set", 
engine.isCloseWhenNoRoute());
 
         Map<String, Object> clientProperties = new HashMap<String, Object>();
@@ -85,7 +86,7 @@ public class AMQProtocolEngineTest exten
 
     public void testSetClientPropertiesForNoRouteProvidedAsBoolean()
     {
-        AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, 
_port, _transport);
+        AMQProtocolEngine engine = new AMQProtocolEngine(_broker, _network, 0, 
_port, _transport, new AggregateTicker());
         assertTrue("Unexpected closeWhenNoRoute before client properties set", 
engine.isCloseWhenNoRoute());
 
         Map<String, Object> clientProperties = new HashMap<String, Object>();

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
 Fri Jun  5 13:54:57 2015
@@ -53,6 +53,7 @@ import org.apache.qpid.server.security.a
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 public class InternalTestProtocolSession extends AMQProtocolEngine implements 
ProtocolOutputConverter
@@ -65,7 +66,7 @@ public class InternalTestProtocolSession
 
     public InternalTestProtocolSession(VirtualHostImpl virtualHost, Broker<?> 
broker, final AmqpPort<?> port) throws AMQException
     {
-        super(broker, new TestNetworkConnection(), 
ID_GENERATOR.getAndIncrement(), port, null);
+        super(broker, new TestNetworkConnection(), 
ID_GENERATOR.getAndIncrement(), port, null, new AggregateTicker());
 
         _channelDelivers = new HashMap<Integer, Map<String, 
LinkedList<DeliveryPair>>>();
 

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngineCreator_1_0_0_SASL.java
 Fri Jun  5 13:54:57 2015
@@ -27,6 +27,7 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.PluggableService;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 @PluggableService
@@ -62,9 +63,9 @@ public class ProtocolEngineCreator_1_0_0
                                                   NetworkConnection network,
                                                   AmqpPort<?> port,
                                                   Transport transport,
-                                                  long id)
+                                                  long id, final 
AggregateTicker aggregateTicker)
     {
-        return new ProtocolEngine_1_0_0_SASL(network, broker, id, port, 
transport);
+        return new ProtocolEngine_1_0_0_SASL(network, broker, id, port, 
transport, aggregateTicker);
     }
 
     private static ProtocolEngineCreator INSTANCE = new 
ProtocolEngineCreator_1_0_0_SASL();

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java?rev=1683757&r1=1683756&r2=1683757&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ProtocolEngine_1_0_0_SASL.java
 Fri Jun  5 13:54:57 2015
@@ -52,6 +52,7 @@ import org.apache.qpid.amqp_1_0.type.Fra
 import org.apache.qpid.amqp_1_0.type.Symbol;
 import org.apache.qpid.common.QpidProperties;
 import org.apache.qpid.common.ServerPropertyNames;
+import org.apache.qpid.server.protocol.ConnectionClosingTicker;
 import org.apache.qpid.server.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.consumer.ConsumerImpl;
 import org.apache.qpid.server.model.Broker;
@@ -63,13 +64,16 @@ import org.apache.qpid.server.security.S
 import org.apache.qpid.server.security.auth.UsernamePrincipal;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.transport.network.AggregateTicker;
 import org.apache.qpid.transport.network.NetworkConnection;
 
 public class ProtocolEngine_1_0_0_SASL implements ServerProtocolEngine, 
FrameOutputHandler
 {
 
+    /**
+     * Delay, in milliseconds, added to the current time in {@code close()} to form the
+     * timeout passed to the ConnectionClosingTicker. The misspelled name is kept because the
+     * constant is public.
+     */
+    public static final long CLOSE_REPONSE_TIMEOUT = 10000L;
     private final AmqpPort<?> _port;
     private final Transport _transport;
+    private final AggregateTicker _aggregateTicker;
     private long _readBytes;
     private long _writtenBytes;
 
@@ -143,19 +147,29 @@ public class ProtocolEngine_1_0_0_SASL i
 
 
 
-    public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, 
final Broker<?> broker,
-                                     long id, AmqpPort<?> port, Transport 
transport)
+    public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver,
+                                     final Broker<?> broker,
+                                     long id,
+                                     AmqpPort<?> port,
+                                     Transport transport,
+                                     final AggregateTicker aggregateTicker)
     {
         _connectionId = id;
         _broker = broker;
         _port = port;
         _transport = transport;
+        _aggregateTicker = aggregateTicker;
         if(networkDriver != null)
         {
             setNetworkConnection(networkDriver, networkDriver.getSender());
         }
     }
 
+    /** Returns the aggregate ticker that was supplied to this engine's constructor. */
+    @Override
+    public AggregateTicker getAggregateTicker()
+    {
+        return _aggregateTicker;
+    }
 
     @Override
     public boolean isMessageAssignmentSuspended()
@@ -544,7 +558,8 @@ public class ProtocolEngine_1_0_0_SASL i
 
     public void close()
     {
-        _sender.close();
+        getAggregateTicker().addTicker(new 
ConnectionClosingTicker(System.currentTimeMillis()+ CLOSE_REPONSE_TIMEOUT, 
_network));
+
     }
 
     public void setLogOutput(final PrintWriter out)

Added: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java?rev=1683757&view=auto
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java
 (added)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java
 Fri Jun  5 13:54:57 2015
@@ -0,0 +1,66 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.util.concurrent.CopyOnWriteArrayList;
+
+/**
+ * A {@link Ticker} that forwards each call to every registered delegate ticker and reports
+ * the smallest value any of them returns.
+ */
+public class AggregateTicker implements Ticker
+{
+
+    private final CopyOnWriteArrayList<Ticker> _tickers = new CopyOnWriteArrayList<>();
+
+    /**
+     * Asks every registered ticker for its time to the next tick.
+     *
+     * @param currentTime the time value forwarded unchanged to each delegate
+     * @return the minimum of the delegates' answers, or {@link Integer#MAX_VALUE} when no
+     *         ticker is registered
+     */
+    @Override
+    public int getTimeToNextTick(final long currentTime)
+    {
+        int soonest = Integer.MAX_VALUE;
+        for (final Ticker delegate : _tickers)
+        {
+            final int candidate = delegate.getTimeToNextTick(currentTime);
+            if (candidate < soonest)
+            {
+                soonest = candidate;
+            }
+        }
+        return soonest;
+    }
+
+    /**
+     * Ticks every registered ticker; no delegate is skipped.
+     *
+     * @param currentTime the time value forwarded unchanged to each delegate
+     * @return the minimum of the delegates' return values, or {@link Integer#MAX_VALUE} when
+     *         no ticker is registered
+     */
+    @Override
+    public int tick(final long currentTime)
+    {
+        int soonest = Integer.MAX_VALUE;
+        for (final Ticker delegate : _tickers)
+        {
+            final int candidate = delegate.tick(currentTime);
+            if (candidate < soonest)
+            {
+                soonest = candidate;
+            }
+        }
+        return soonest;
+    }
+
+    /**
+     * Returns the backing list of registered tickers itself, not a copy.
+     */
+    public CopyOnWriteArrayList<Ticker> getTickers()
+    {
+        return _tickers;
+    }
+
+    /**
+     * Registers a ticker so that subsequent calls are forwarded to it.
+     *
+     * @param ticker the ticker to append to the list of delegates
+     */
+    public void addTicker(Ticker ticker)
+    {
+        _tickers.add(ticker);
+    }
+
+    /**
+     * Removes a previously registered ticker, if it is present.
+     *
+     * @param ticker the ticker to remove from the list of delegates
+     */
+    public void removeTicker(Ticker ticker)
+    {
+        _tickers.remove(ticker);
+    }
+}

Propchange: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/AggregateTicker.java
------------------------------------------------------------------------------
    svn:eol-style = native



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to