Author: kwall
Date: Mon Apr 11 16:55:47 2016
New Revision: 1738607

URL: http://svn.apache.org/viewvc?rev=1738607&view=rev
Log:
QPID-7155: [Java Broker] Change ticker implementation to separate slow protocol 
header detection from reader/writer idle timeouts established by connection tune

Modified:
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
    
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
    
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
    
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
    
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
    
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
    
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
 Mon Apr 11 16:55:47 2016
@@ -24,6 +24,7 @@ import java.net.SocketAddress;
 import java.util.Set;
 
 import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.DerivedAttribute;
 import org.apache.qpid.server.model.ManagedAttribute;
 import org.apache.qpid.server.model.ManagedContextDefault;
 import org.apache.qpid.server.model.ManagedObject;
@@ -91,6 +92,16 @@ public interface AmqpPort<X extends Amqp
     @ManagedContextDefault(name = PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)
     long DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = 1024 * 1024;
 
+
+    String PROTOCOL_HANDSHAKE_TIMEOUT = "qpid.port.protocol_handshake_timeout";
+
+    @SuppressWarnings("unused")
+    @ManagedContextDefault(name = PROTOCOL_HANDSHAKE_TIMEOUT,
+                           description = "Maximum time allowed for a new 
connection to send a protocol header."
+                                         + " If the connection does not send a 
protocol header within this time,"
+                                         + " the connection will be aborted.")
+    long DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT = 2000;
+
     SSLContext getSSLContext();
 
     @ManagedAttribute(defaultValue = "*")
@@ -131,6 +142,11 @@ public interface AmqpPort<X extends Amqp
     @ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units = 
StatisticUnit.COUNT, label = "Connections")
     int getConnectionCount();
 
+    @DerivedAttribute(description = "Maximum time allowed for a new connection 
to send a protocol header."
+                                    + " If the connection does not send a 
protocol header within this time,"
+                                    + " the connection will be aborted.")
+    long getProtocolHandshakeTimeout();
+
     VirtualHost<?> getVirtualHost(String name);
 
     boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress);

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
 Mon Apr 11 16:55:47 2016
@@ -132,6 +132,7 @@ public class AmqpPortImpl extends Abstra
     private final SettableFuture _noConnectionsRemain = 
SettableFuture.create();
     private AcceptingTransport _transport;
     private SSLContext _sslContext;
+    private volatile long _protocolHandshakeTimeout;
 
     @ManagedObjectFactoryConstructor
     public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -213,6 +214,13 @@ public class AmqpPortImpl extends Abstra
     }
 
     @Override
+    protected void onOpen()
+    {
+        super.onOpen();
+        _protocolHandshakeTimeout = getContextValue(Long.class, 
AmqpPort.PROTOCOL_HANDSHAKE_TIMEOUT);
+    }
+
+    @Override
     public <C extends ConfiguredObject> ListenableFuture<C> 
addChildAsync(final Class<C> childClass,
                                                                           
final Map<String, Object> attributes,
                                                                           
final ConfiguredObject... otherParents)
@@ -646,4 +654,10 @@ public class AmqpPortImpl extends Abstra
     {
         return _connectionCount.get();
     }
+
+    @Override
+    public long getProtocolHandshakeTimeout()
+    {
+        return _protocolHandshakeTimeout;
+    }
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
 Mon Apr 11 16:55:47 2016
@@ -718,6 +718,21 @@ public abstract class AbstractAMQPConnec
         });
     }
 
+    protected void initialiseHeartbeating(final long writerDelay, final long 
readerDelay)
+    {
+        if (writerDelay > 0)
+        {
+            _aggregateTicker.addTicker(new ServerIdleWriteTimeoutTicker(this, 
(int) writerDelay));
+            _network.setMaxWriteIdleMillis(writerDelay);
+        }
+
+        if (readerDelay > 0)
+        {
+            _aggregateTicker.addTicker(new 
ServerIdleReadTimeoutTicker(_network, this, (int) readerDelay));
+            _network.setMaxReadIdleMillis(readerDelay);
+        }
+    }
+
     protected abstract boolean isOrderlyClose();
 
     @Override

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
 Mon Apr 11 16:55:47 2016
