Author: orudyy
Date: Tue Apr 12 12:09:06 2016
New Revision: 1738786

URL: http://svn.apache.org/viewvc?rev=1738786&view=rev
Log:
QPID-7155: [Java Broker] Change ticker implementation to separate slow protocol 
header detection from reader/writer idle timeouts established by connection tune

          Merged from trunk with manual conflict resolution 
          svn merge -c 1738607  ^/qpid/java/trunk

Modified:
    qpid/java/branches/6.0.x/   (props changed)
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
    
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
    
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    
qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java

Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 12 12:09:06 2016
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
 
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738610,1738731
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
 
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731
 /qpid/trunk/qpid:796646-796653

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
 Tue Apr 12 12:09:06 2016
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.DerivedAttribute;
 import org.apache.qpid.server.model.ManagedAttribute;
 import org.apache.qpid.server.model.ManagedContextDefault;
 import org.apache.qpid.server.model.ManagedObject;
@@ -98,6 +99,13 @@ public interface AmqpPort<X extends Amqp
     @ManagedContextDefault(name = PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)
     long DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = 1024 * 1024;
 
+
+    String PROTOCOL_HANDSHAKE_TIMEOUT = "qpid.port.protocol_handshake_timeout";
+
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = PROTOCOL_HANDSHAKE_TIMEOUT)
+    long DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT = 2000;
+
     SSLContext getSSLContext();
 
     @ManagedAttribute(defaultValue = "*")
@@ -138,6 +146,11 @@ public interface AmqpPort<X extends Amqp
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = 
StatisticUnit.COUNT, label = "Connections")
     int getConnectionCount();
 
+    @DerivedAttribute(description = "Maximum time allowed for a new connection 
to send a protocol header."
+                                    + " If the connection does not send a 
protocol header within this time,"
+                                    + " the connection will be aborted.")
+    long getProtocolHandshakeTimeout();
+
     VirtualHostImpl getVirtualHost(String name);
 
     boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress);

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
 Tue Apr 12 12:09:06 2016
@@ -129,6 +129,7 @@ public class AmqpPortImpl extends Abstra
     private final SettableFuture _noConnectionsRemain = 
SettableFuture.create();
     private AcceptingTransport _transport;
     private SSLContext _sslContext;
+    private volatile long _protocolHandshakeTimeout;
 
     @ManagedObjectFactoryConstructor
     public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -198,7 +199,13 @@ public class AmqpPortImpl extends Abstra
         attributes.put(VirtualHostAlias.TYPE, HostNameAlias.TYPE_NAME);
         attributes.put(VirtualHostAlias.DURABLE, true);
         createVirtualHostAlias(attributes);
+    }
 
+    @Override
+    protected void onOpen()
+    {
+        super.onOpen();
+        _protocolHandshakeTimeout = getContextValue(Long.class, 
AmqpPort.PROTOCOL_HANDSHAKE_TIMEOUT);
     }
 
     @Override
@@ -637,4 +644,10 @@ public class AmqpPortImpl extends Abstra
     {
         return _connectionCount.get();
     }
+
+    @Override
+    public long getProtocolHandshakeTimeout()
+    {
+        return _protocolHandshakeTimeout;
+    }
 }

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
 Tue Apr 12 12:09:06 2016
@@ -662,6 +662,21 @@ public abstract class AbstractAMQPConnec
         return getSessionModels().size();
     }
 
+    protected void initialiseHeartbeating(final long writerDelay, final long 
readerDelay)
+    {
+        if (writerDelay > 0)
+        {
+            _aggregateTicker.addTicker(new ServerIdleWriteTimeoutTicker(this, 
(int) writerDelay));
+            _network.setMaxWriteIdleMillis(writerDelay);
+        }
+
+        if (readerDelay > 0)
+        {
+            _aggregateTicker.addTicker(new 
ServerIdleReadTimeoutTicker(_network, this, (int) readerDelay));
+            _network.setMaxReadIdleMillis(readerDelay);
+        }
+    }
+
     @Override
     public AMQPConnection<?> getUnderlyingConnection()
     {

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
 Tue Apr 12 12:09:06 2016
@@ -41,6 +41,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.PortLogSubject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.SystemConfig;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
@@ -48,6 +49,7 @@ import org.apache.qpid.server.security.M
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.Ticker;
 
 public class MultiVersionProtocolEngine implements ProtocolEngine
 {
@@ -184,6 +186,11 @@ public class MultiVersionProtocolEngine
             throw new IllegalArgumentException("Unsupported socket address 
class: " + address);
         }
         _sender = network.getSender();
+
+        SlowProtocolHeaderTicker ticker = new 
SlowProtocolHeaderTicker(_port.getProtocolHandshakeTimeout(),
+                                                                       
System.currentTimeMillis());
+        _aggregateTicker.addTicker(ticker);
+        _network.addSchedulingDelayNotificationListeners(ticker);
     }
 
     @Override
@@ -591,8 +598,6 @@ public class MultiVersionProtocolEngine
         @Override
         public void readerIdle()
         {
-            
_broker.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Protocol header 
not sent within timeout period", true));
-            _network.close();
         }
 
         @Override
@@ -617,5 +622,59 @@ public class MultiVersionProtocolEngine
         }
     }
 
