Author: orudyy
Date: Tue Apr 12 12:09:06 2016
New Revision: 1738786
URL: http://svn.apache.org/viewvc?rev=1738786&view=rev
Log:
QPID-7155: [Java Broker] Change ticker implementation to separate slow protocol
header detection from reader/writer idle timeouts established by connection tune
Merged from trunk with manual confict resolution
svn merge -c 1738607 ^/qpid/java/trunk
Modified:
qpid/java/branches/6.0.x/ (props changed)
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Apr 12 12:09:06 2016
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738610,1738731
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452,1736478,1736751,1736838,1737804,1737835,1737853,1737984,1737992,1738119,1738135,1738231,1738271,1738607,1738610,1738731
/qpid/trunk/qpid:796646-796653
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPort.java
Tue Apr 12 12:09:06 2016
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.Set;
import org.apache.qpid.server.model.AuthenticationProvider;
+import org.apache.qpid.server.model.DerivedAttribute;
import org.apache.qpid.server.model.ManagedAttribute;
import org.apache.qpid.server.model.ManagedContextDefault;
import org.apache.qpid.server.model.ManagedObject;
@@ -98,6 +99,13 @@ public interface AmqpPort<X extends Amqp
@ManagedContextDefault(name = PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)
long DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE = 1024 * 1024;
+
+ String PROTOCOL_HANDSHAKE_TIMEOUT = "qpid.port.protocol_handshake_timeout";
+
+ @SuppressWarnings("unused")
+ @ManagedContextDefault(name = PROTOCOL_HANDSHAKE_TIMEOUT)
+ long DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT = 2000;
+
SSLContext getSSLContext();
@ManagedAttribute(defaultValue = "*")
@@ -138,6 +146,11 @@ public interface AmqpPort<X extends Amqp
@ManagedStatistic(statisticType = StatisticType.POINT_IN_TIME, units =
StatisticUnit.COUNT, label = "Connections")
int getConnectionCount();
+ @DerivedAttribute(description = "Maximum time allowed for a new connection
to send a protocol header."
+ + " If the connection does not send a
protocol header within this time,"
+ + " the connection will be aborted.")
+ long getProtocolHandshakeTimeout();
+
VirtualHostImpl getVirtualHost(String name);
boolean canAcceptNewConnection(final SocketAddress remoteSocketAddress);
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/model/port/AmqpPortImpl.java
Tue Apr 12 12:09:06 2016
@@ -129,6 +129,7 @@ public class AmqpPortImpl extends Abstra
private final SettableFuture _noConnectionsRemain =
SettableFuture.create();
private AcceptingTransport _transport;
private SSLContext _sslContext;
+ private volatile long _protocolHandshakeTimeout;
@ManagedObjectFactoryConstructor
public AmqpPortImpl(Map<String, Object> attributes, Broker<?> broker)
@@ -198,7 +199,13 @@ public class AmqpPortImpl extends Abstra
attributes.put(VirtualHostAlias.TYPE, HostNameAlias.TYPE_NAME);
attributes.put(VirtualHostAlias.DURABLE, true);
createVirtualHostAlias(attributes);
+ }
+ @Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ _protocolHandshakeTimeout = getContextValue(Long.class,
AmqpPort.PROTOCOL_HANDSHAKE_TIMEOUT);
}
@Override
@@ -637,4 +644,10 @@ public class AmqpPortImpl extends Abstra
{
return _connectionCount.get();
}
+
+ @Override
+ public long getProtocolHandshakeTimeout()
+ {
+ return _protocolHandshakeTimeout;
+ }
}
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
Tue Apr 12 12:09:06 2016
@@ -662,6 +662,21 @@ public abstract class AbstractAMQPConnec
return getSessionModels().size();
}
+ protected void initialiseHeartbeating(final long writerDelay, final long
readerDelay)
+ {
+ if (writerDelay > 0)
+ {
+ _aggregateTicker.addTicker(new ServerIdleWriteTimeoutTicker(this,
(int) writerDelay));
+ _network.setMaxWriteIdleMillis(writerDelay);
+ }
+
+ if (readerDelay > 0)
+ {
+ _aggregateTicker.addTicker(new
ServerIdleReadTimeoutTicker(_network, this, (int) readerDelay));
+ _network.setMaxReadIdleMillis(readerDelay);
+ }
+ }
+
@Override
public AMQPConnection<?> getUnderlyingConnection()
{
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Tue Apr 12 12:09:06 2016
@@ -41,6 +41,7 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.subjects.PortLogSubject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Protocol;
+import org.apache.qpid.server.model.SystemConfig;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.plugin.ProtocolEngineCreator;
@@ -48,6 +49,7 @@ import org.apache.qpid.server.security.M
import org.apache.qpid.server.util.Action;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.Ticker;
public class MultiVersionProtocolEngine implements ProtocolEngine
{
@@ -184,6 +186,11 @@ public class MultiVersionProtocolEngine
throw new IllegalArgumentException("Unsupported socket address
class: " + address);
}
_sender = network.getSender();
+
+ SlowProtocolHeaderTicker ticker = new
SlowProtocolHeaderTicker(_port.getProtocolHandshakeTimeout(),
+
System.currentTimeMillis());
+ _aggregateTicker.addTicker(ticker);
+ _network.addSchedulingDelayNotificationListeners(ticker);
}
@Override
@@ -591,8 +598,6 @@ public class MultiVersionProtocolEngine
@Override
public void readerIdle()
{
-
_broker.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Protocol header
not sent within timeout period", true));
- _network.close();
}
@Override
@@ -617,5 +622,59 @@ public class MultiVersionProtocolEngine
}
}
+ class SlowProtocolHeaderTicker implements Ticker,
SchedulingDelayNotificationListener
+ {
+ private final long _allowedTime;
+ private final long _createdTime;
+ private volatile long _accumulatedSchedulingDelay;
+
+ public SlowProtocolHeaderTicker(long allowedTime, long createdTime)
+ {
+ _allowedTime = allowedTime;
+ _createdTime = createdTime;
+ }
+
+ @Override
+ public int getTimeToNextTick(final long currentTime)
+ {
+ return (int) (_createdTime + _allowedTime +
_accumulatedSchedulingDelay - currentTime); }
+
+ @Override
+ public int tick(final long currentTime)
+ {
+ int nextTick = getTimeToNextTick(currentTime);
+ if(nextTick <= 0)
+ {
+ if (isProtocolEstablished())
+ {
+ _aggregateTicker.removeTicker(this);
+ _network.removeSchedulingDelayNotificationListeners(this);
+ }
+ else
+ {
+ _logger.warn("Connection has taken more than {} ms to send
complete protocol header. Closing as possible DoS.",
+ _allowedTime);
+
_broker.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Protocol header
not received within timeout period", true));
+ _network.close();
+ }
+ }
+ return nextTick;
+ }
+
+ @Override
+ public void notifySchedulingDelay(final long schedulingDelay)
+ {
+ if (schedulingDelay > 0)
+ {
+ _accumulatedSchedulingDelay += schedulingDelay;
+ }
+ }
+ }
+
+ public boolean isProtocolEstablished()
+ {
+ return _delegate instanceof AbstractAMQPConnection;
+ }
+
}
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
Tue Apr 12 12:09:06 2016
@@ -47,8 +47,6 @@ public class NonBlockingNetworkTransport
private static final Logger LOGGER =
LoggerFactory.getLogger(NonBlockingNetworkTransport.class);
private static final int TIMEOUT =
Integer.getInteger(CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME,
CommonProperties.IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT);
- private static final int HANDSHAKE_TIMEOUT =
Integer.getInteger(CommonProperties.HANDSHAKE_TIMEOUT_PROP_NAME ,
-
CommonProperties.HANDSHAKE_TIMEOUT_DEFAULT);
private final Set<TransportEncryption> _encryptionSet;
private final MultiVersionProtocolEngineFactory _factory;
private final ServerSocketChannel _serverSocket;
@@ -167,15 +165,7 @@ public class NonBlockingNetworkTransport
_scheduler,
_port);
- AggregateTicker aggregateTicker =
engine.getAggregateTicker();
-
- Ticker writeIdleTimeoutTicker = new
ServerIdleWriteTimeoutTicker(connection, engine, _timeout);
- Ticker readIdleTimeoutTicker = new
ServerIdleReadTimeoutTicker(connection, engine, _timeout);
- aggregateTicker.addTicker(writeIdleTimeoutTicker);
- aggregateTicker.addTicker(readIdleTimeoutTicker);
-
engine.setNetworkConnection(connection);
- connection.setMaxReadIdleMillis(1000L * HANDSHAKE_TIMEOUT);
connection.start();
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
Tue Apr 12 12:09:06 2016
@@ -26,35 +26,33 @@ import org.apache.qpid.transport.network
public class ServerIdleReadTimeoutTicker implements Ticker
{
private final TransportActivity _transport;
- private final int _defaultTimeout;
+ private final int _readDelay;
private final ServerNetworkConnection _connection;
- public ServerIdleReadTimeoutTicker(ServerNetworkConnection connection,
TransportActivity transport,
- int defaultTimeout)
+ public ServerIdleReadTimeoutTicker(ServerNetworkConnection connection,
TransportActivity transport, int readDelay)
{
+ if (readDelay <= 0)
+ {
+ throw new IllegalArgumentException("Read delay should be
positive");
+ }
+
_connection = connection;
_transport = transport;
- _defaultTimeout = defaultTimeout;
+ _readDelay = readDelay;
}
@Override
public int getTimeToNextTick(long currentTime)
{
- final long maxReadIdle = _connection.getMaxReadIdleMillis();
- if (maxReadIdle > 0)
- {
- long nextTime = _transport.getLastReadTime() + maxReadIdle;
- return (int) (nextTime - (_connection.getScheduledTime() > 0 ?
_connection.getScheduledTime() : currentTime) );
- }
-
- return _defaultTimeout;
+ long nextTime = _transport.getLastReadTime() + (long) _readDelay;
+ return (int) (nextTime - (_connection.getScheduledTime() > 0 ?
_connection.getScheduledTime() : currentTime) );
}
@Override
public int tick(long currentTime)
{
int timeToNextTick = getTimeToNextTick(currentTime);;
- if (_connection.getMaxReadIdleMillis() > 0 && timeToNextTick <= 0)
+ if (timeToNextTick <= 0)
{
_transport.readerIdle();
}
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
Tue Apr 12 12:09:06 2016
@@ -26,35 +26,31 @@ import org.apache.qpid.transport.network
public class ServerIdleWriteTimeoutTicker implements Ticker
{
private final TransportActivity _transport;
- private final int _defaultTimeout;
- private final ServerNetworkConnection _connection;
+ private final int _writeDelay;
- public ServerIdleWriteTimeoutTicker(ServerNetworkConnection connection,
TransportActivity transport,
- int defaultTimeout)
+ public ServerIdleWriteTimeoutTicker(TransportActivity transport, int
writeDelay)
{
- _connection = connection;
+ if (writeDelay <= 0)
+ {
+ throw new IllegalArgumentException("Write delay should be
positive");
+ }
+
_transport = transport;
- _defaultTimeout = defaultTimeout;
+ _writeDelay = writeDelay;
}
@Override
public int getTimeToNextTick(long currentTime)
{
- long maxWriteIdle = _connection.getMaxWriteIdleMillis();
- if (maxWriteIdle > 0)
- {
- long writeTime = _transport.getLastWriteTime() + maxWriteIdle;
- return (int) (writeTime - currentTime);
- }
-
- return _defaultTimeout;
+ long writeTime = _transport.getLastWriteTime() + _writeDelay;
+ return (int) (writeTime - currentTime);
}
@Override
public int tick(long currentTime)
{
int timeToNextTick = getTimeToNextTick(currentTime);
- if (_connection.getMaxWriteIdleMillis() > 0 && timeToNextTick <= 0)
+ if (timeToNextTick <= 0)
{
_transport.writerIdle();
}
Modified:
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/test/java/org/apache/qpid/server/transport/TCPandSSLTransportTest.java
Tue Apr 12 12:09:06 2016
@@ -122,6 +122,7 @@ public class TCPandSSLTransportTest exte
when(port.getContextValue(Long.class,
AmqpPort.PORT_AMQP_THREAD_POOL_KEEP_ALIVE_TIMEOUT)).thenReturn(1l);
when(port.getContextValue(Long.class,
AmqpPort.PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_OUTBOUND_MESSAGE_BUFFER_SIZE);
when(port.getContextValue(Integer.class,
AmqpPort.PORT_AMQP_ACCEPT_BACKLOG)).thenReturn(AmqpPort.DEFAULT_PORT_AMQP_ACCEPT_BACKLOG);
+
when(port.getProtocolHandshakeTimeout()).thenReturn(AmqpPort.DEFAULT_PROTOCOL_HANDSHAKE_TIMEOUT);
ObjectMapper mapper = new ObjectMapper();
JavaType type =
mapper.getTypeFactory().constructCollectionType(List.class, String.class);
List<String> whiteList =
mapper.readValue(Broker.DEFAULT_SECURITY_TLS_PROTOCOL_WHITE_LIST, type);
Modified:
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
(original)
+++
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/AMQPConnection_0_10.java
Tue Apr 12 12:09:06 2016
@@ -351,4 +351,11 @@ public class AMQPConnection_0_10 extends
{
super.logConnectionOpen();
}
+
+ @Override
+ public void initialiseHeartbeating(final long writerDelay, final long
readerDelay)
+ {
+ super.initialiseHeartbeating(writerDelay, readerDelay);
+ }
+
}
Modified:
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
(original)
+++
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Tue Apr 12 12:09:06 2016
@@ -323,23 +323,12 @@ public class ServerConnectionDelegate ex
okMaxFrameSize = getFrameMax();
}
- final NetworkConnection networkConnection =
sconn.getNetworkConnection();
- if(ok.hasHeartbeat())
+ if(ok.hasHeartbeat() && ok.getHeartbeat() > 0)
{
int heartbeat = ok.getHeartbeat();
- if(heartbeat < 0)
- {
- heartbeat = 0;
- }
-
- networkConnection.setMaxReadIdleMillis(2000L * heartbeat);
- networkConnection.setMaxWriteIdleMillis(1000L * heartbeat);
-
- }
- else
- {
- networkConnection.setMaxReadIdleMillis(0);
- networkConnection.setMaxWriteIdleMillis(0);
+ long readerIdle = 2000L * heartbeat;
+ long writerIdle = 1000L * heartbeat;
+ sconn.getAmqpConnection().initialiseHeartbeating(writerIdle,
readerIdle);
}
setConnectionTuneOkChannelMax(sconn, okChannelMax);
Modified:
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
(original)
+++
qpid/java/branches/6.0.x/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
Tue Apr 12 12:09:06 2016
@@ -517,20 +517,6 @@ public class AMQPConnection_0_8
_closingChannelsList.put(channelId, System.currentTimeMillis());
}
- private void initHeartbeats(int delay)
- {
- if (delay > 0)
- {
- _network.setMaxWriteIdleMillis(1000L * delay);
- _network.setMaxReadIdleMillis(1000L *
BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay);
- }
- else
- {
- _network.setMaxWriteIdleMillis(0);
- _network.setMaxReadIdleMillis(0);
- }
- }
-
private void closeAllChannels()
{
try
@@ -1308,7 +1294,12 @@ public class AMQPConnection_0_8
assertState(ConnectionState.AWAIT_TUNE_OK);
- initHeartbeats(heartbeat);
+ if (heartbeat > 0)
+ {
+ long writerDelay = 1000L * heartbeat;
+ long readerDelay = 1000L *
BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * heartbeat;
+ initialiseHeartbeating(writerDelay, readerDelay);
+ }
int brokerFrameMax = getDefaultMaxFrameSize();
if (brokerFrameMax <= 0)
Modified:
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
(original)
+++
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
Tue Apr 12 12:09:06 2016
@@ -688,4 +688,10 @@ public class AMQPConnection_1_0 extends
return _broker.getEventLogger();
}
}
+
+ @Override
+ public void initialiseHeartbeating(final long writerDelay, final long
readerDelay)
+ {
+ super.initialiseHeartbeating(writerDelay, readerDelay);
+ }
}
Modified:
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
(original)
+++
qpid/java/branches/6.0.x/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
Tue Apr 12 12:09:06 2016
@@ -146,7 +146,6 @@ public class Connection_1_0 implements C
}
_amqpConnection.setClientId(_connectionEndpoint.getRemoteContainerId());
}
-
_amqpConnection.getNetwork().setMaxReadIdleMillis(_connectionEndpoint.getDesiredIdleTimeout());
long idleTimeout = _connectionEndpoint.getIdleTimeout();
if(idleTimeout != 0L && idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
{
@@ -160,7 +159,8 @@ public class Connection_1_0 implements C
}
else
{
- _amqpConnection.getNetwork().setMaxWriteIdleMillis(idleTimeout /
2L);
+ long desiredIdleTimeout =
_connectionEndpoint.getDesiredIdleTimeout();
+ _amqpConnection.initialiseHeartbeating(idleTimeout / 2L,
desiredIdleTimeout);
final VirtualHostImpl<?,?,?> vhost = ((AmqpPort)
_port).getVirtualHost(host);
if (vhost == null)
Modified:
qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java?rev=1738786&r1=1738785&r2=1738786&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
(original)
+++
qpid/java/branches/6.0.x/common/src/main/java/org/apache/qpid/configuration/CommonProperties.java
Tue Apr 12 12:09:06 2016
@@ -52,9 +52,6 @@ public class CommonProperties
public static final String IO_NETWORK_TRANSPORT_TIMEOUT_PROP_NAME =
"qpid.io_network_transport_timeout";
public static final int IO_NETWORK_TRANSPORT_TIMEOUT_DEFAULT = 60000;
- public static final String HANDSHAKE_TIMEOUT_PROP_NAME =
"qpid.handshake_timeout";
- public static final int HANDSHAKE_TIMEOUT_DEFAULT = 2;
-
public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST =
"qpid.security.tls.protocolWhiteList";
public static final String QPID_SECURITY_TLS_PROTOCOL_WHITE_LIST_DEFAULT =
"TLS.*";
public static final String QPID_SECURITY_TLS_PROTOCOL_BLACK_LIST =
"qpid.security.tls.protocolBlackList";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]