@@ -41,6 +41,7 @@ import org.apache.qpid.server.logging.me
 import org.apache.qpid.server.logging.subjects.PortLogSubject;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.SystemConfig;
 import org.apache.qpid.server.model.Transport;
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.plugin.ProtocolEngineCreator;
@@ -48,6 +49,7 @@ import org.apache.qpid.server.security.M
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.transport.ByteBufferSender;
 import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.Ticker;
 
 public class MultiVersionProtocolEngine implements ProtocolEngine
 {
@@ -184,6 +186,11 @@ public class MultiVersionProtocolEngine
             throw new IllegalArgumentException("Unsupported socket address 
class: " + address);
         }
         _sender = network.getSender();
+
+        SlowProtocolHeaderTicker ticker = new 
SlowProtocolHeaderTicker(_port.getProtocolHandshakeTimeout(),
+                                                                       
System.currentTimeMillis());
+        _aggregateTicker.addTicker(ticker);
+        _network.addSchedulingDelayNotificationListeners(ticker);
     }
 
     @Override
@@ -591,8 +598,6 @@ public class MultiVersionProtocolEngine
         @Override
         public void readerIdle()
         {
-            
_broker.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Protocol header 
not sent within timeout period", true));
-            _network.close();
         }
 
         @Override
@@ -617,5 +622,59 @@ public class MultiVersionProtocolEngine
         }
     }
 
+    class SlowProtocolHeaderTicker implements Ticker, 
SchedulingDelayNotificationListener
+    {
+        private final long _allowedTime;
+        private final long _createdTime;
+        private volatile long _accumulatedSchedulingDelay;
+
+        public SlowProtocolHeaderTicker(long allowedTime, long createdTime)
+        {
+            _allowedTime = allowedTime;
+            _createdTime = createdTime;
+        }
+
+        @Override
+        public int getTimeToNextTick(final long currentTime)
+        {
+            return (int) (_createdTime + _allowedTime + 
_accumulatedSchedulingDelay - currentTime);        }
+
+        @Override
+        public int tick(final long currentTime)
+        {
+            int nextTick = getTimeToNextTick(currentTime);
+            if(nextTick <= 0)
+            {
+                if (isProtocolEstablished())
+                {
+                    _aggregateTicker.removeTicker(this);
+                    _network.removeSchedulingDelayNotificationListeners(this);
+                }
+                else
+                {
+                    _logger.warn("Connection has taken more than {} ms to send 
complete protocol header.  Closing as possible DoS.",
+                                 _allowedTime);
+                    
_broker.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Protocol header 
not received within timeout period", true));
+                    _network.close();
+                }
+            }
+            return nextTick;
+        }
+
+        @Override
+        public void notifySchedulingDelay(final long schedulingDelay)
+        {
+            if (schedulingDelay > 0)
+            {
+                _accumulatedSchedulingDelay += schedulingDelay;
+            }
+        }
+    }
+
+    public boolean isProtocolEstablished()
+    {
+        return _delegate instanceof AbstractAMQPConnection;
+    }
+
 
 }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 Mon Apr 11 16:55:47 2016
@@ -47,8 +47,6 @@ public class NonBlockingNetworkTransport
     private static final Logger LOGGER = 
LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
     private static final int TIMEOUT = 
Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
                                                           
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
-    private static final int HANDSHAKE_TIMEOUT = 
Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
-                                                                    
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
     private final Set<TransportEncryption> _encryptionSet;
     private final MultiVersionProtocolEngineFactory _factory;
     private final ServerSocketChannel _serverSocket;
@@ -167,15 +165,7 @@ public class NonBlockingNetworkTransport
                                                       _scheduler,
                                                       _port);
 
-                    AggregateTicker aggregateTicker = 
engine.getAggregateTicker();
-
-                    Ticker writeIdleTimeoutTicker = new 
ServerIdleWriteTimeoutTicker(connection, engine, _timeout);
-                    Ticker readIdleTimeoutTicker = new 
ServerIdleReadTimeoutTicker(connection, engine, _timeout);
-                    aggregateTicker.addTicker(writeIdleTimeoutTicker);
-                    aggregateTicker.addTicker(readIdleTimeoutTicker);
-
                     engine.setNetworkConnection(connection);
-                    connection.setMaxReadIdleMillis(1000L * HANDSHAKE_TIMEOUT);
 
                     connection.start();
 

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
 Mon Apr 11 16:55:47 2016