+    class SlowProtocolHeaderTicker implements Ticker, 
SchedulingDelayNotificationListener
+    {
+        private final long _allowedTime;
+        private final long _createdTime;
+        private volatile long _accumulatedSchedulingDelay;
+
+        public SlowProtocolHeaderTicker(long allowedTime, long createdTime)
+        {
+            _allowedTime = allowedTime;
+            _createdTime = createdTime;
+        }
+
+        @Override
+        public int getTimeToNextTick(final long currentTime)
+        {
+            return (int) (_createdTime + _allowedTime + 
_accumulatedSchedulingDelay - currentTime);        }
+
+        @Override
+        public int tick(final long currentTime)
+        {
+            int nextTick = getTimeToNextTick(currentTime);
+            if(nextTick <= 0)
+            {
+                if (isProtocolEstablished())
+                {
+                    _aggregateTicker.removeTicker(this);
+                    _network.removeSchedulingDelayNotificationListeners(this);
+                }
+                else
+                {
+                    _logger.warn("Connection has taken more than {} ms to send 
complete protocol header.  Closing as possible DoS.",
+                                 _allowedTime);
+                    
_broker.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Protocol header 
not received within timeout period", true));
+                    _network.close();
+                }
+            }
+            return nextTick;
+        }
+
+        @Override
+        public void notifySchedulingDelay(final long schedulingDelay)
+        {
+            if (schedulingDelay > 0)
+            {
+                _accumulatedSchedulingDelay += schedulingDelay;
+            }
+        }
+    }
+
+    public boolean isProtocolEstablished()
+    {
+        return _delegate instanceof AbstractAMQPConnection;
+    }
+
 
 }

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 Tue Apr 12 12:09:06 2016
@@ -47,8 +47,6 @@ public class NonBlockingNetworkTransport
     private static final Logger LOGGER = 
LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
     private static final int TIMEOUT = 
Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
                                                           
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
-    private static final int HANDSHAKE_TIMEOUT = 
Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
-                                                                    
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
     private final Set<TransportEncryption> _encryptionSet;
     private final MultiVersionProtocolEngineFactory _factory;
     private final ServerSocketChannel _serverSocket;
@@ -167,15 +165,7 @@ public class NonBlockingNetworkTransport
                                                       _scheduler,
                                                       _port);
 
-                    AggregateTicker aggregateTicker = 
engine.getAggregateTicker();
-
-                    Ticker writeIdleTimeoutTicker = new 
ServerIdleWriteTimeoutTicker(connection, engine, _timeout);
-                    Ticker readIdleTimeoutTicker = new 
ServerIdleReadTimeoutTicker(connection, engine, _timeout);
-                    aggregateTicker.addTicker(writeIdleTimeoutTicker);
-                    aggregateTicker.addTicker(readIdleTimeoutTicker);
-
                     engine.setNetworkConnection(connection);
-                    connection.setMaxReadIdleMillis(1000L * HANDSHAKE_TIMEOUT);
 
                     connection.start();
 

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
 Tue Apr 12 12:09:06 2016
