Author: kwall
Date: Mon Apr 11 16:55:47 2016
New Revision: 1738607
URL: http://svn.apache.org/viewvc?rev=1738607&view=rev
Log:
QPID-7155: [Java Broker] Change ticker implementation to separate slow protocol
header detection from reader/writer idle timeouts established by connection tune
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
Mon Apr 11 16:55:47 2016
@@ -24,6 +24,7 @@ import java.net.SocketAddress;
import java.util.Set;
import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.DerivedAttribute;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
@@ -91,6 +92,16 @@ public interface AmqpPort<X extends Amqp
@ManagedContextDefault(name = PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)
long DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = 1024 * 1024;
+
+ String PROTOCOL_HANDSHAKE_TIMEOUT = "qpid.port.protocol_handshake_timeout";
+
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = PROTOCOL_HANDSHAKE_TIMEOUT,
+ description = "Maximum time allowed for a new connection to send a protocol header."
+ + " If the connection does not send a protocol header within this time,"
+ + " the connection will be aborted.")
+ long DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT = 2000;
+
SSLContext getSSLContext();
@ManagedAttribute(defaultValue = "*")
@@ -131,6 +142,11 @@ public interface AmqpPort<X extends Amqp
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units =
StatisticUnit.COUNT, label = "Connections")
int getConnectionCount();
+ @DerivedAttribute(description = "Maximum time allowed for a new connection to send a protocol header."
+ + " If the connection does not send a protocol header within this time,"
+ + " the connection will be aborted.")
+ long getProtocolHandshakeTimeout();
+
VirtualHost<?> getVirtualHost(String name);
boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress);
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
Mon Apr 11 16:55:47 2016
@@ -132,6 +132,7 @@ public class AmqpPortImpl extends Abstra
private final SettableFuture _noConnectionsRemain =
SettableFuture.create();
private AcceptingTransport _transport;
private SSLContext _sslContext;
+ private volatile long _protocolHandshakeTimeout;
@ManagedObjectFactoryConstructor
public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -213,6 +214,13 @@ public class AmqpPortImpl extends Abstra
}
@Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ _protocolHandshakeTimeout = getContextValue(Long.class,
AmqpPort.PROTOCOL_HANDSHAKE_TIMEOUT);
+ }
+
+ @Override
public <C extends ConfiguredObject> ListenableFuture<C>
addChildAsync(final Class<C> childClass,
final Map<String, Object> attributes,
final ConfiguredObject... otherParents)
@@ -646,4 +654,10 @@ public class AmqpPortImpl extends Abstra
{
return _connectionCount.get();
}
+
+ @Override
+ public long getProtocolHandshakeTimeout()
+ {
+ return _protocolHandshakeTimeout;
+ }
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
Mon Apr 11 16:55:47 2016
@@ -718,6 +718,21 @@ public abstract class AbstractAMQPConnec
});
}
+ protected void initialiseHeartbeating(final long writerDelay, final long
readerDelay)
+ {
+ if (writerDelay > 0)
+ {
+ _aggregateTicker.addTicker(new ServerIdleWriteTimeoutTicker(this,
(int) writerDelay));
+ _network.setMaxWriteIdleMillis(writerDelay);
+ }
+
+ if (readerDelay > 0)
+ {
+ _aggregateTicker.addTicker(new
ServerIdleReadTimeoutTicker(_network, this, (int) readerDelay));
+ _network.setMaxReadIdleMillis(readerDelay);
+ }
+ }
+
protected abstract boolean isOrderlyClose();
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Mon Apr 11 16:55:47 2016
@@ -41,6 +41,7 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.subjects.PortLogSubject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
@@ -48,6 +49,7 @@ import org.apache.qpid.server.security.M
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.Ticker;
public class MultiVersionProtocolEngine implements ProtocolEngine
{
@@ -184,6 +186,11 @@ public class MultiVersionProtocolEngine
throw new IllegalArgumentException("Unsupported socket address class: " + address);
}
_sender = network.getSender();
+
+ SlowProtocolHeaderTicker ticker = new SlowProtocolHeaderTicker(_port.getProtocolHandshakeTimeout(),
+ System.currentTimeMillis());
+ _aggregateTicker.addTicker(ticker);
+ _network.addSchedulingDelayNotificationListeners(ticker);
}
@Override
@@ -591,8 +598,6 @@ public class MultiVersionProtocolEngine
@Override
public void readerIdle()
{
- _broker.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Protocol header not sent within timeout period", true));
- _network.close();
}
@Override
@@ -617,5 +622,59 @@ public class MultiVersionProtocolEngine
}
}
+ class SlowProtocolHeaderTicker implements Ticker,
SchedulingDelayNotificationListener
+ {
+ private final long _allowedTime;
+ private final long _createdTime;
+ private volatile long _accumulatedSchedulingDelay;
+
+ public SlowProtocolHeaderTicker(long allowedTime, long createdTime)
+ {
+ _allowedTime = allowedTime;
+ _createdTime = createdTime;
+ }
+
+ @Override
+ public int getTimeToNextTick(final long currentTime)
+ {
+ return (int) (_createdTime + _allowedTime + _accumulatedSchedulingDelay - currentTime); }
+
+ @Override
+ public int tick(final long currentTime)
+ {
+ int nextTick = getTimeToNextTick(currentTime);
+ if(nextTick <= 0)
+ {
+ if (isProtocolEstablished())
+ {
+ _aggregateTicker.removeTicker(this);
+ _network.removeSchedulingDelayNotificationListeners(this);
+ }
+ else
+ {
+ _logger.warn("Connection has taken more than {} ms to send complete protocol header. Closing as possible DoS.",
+ _allowedTime);
+ _broker.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Protocol header not received within timeout period", true));
+ _network.close();
+ }
+ }
+ return nextTick;
+ }
+
+ @Override
+ public void notifySchedulingDelay(final long schedulingDelay)
+ {
+ if (schedulingDelay > 0)
+ {
+ _accumulatedSchedulingDelay += schedulingDelay;
+ }
+ }
+ }
+
+ public boolean isProtocolEstablished()
+ {
+ return _delegate instanceof AbstractAMQPConnection;
+ }
+
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
Mon Apr 11 16:55:47 2016
@@ -47,8 +47,6 @@ public class NonBlockingNetworkTransport
private static final Logger LOGGER =
LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
private static final int TIMEOUT =
Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
- private static final int HANDSHAKE_TIMEOUT = Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
- CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
private final Set<TransportEncryption> _encryptionSet;
private final MultiVersionProtocolEngineFactory _factory;
private final ServerSocketChannel _serverSocket;
@@ -167,15 +165,7 @@ public class NonBlockingNetworkTransport
_scheduler,
_port);
- AggregateTicker aggregateTicker =
engine.getAggregateTicker();
-
- Ticker writeIdleTimeoutTicker = new
ServerIdleWriteTimeoutTicker(connection, engine, _timeout);
- Ticker readIdleTimeoutTicker = new
ServerIdleReadTimeoutTicker(connection, engine, _timeout);
- aggregateTicker.addTicker(writeIdleTimeoutTicker);
- aggregateTicker.addTicker(readIdleTimeoutTicker);
-
engine.setNetworkConnection(connection);
- connection.setMaxReadIdleMillis(1000L * HANDSHAKE_TIMEOUT);
connection.start();
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
Mon Apr 11 16:55:47 2016
@@ -26,35 +26,33 @@ import org.apache.qpid.transport.network
public class ServerIdleReadTimeoutTicker implements Ticker
{
private final TransportActivity _transport;
- private final int _defaultTimeout;
+ private final int _readDelay;
private final ServerNetworkConnection _connection;
- public ServerIdleReadTimeoutTicker(ServerNetworkConnection connection,
TransportActivity transport,
- int defaultTimeout)
+ public ServerIdleReadTimeoutTicker(ServerNetworkConnection connection,
TransportActivity transport, int readDelay)
{
+ if (readDelay <= 0)
+ {
+ throw new IllegalArgumentException("Read delay should be positive");
+ }
+
_connection = connection;
_transport = transport;
- _defaultTimeout = defaultTimeout;
+ _readDelay = readDelay;
}
@Override
public int getTimeToNextTick(long currentTime)
{
- final long maxReadIdle = _connection.getMaxReadIdleMillis();
- if (maxReadIdle > 0)
- {
- long nextTime = _transport.getLastReadTime() + maxReadIdle;
- return (int) (nextTime - (_connection.getScheduledTime() > 0 ?
_connection.getScheduledTime() : currentTime) );
- }
-
- return _defaultTimeout;
+ long nextTime = _transport.getLastReadTime() + (long) _readDelay;
+ return (int) (nextTime - (_connection.getScheduledTime() > 0 ?
_connection.getScheduledTime() : currentTime) );
}
@Override
public int tick(long currentTime)
{
int timeToNextTick = getTimeToNextTick(currentTime);;
- if (_connection.getMaxReadIdleMillis() > 0 && timeToNextTick <= 0)
+ if (timeToNextTick <= 0)
{
_transport.readerIdle();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
Mon Apr 11 16:55:47 2016
@@ -26,35 +26,31 @@ import org.apache.qpid.transport.network
public class ServerIdleWriteTimeoutTicker implements Ticker
{
private final TransportActivity _transport;
- private final int _defaultTimeout;
- private final ServerNetworkConnection _connection;
+ private final int _writeDelay;
- public ServerIdleWriteTimeoutTicker(ServerNetworkConnection connection,
TransportActivity transport,
- int defaultTimeout)
+ public ServerIdleWriteTimeoutTicker(TransportActivity transport, int
writeDelay)
{
- _connection = connection;
+ if (writeDelay <= 0)
+ {
+ throw new IllegalArgumentException("Write delay should be positive");
+ }
+
_transport = transport;
- _defaultTimeout = defaultTimeout;
+ _writeDelay = writeDelay;
}
@Override
public int getTimeToNextTick(long currentTime)
{
- long maxWriteIdle = _connection.getMaxWriteIdleMillis();
- if (maxWriteIdle > 0)
- {
- long writeTime = _transport.getLastWriteTime() + maxWriteIdle;
- return (int) (writeTime - currentTime);
- }
-
- return _defaultTimeout;
+ long writeTime = _transport.getLastWriteTime() + _writeDelay;
+ return (int) (writeTime - currentTime);
}
@Override
public int tick(long currentTime)
{
int timeToNextTick = getTimeToNextTick(currentTime);
- if (_connection.getMaxWriteIdleMillis() > 0 && timeToNextTick <= 0)
+ if (timeToNextTick <= 0)
{
_transport.writerIdle();
}
Modified:
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
(original)
+++
qpid/java/trunk/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
Mon Apr 11 16:55:47 2016
@@ -150,6 +150,7 @@ public class TCPandSSLTransportTest exte
when(port.getContextValue(Long.class,
AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
when(port.getContextValue(Long.class,
AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
when(port.getContextValue(Integer.class,
AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
+ when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT);
ObjectMapper mapper = new ObjectMapper();
JavaType type =
mapper.getTypeFactory().constructCollectionType(List.class, String.class);
List<String> whiteList =
mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_WHITE_LIST, type);
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
Mon Apr 11 16:55:47 2016
@@ -334,4 +334,10 @@ public class AMQPConnection_0_10 extends
{
return !_connection.isConnectionLost();
}
+
+ @Override
+ public void initialiseHeartbeating(final long writerDelay, final long
readerDelay)
+ {
+ super.initialiseHeartbeating(writerDelay, readerDelay);
+ }
}
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Mon Apr 11 16:55:47 2016
@@ -325,23 +325,12 @@ public class ServerConnectionDelegate ex
okMaxFrameSize = getFrameMax();
}
- final NetworkConnection networkConnection =
sconn.getNetworkConnection();
- if(ok.hasHeartbeat())
+ if(ok.hasHeartbeat() && ok.getHeartbeat() > 0)
{
int heartbeat = ok.getHeartbeat();
- if(heartbeat < 0)
- {
- heartbeat = 0;
- }
-
- networkConnection.setMaxReadIdleMillis(2000L * heartbeat);
- networkConnection.setMaxWriteIdleMillis(1000L * heartbeat);
-
- }
- else
- {
- networkConnection.setMaxReadIdleMillis(0);
- networkConnection.setMaxWriteIdleMillis(0);
+ long readerIdle = 2000L * heartbeat;
+ long writerIdle = 1000L * heartbeat;
+ sconn.getAmqpConnection().initialiseHeartbeating(writerIdle,
readerIdle);
}
setConnectionTuneOkChannelMax(sconn, okChannelMax);
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
Mon Apr 11 16:55:47 2016
@@ -495,21 +495,6 @@ public class AMQPConnection_0_8
_closingChannelsList.put(channelId, System.currentTimeMillis());
}
- private void initHeartbeats(int delay)
- {
- ServerNetworkConnection network = getNetwork();
- if (delay > 0)
- {
- network.setMaxWriteIdleMillis(1000L * delay);
- network.setMaxReadIdleMillis(1000L *
BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay);
- }
- else
- {
- network.setMaxWriteIdleMillis(0);
- network.setMaxReadIdleMillis(0);
- }
- }
-
private void closeAllChannels()
{
try
@@ -1232,7 +1217,12 @@ public class AMQPConnection_0_8
assertState(ConnectionState.AWAIT_TUNE_OK);
- initHeartbeats(heartbeat);
+ if (heartbeat > 0)
+ {
+ long writerDelay = 1000L * heartbeat;
+ long readerDelay = 1000L *
BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * heartbeat;
+ initialiseHeartbeating(writerDelay, readerDelay);
+ }
int brokerFrameMax = getDefaultMaxFrameSize();
if (brokerFrameMax <= 0)
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
Mon Apr 11 16:55:47 2016
@@ -659,4 +659,10 @@ public class AMQPConnection_1_0 extends
{
return _connection.getConnectionEndpoint().isOrderlyClose();
}
+
+ @Override
+ public void initialiseHeartbeating(final long writerDelay, final long
readerDelay)
+ {
+ super.initialiseHeartbeating(writerDelay, readerDelay);
+ }
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
Mon Apr 11 16:55:47 2016
@@ -140,7 +140,6 @@ public class Connection_1_0 implements C
}
_amqpConnection.setClientId(_connectionEndpoint.getRemoteContainerId());
}
- _amqpConnection.getNetwork().setMaxReadIdleMillis(_connectionEndpoint.getDesiredIdleTimeout());
long idleTimeout = _connectionEndpoint.getIdleTimeout();
if(idleTimeout != 0L && idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
{
@@ -154,7 +153,8 @@ public class Connection_1_0 implements C
}
else
{
- _amqpConnection.getNetwork().setMaxWriteIdleMillis(idleTimeout /
2L);
+ long desiredIdleTimeout =
_connectionEndpoint.getDesiredIdleTimeout();
+ _amqpConnection.initialiseHeartbeating(idleTimeout / 2L,
desiredIdleTimeout);
final VirtualHost vhost = ((AmqpPort) _port).getVirtualHost(host);
if (vhost == null)
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java?rev=1738607&r1=1738606&r2=1738607&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
Mon Apr 11 16:55:47 2016
@@ -52,9 +52,6 @@ public class CommonProperties
public static final String IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME =
"qpid.io_network_transport_timeout";
public static final int IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT = 60000;
- public static final String HANDSHAKE_TIMEOUT_PROP_NAME =
"qpid.handshake_timeout";
- public static final int HANDSHAKE_TIMEOUT_DEFAULT = 2;
-
public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST =
"qpid.security.tls.protocolWhiteList";
public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST_DEFAULT =
"TLSv1\\.[0-9]+";
public static final String QPID_SECURITY_TLS_PROTOCOL_BLACK_LIST =
"qpid.security.tls.protocolBlackList";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]