@@ -26,35 +26,33 @@ import org.apache.qpid.transport.network
 public class ServerIdleReadTimeoutTicker implements Ticker
 {
     private final TransportActivity _transport;
-    private final int _defaultTimeout;
+    private final int _readDelay;
     private final ServerNetworkConnection _connection;
 
-    public ServerIdleReadTimeoutTicker(ServerNetworkConnection connection, 
TransportActivity transport,
-                                       int defaultTimeout)
+    public ServerIdleReadTimeoutTicker(ServerNetworkConnection connection, 
TransportActivity transport, int readDelay)
     {
+        if (readDelay <= 0)
+        {
+            throw new IllegalArgumentException("Read delay should be 
positive");
+        }
+
         _connection = connection;
         _transport = transport;
-        _defaultTimeout = defaultTimeout;
+        _readDelay = readDelay;
     }
 
     @Override
     public int getTimeToNextTick(long currentTime)
     {
-        final long maxReadIdle = _connection.getMaxReadIdleMillis();
-        if (maxReadIdle > 0)
-        {
-            long nextTime = _transport.getLastReadTime() + maxReadIdle;
-            return (int) (nextTime - (_connection.getScheduledTime() > 0 ?  
_connection.getScheduledTime() : currentTime) );
-        }
-
-        return _defaultTimeout;
+        long nextTime = _transport.getLastReadTime() + (long) _readDelay;
+        return (int) (nextTime - (_connection.getScheduledTime() > 0 ?  
_connection.getScheduledTime() : currentTime) );
     }
 
     @Override
     public int tick(long currentTime)
     {
         int timeToNextTick = getTimeToNextTick(currentTime);;
-        if (_connection.getMaxReadIdleMillis() > 0 && timeToNextTick <= 0)
+        if (timeToNextTick <= 0)
         {
             _transport.readerIdle();
         }

Modified: 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
 Mon Apr 11 16:55:47 2016
@@ -26,35 +26,31 @@ import org.apache.qpid.transport.network
 public class ServerIdleWriteTimeoutTicker implements Ticker
 {
     private final TransportActivity _transport;
-    private final int _defaultTimeout;
-    private final ServerNetworkConnection _connection;
+    private final int _writeDelay;
 
-    public ServerIdleWriteTimeoutTicker(ServerNetworkConnection connection, 
TransportActivity transport,
-                                        int defaultTimeout)
+    public ServerIdleWriteTimeoutTicker(TransportActivity transport, int 
writeDelay)
     {
-        _connection = connection;
+        if (writeDelay <= 0)
+        {
+            throw new IllegalArgumentException("Write delay should be 
positive");
+        }
+
         _transport = transport;
-        _defaultTimeout = defaultTimeout;
+        _writeDelay = writeDelay;
     }
 
     @Override
     public int getTimeToNextTick(long currentTime)
     {
-        long maxWriteIdle = _connection.getMaxWriteIdleMillis();
-        if (maxWriteIdle > 0)
-        {
-            long writeTime = _transport.getLastWriteTime() + maxWriteIdle;
-            return (int) (writeTime - currentTime);
-        }
-
-        return _defaultTimeout;
+        long writeTime = _transport.getLastWriteTime() + _writeDelay;
+        return (int) (writeTime - currentTime);
     }
 
     @Override
     public int tick(long currentTime)
     {
         int timeToNextTick = getTimeToNextTick(currentTime);
-        if (_connection.getMaxWriteIdleMillis() > 0 && timeToNextTick <= 0)
+        if (timeToNextTick <= 0)
         {
             _transport.writerIdle();
         }

Modified: 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
 (original)
+++ 
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
 Mon Apr 11 16:55:47 2016
@@ -150,6 +150,7 @@ public class TCPandSSLTransportTest exte
         when(port.getContextValue(Long.class, 
AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
         when(port.getContextValue(Long.class, 
AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
         when(port.getContextValue(Integer.class, 
AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
+        
when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT);
         ObjectMapper mapper = new ObjectMapper();
         JavaType type = 
mapper.getTypeFactory().constructCollectionType(List.class, String.class);
         List<String> whiteList = 
mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_WHITE_LIST, type);

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
 Mon Apr 11 16:55:47 2016
@@ -334,4 +334,10 @@ public class AMQPConnection_0_10 extends
     {
         return !_connection.isConnectionLost();
     }
+
+    @Override
+    public void initialiseHeartbeating(final long writerDelay, final long 
readerDelay)
+    {
+        super.initialiseHeartbeating(writerDelay, readerDelay);
+    }
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
 Mon Apr 11 16:55:47 2016
@@ -325,23 +325,12 @@ public class ServerConnectionDelegate ex
             okMaxFrameSize = getFrameMax();
         }
 
-        final NetworkConnection networkConnection = 
sconn.getNetworkConnection();
-        if(ok.hasHeartbeat())
+        if(ok.hasHeartbeat() && ok.getHeartbeat() > 0)
         {
             int heartbeat = ok.getHeartbeat();
-            if(heartbeat < 0)
-            {
-                heartbeat = 0;
-            }
-
-            networkConnection.setMaxReadIdleMillis(2000L * heartbeat);
-            networkConnection.setMaxWriteIdleMillis(1000L * heartbeat);
-
-        }
-        else
-        {
-            networkConnection.setMaxReadIdleMillis(0);
-            networkConnection.setMaxWriteIdleMillis(0);
+            long readerIdle = 2000L * heartbeat;
+            long writerIdle = 1000L * heartbeat;
+            sconn.getAmqpConnection().initialiseHeartbeating(writerIdle, 
readerIdle);
         }
 
         setConnectionTuneOkChannelMax(sconn, okChannelMax);

Modified: 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
 Mon Apr 11 16:55:47 2016
@@ -495,21 +495,6 @@ public class AMQPConnection_0_8
         _closingChannelsList.put(channelId, System.currentTimeMillis());
     }
 
-    private void initHeartbeats(int delay)
-    {
-        ServerNetworkConnection network = getNetwork();
-        if (delay > 0)
-        {
-            network.setMaxWriteIdleMillis(1000L * delay);
-            network.setMaxReadIdleMillis(1000L * 
BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay);
-        }
-        else
-        {
-            network.setMaxWriteIdleMillis(0);
-            network.setMaxReadIdleMillis(0);
-        }
-    }
-
     private void closeAllChannels()
     {
         try
@@ -1232,7 +1217,12 @@ public class AMQPConnection_0_8
 
         assertState(ConnectionState.AWAIT_TUNE_OK);
 
-        initHeartbeats(heartbeat);
+        if (heartbeat > 0)
+        {
+            long writerDelay = 1000L * heartbeat;
+            long readerDelay = 1000L * 
BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * heartbeat;
+            initialiseHeartbeating(writerDelay, readerDelay);
+        }
 
         int brokerFrameMax = getDefaultMaxFrameSize();
         if (brokerFrameMax <= 0)

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
 Mon Apr 11 16:55:47 2016
@@ -659,4 +659,10 @@ public class AMQPConnection_1_0 extends
     {
         return _connection.getConnectionEndpoint().isOrderlyClose();
     }
+
+    @Override
+    public void initialiseHeartbeating(final long writerDelay, final long 
readerDelay)
+    {
+        super.initialiseHeartbeating(writerDelay, readerDelay);
+    }
 }

Modified: 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
 (original)
+++ 
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
 Mon Apr 11 16:55:47 2016
@@ -140,7 +140,6 @@ public class Connection_1_0 implements C
             }
             
_amqpConnection.setClientId(_connectionEndpoint.getRemoteContainerId());
         }
-        
_amqpConnection.getNetwork().setMaxReadIdleMillis(_connectionEndpoint.getDesiredIdleTimeout());
         long idleTimeout = _connectionEndpoint.getIdleTimeout();
         if(idleTimeout != 0L && idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
         {
@@ -154,7 +153,8 @@ public class Connection_1_0 implements C
         }
         else
         {
-            _amqpConnection.getNetwork().setMaxWriteIdleMillis(idleTimeout / 
2L);
+            long desiredIdleTimeout = 
_connectionEndpoint.getDesiredIdleTimeout();
+            _amqpConnection.initialiseHeartbeating(idleTimeout / 2L, 
desiredIdleTimeout);
 
             final VirtualHost vhost = ((AmqpPort) _port).getVirtualHost(host);
             if (vhost == null)

Modified: 
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
URL: 
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
--- 
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
 (original)
+++ 
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
 Mon Apr 11 16:55:47 2016
@@ -52,9 +52,6 @@ public class CommonProperties
     public static final String IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME = 
"qpid.io_network_transport_timeout";
     public static final int IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT = 60000;
 
-    public static final String HANDSHAKE_TIMEOUT_PROP_NAME = 
"qpid.handshake_timeout";
-    public static final int HANDSHAKE_TIMEOUT_DEFAULT = 2;
-
     public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST = 
"qpid.security.tls.protocolWhiteList";
     public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST_DEFAULT = 
"TLSv1\\.[0-9]+";
     public static final String QPID_SECURITY_TLS_PROTOCOL_BLACK_LIST = 
"qpid.security.tls.protocolBlackList";



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to