@@ -26,35 +26,33 @@ import org.apache.qpid.transport.network
 public class ServerIdleReadTimeoutTicker implements Ticker
 {
     private final TransportActivity _transport;
-    private final int _defaultTimeout;
+    private final int _readDelay;
     private final ServerNetworkConnection _connection;
 
-    public ServerIdleReadTimeoutTicker(ServerNetworkConnection connection, 
TransportActivity transport,
-                                       int defaultTimeout)
+    public ServerIdleReadTimeoutTicker(ServerNetworkConnection connection, 
TransportActivity transport, int readDelay)
     {
+        if (readDelay <= 0)
+        {
+            throw new IllegalArgumentException("Read delay should be 
positive");
+        }
+
         _connection = connection;
         _transport = transport;
-        _defaultTimeout = defaultTimeout;
+        _readDelay = readDelay;
     }
 
     @Override
     public int getTimeToNextTick(long currentTime)
     {
-        final long maxReadIdle = _connection.getMaxReadIdleMillis();
-        if (maxReadIdle > 0)
-        {
-            long nextTime = _transport.getLastReadTime() + maxReadIdle;
-            return (int) (nextTime - (_connection.getScheduledTime() > 0 ?  
_connection.getScheduledTime() : currentTime) );
-        }
-
-        return _defaultTimeout;
+        long nextTime = _transport.getLastReadTime() + (long) _readDelay;
+        return (int) (nextTime - (_connection.getScheduledTime() > 0 ?  
_connection.getScheduledTime() : currentTime) );
     }
 
     @Override
     public int tick(long currentTime)
     {
         int timeToNextTick = getTimeToNextTick(currentTime);;
-        if (_connection.getMaxReadIdleMillis() > 0 && timeToNextTick <= 0)
+        if (timeToNextTick <= 0)
         {
             _transport.readerIdle();
         }

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
 Tue Apr 12 12:09:06 2016
@@ -26,35 +26,31 @@ import org.apache.qpid.transport.network
 public class ServerIdleWriteTimeoutTicker implements Ticker
 {
     private final TransportActivity _transport;
-    private final int _defaultTimeout;
-    private final ServerNetworkConnection _connection;
+    private final int _writeDelay;
 
-    public ServerIdleWriteTimeoutTicker(ServerNetworkConnection connection, 
TransportActivity transport,
-                                        int defaultTimeout)
+    public ServerIdleWriteTimeoutTicker(TransportActivity transport, int 
writeDelay)
     {
-        _connection = connection;
+        if (writeDelay <= 0)
+        {
+            throw new IllegalArgumentException("Write delay should be 
positive");
+        }
+
         _transport = transport;
-        _defaultTimeout = defaultTimeout;
+        _writeDelay = writeDelay;
     }
 
     @Override
     public int getTimeToNextTick(long currentTime)
     {
-        long maxWriteIdle = _connection.getMaxWriteIdleMillis();
-        if (maxWriteIdle > 0)
-        {
-            long writeTime = _transport.getLastWriteTime() + maxWriteIdle;
-            return (int) (writeTime - currentTime);
-        }
-
-        return _defaultTimeout;
+        long writeTime = _transport.getLastWriteTime() + _writeDelay;
+        return (int) (writeTime - currentTime);
     }
 
     @Override
     public int tick(long currentTime)
     {
         int timeToNextTick = getTimeToNextTick(currentTime);
-        if (_connection.getMaxWriteIdleMillis() > 0 && timeToNextTick <= 0)
+        if (timeToNextTick <= 0)
         {
             _transport.writerIdle();
         }

Modified: 
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
 Tue Apr 12 12:09:06 2016
@@ -122,6 +122,7 @@ public class TCPandSSLTransportTest exte
         when(port.getContextValue(Long.class, 
AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
         when(port.getContextValue(Long.class, 
AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
         when(port.getContextValue(Integer.class, 
AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
+        
when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT);
         ObjectMapper mapper = new ObjectMapper();
         JavaType type = 
mapper.getTypeFactory().constructCollectionType(List.class, String.class);
         List<String> whiteList = 
mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_WHITE_LIST, type);

Modified: 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
 Tue Apr 12 12:09:06 2016
@@ -351,4 +351,11 @@ public class AMQPConnection_0_10 extends
     {
         super.logConnectionOpen();
     }
+
+    @Override
+    public void initialiseHeartbeating(final long writerDelay, final long 
readerDelay)
+    {
+        super.initialiseHeartbeating(writerDelay, readerDelay);
+    }
+
 }

Modified: 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 Tue Apr 12 12:09:06 2016
@@ -323,23 +323,12 @@ public class ServerConnectionDelegate ex
             okMaxFrameSize = getFrameMax();
         }
 
-        final NetworkConnection networkConnection = 
sconn.getNetworkConnection();
-        if(ok.hasHeartbeat())
+        if(ok.hasHeartbeat() && ok.getHeartbeat() > 0)
         {
             int heartbeat = ok.getHeartbeat();
-            if(heartbeat < 0)
-            {
-                heartbeat = 0;
-            }
-
-            networkConnection.setMaxReadIdleMillis(2000L * heartbeat);
-            networkConnection.setMaxWriteIdleMillis(1000L * heartbeat);
-
-        }
-        else
-        {
-            networkConnection.setMaxReadIdleMillis(0);
-            networkConnection.setMaxWriteIdleMillis(0);
+            long readerIdle = 2000L * heartbeat;
+            long writerIdle = 1000L * heartbeat;
+            sconn.getAmqpConnection().initialiseHeartbeating(writerIdle, 
readerIdle);
         }
 
         setConnectionTuneOkChannelMax(sconn, okChannelMax);

Modified: 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
 Tue Apr 12 12:09:06 2016
@@ -517,20 +517,6 @@ public class AMQPConnection_0_8
         _closingChannelsList.put(channelId, System.currentTimeMillis());
     }
 
-    private void initHeartbeats(int delay)
-    {
-        if (delay > 0)
-        {
-            _network.setMaxWriteIdleMillis(1000L * delay);
-            _network.setMaxReadIdleMillis(1000L * 
BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay);
-        }
-        else
-        {
-            _network.setMaxWriteIdleMillis(0);
-            _network.setMaxReadIdleMillis(0);
-        }
-    }
-
     private void closeAllChannels()
     {
         try
@@ -1308,7 +1294,12 @@ public class AMQPConnection_0_8
 
         assertState(ConnectionState.AWAIT_TUNE_OK);
 
-        initHeartbeats(heartbeat);
+        if (heartbeat > 0)
+        {
+            long writerDelay = 1000L * heartbeat;
+            long readerDelay = 1000L * 
BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * heartbeat;
+            initialiseHeartbeating(writerDelay, readerDelay);
+        }
 
         int brokerFrameMax = getDefaultMaxFrameSize();
         if (brokerFrameMax <= 0)

Modified: 
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
 Tue Apr 12 12:09:06 2016
@@ -688,4 +688,10 @@ public class AMQPConnection_1_0 extends
             return _broker.getEventLogger();
         }
     }
+
+    @Override
+    public void initialiseHeartbeating(final long writerDelay, final long 
readerDelay)
+    {
+        super.initialiseHeartbeating(writerDelay, readerDelay);
+    }
 }

Modified: 
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
 Tue Apr 12 12:09:06 2016
@@ -146,7 +146,6 @@ public class Connection_1_0 implements C
             }
             
_amqpConnection.setClientId(_connectionEndpoint.getRemoteContainerId());
         }
-        
_amqpConnection.getNetwork().setMaxReadIdleMillis(_connectionEndpoint.getDesiredIdleTimeout());
         long idleTimeout = _connectionEndpoint.getIdleTimeout();
         if(idleTimeout != 0L && idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
         {
@@ -160,7 +159,8 @@ public class Connection_1_0 implements C
         }
         else
         {
-            _amqpConnection.getNetwork().setMaxWriteIdleMillis(idleTimeout / 
2L);
+            long desiredIdleTimeout = 
_connectionEndpoint.getDesiredIdleTimeout();
+            _amqpConnection.initialiseHeartbeating(idleTimeout / 2L, 
desiredIdleTimeout);
 
             final VirtualHostImpl<?,?,?> vhost = ((AmqpPort) 
_port).getVirtualHost(host);
             if (vhost == null)

Modified: 
qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
 (original)
+++ 
qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
 Tue Apr 12 12:09:06 2016
@@ -52,9 +52,6 @@ public class CommonProperties
     public static final String IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME = 
"qpid.io_network_transport_timeout";
     public static final int IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT = 60000;
 
-    public static final String HANDSHAKE_TIMEOUT_PROP_NAME = 
"qpid.handshake_timeout";
-    public static final int HANDSHAKE_TIMEOUT_DEFAULT = 2;
-
     public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST = 
"qpid.security.tls.protocolWhiteList";
     public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST_DEFAULT = 
"TLS.*";
     public static final String QPID_SECURITY_TLS_PROTOCOL_BLACK_LIST = 
"qpid.security.tls.protocolBlackList";



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to