Author: rgodfrey
Date: Mon Nov 16 09:50:41 2015
New Revision: 1714530
URL: http://svn.apache.org/viewvc?rev=1714530&view=rev
Log:
QPID-6843 : Implement heartbeating for AMQP 1.0
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
Mon Nov 16 09:50:41 2015
@@ -60,9 +60,6 @@ public class NonBlockingConnection imple
private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
private final long _outboundMessageBufferLimit;
- private volatile int _maxReadIdle;
- private volatile int _maxWriteIdle;
-
private volatile boolean _fullyWritten = true;
private boolean _partialRead = false;
@@ -73,6 +70,8 @@ public class NonBlockingConnection imple
private final String _threadName;
private volatile SelectorThread.SelectionTask _selectionTask;
private Iterator<Runnable> _pendingIterator;
+ private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
+ private final AtomicLong _maxReadIdleMillis = new AtomicLong();
public NonBlockingConnection(SocketChannel socketChannel,
ProtocolEngine protocolEngine,
@@ -169,15 +168,15 @@ public class NonBlockingConnection imple
}
@Override
- public void setMaxWriteIdle(int sec)
+ public void setMaxWriteIdleMillis(final long millis)
{
- _maxWriteIdle = sec;
+ _maxWriteIdleMillis.set(millis);
}
@Override
- public void setMaxReadIdle(int sec)
+ public void setMaxReadIdleMillis(final long millis)
{
- _maxReadIdle = sec;
+ _maxReadIdleMillis.set(millis);
}
@Override
@@ -193,15 +192,15 @@ public class NonBlockingConnection imple
}
@Override
- public int getMaxReadIdle()
+ public long getMaxReadIdleMillis()
{
- return _maxReadIdle;
+ return _maxReadIdleMillis.get();
}
@Override
- public int getMaxWriteIdle()
+ public long getMaxWriteIdleMillis()
{
- return _maxWriteIdle;
+ return _maxWriteIdleMillis.get();
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
Mon Nov 16 09:50:41 2015
@@ -173,7 +173,7 @@ public class NonBlockingNetworkTransport
_port);
engine.setNetworkConnection(connection);
- connection.setMaxReadIdle(HANDSHAKE_TIMEOUT);
+ connection.setMaxReadIdleMillis(1000L * HANDSHAKE_TIMEOUT);
idleTimeoutTicker.setConnection(connection);
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
Mon Nov 16 09:50:41 2015
@@ -283,14 +283,14 @@ public class ServerConnectionDelegate ex
heartbeat = 0;
}
- networkConnection.setMaxReadIdle(2 * heartbeat);
- networkConnection.setMaxWriteIdle(heartbeat);
+ networkConnection.setMaxReadIdleMillis(2000L * heartbeat);
+ networkConnection.setMaxWriteIdleMillis(1000L * heartbeat);
}
else
{
- networkConnection.setMaxReadIdle(0);
- networkConnection.setMaxWriteIdle(0);
+ networkConnection.setMaxReadIdleMillis(0);
+ networkConnection.setMaxWriteIdleMillis(0);
}
setConnectionTuneOkChannelMax(sconn, okChannelMax);
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
Mon Nov 16 09:50:41 2015
@@ -550,13 +550,13 @@ public class AMQPConnection_0_8
{
if (delay > 0)
{
- _network.setMaxWriteIdle(delay);
- _network.setMaxReadIdle(BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR
* delay);
+ _network.setMaxWriteIdleMillis(1000L * delay);
+ _network.setMaxReadIdleMillis(1000L *
BrokerProperties.HEARTBEAT_TIMEOUT_FACTOR * delay);
}
else
{
- _network.setMaxWriteIdle(0);
- _network.setMaxReadIdle(0);
+ _network.setMaxWriteIdleMillis(0);
+ _network.setMaxReadIdleMillis(0);
}
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/framing/FrameHandler.java
Mon Nov 16 09:50:41 2015
@@ -134,13 +134,12 @@ public class FrameHandler implements Pro
state = State.ERROR;
break;
}
-
- else if (size >
_connection.getDesiredMaxFrameSize().intValue())
+ else if(size > _connection.getMaxFrameSize())
{
frameParsingError = createFramingError(
"specified frame size %d larger than maximum frame header size %d",
size,
-
_connection.getDesiredMaxFrameSize().intValue());
+ _connection.getMaxFrameSize());
state = State.ERROR;
break;
}
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionEndpoint.java
Mon Nov 16 09:50:41 2015
@@ -136,7 +136,6 @@ public class ConnectionEndpoint implemen
private boolean _saslComplete;
- private UnsignedInteger _desiredMaxFrameSize =
UnsignedInteger.valueOf(DEFAULT_MAX_FRAME);
private Runnable _onSaslCompleteTask;
private SaslServerProvider _saslServerProvider;
@@ -153,6 +152,7 @@ public class ConnectionEndpoint implemen
private Principal _externalPrincipal;
private List<Runnable> _postLockActions = new ArrayList<>();
private Map _remoteProperties;
+ private long _desiredIdleTimeout;
public ConnectionEndpoint(Container container, SaslServerProvider cbs)
{
@@ -294,8 +294,9 @@ public class ConnectionEndpoint implemen
}
open.setChannelMax(UnsignedShort.valueOf((short) channelMax));
open.setContainerId(_container.getId());
- open.setMaxFrameSize(getDesiredMaxFrameSize());
+ open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize));
open.setHostname(getRemoteHostname());
+ open.setIdleTimeOut(UnsignedInteger.valueOf(_desiredIdleTimeout));
if (_properties != null)
{
open.setProperties(_properties);
@@ -304,18 +305,6 @@ public class ConnectionEndpoint implemen
send(CONNECTION_CONTROL_CHANNEL, open);
}
- public UnsignedInteger getDesiredMaxFrameSize()
- {
- return _desiredMaxFrameSize;
- }
-
-
- public void setDesiredMaxFrameSize(UnsignedInteger size)
- {
- _desiredMaxFrameSize = size;
- }
-
-
private void closeSender()
{
setClosedForOutput(true);
@@ -363,12 +352,8 @@ public class ConnectionEndpoint implemen
_receivingSessions = new SessionEndpoint[_channelMax + 1];
_sendingSessions = new SessionEndpoint[_channelMax + 1];
}
- UnsignedInteger remoteDesiredMaxFrameSize =
- open.getMaxFrameSize() == null ?
UnsignedInteger.valueOf(DEFAULT_MAX_FRAME) : open.getMaxFrameSize();
- _maxFrameSize =
(remoteDesiredMaxFrameSize.compareTo(_desiredMaxFrameSize) < 0
- ? remoteDesiredMaxFrameSize
- : _desiredMaxFrameSize).intValue();
+ _maxFrameSize = open.getMaxFrameSize() == null ? DEFAULT_MAX_FRAME :
open.getMaxFrameSize().intValue();
_remoteContainerId = open.getContainerId();
_localHostname = open.getHostname();
@@ -390,11 +375,7 @@ public class ConnectionEndpoint implemen
// TODO bad stuff (connection already open)
}
- /*if(_state == ConnectionState.AWAITING_OPEN)
- {
- _state = ConnectionState.OPEN;
- }
-*/
+
notifyAll();
}
@@ -776,6 +757,16 @@ public class ConnectionEndpoint implemen
_externalPrincipal = externalPrincipal;
}
+ public void setDesiredIdleTimeout(final long desiredIdleTimeout)
+ {
+ _desiredIdleTimeout = desiredIdleTimeout;
+ }
+
+ public long getDesiredIdleTimeout()
+ {
+ return _desiredIdleTimeout;
+ }
+
public static interface FrameReceiptLogger
{
boolean isEnabled();
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java
Mon Nov 16 09:50:41 2015
@@ -45,6 +45,7 @@ import org.apache.qpid.amqp_1_0.framing.
import org.apache.qpid.amqp_1_0.framing.FrameHandler;
import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
+import org.apache.qpid.amqp_1_0.framing.TransportFrame;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.Container;
import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
@@ -52,11 +53,14 @@ import org.apache.qpid.amqp_1_0.transpor
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.ConnectionClosingTicker;
@@ -157,6 +161,7 @@ public class AMQPConnection_1_0 extends
_connection.setAmqpConnection(this);
_endpoint = _connection.getConnectionEndpoint();
_endpoint.setConnectionEventListener(_connection);
+ _endpoint.setDesiredIdleTimeout(1000L *
broker.getConnection_heartBeatDelay());
_endpoint.setFrameOutputHandler(this);
final List<String> mechanisms =
port.getAuthenticationProvider().getSubjectCreator(transport.isSecure()).getMechanisms();
ByteBuffer headerResponse = useSASL ? initiateSasl() :
initiateNonSasl(mechanisms);
@@ -265,14 +270,25 @@ public class AMQPConnection_1_0 extends
return headerResponse;
}
+ @Override
public void writerIdle()
{
- //Todo
+ send(TransportFrame.createAMQFrame((short)0,null));
}
+ @Override
public void readerIdle()
{
- //Todo
+ AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ @Override
+ public Object run()
+ {
+ getEventLogger().message(ConnectionMessages.IDLE_CLOSE());
+ getNetwork().close();
+ return null;
+ }
+ }, getAccessControllerContext());
}
@Override
Modified:
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
(original)
+++
qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
Mon Nov 16 09:50:41 2015
@@ -46,6 +46,7 @@ import org.apache.qpid.amqp_1_0.transpor
import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.ConnectionError;
import org.apache.qpid.amqp_1_0.type.transport.End;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.protocol.AMQConstant;
@@ -61,6 +62,7 @@ import org.apache.qpid.server.virtualhos
public class Connection_1_0 implements ConnectionEventListener
{
+ private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L;
private final AmqpPort<?> _port;
private final SubjectCreator _subjectCreator;
private AMQPConnection_1_0 _amqpConnection;
@@ -139,39 +141,56 @@ public class Connection_1_0 implements C
}
_amqpConnection.setClientId(_connectionEndpoint.getRemoteContainerId());
}
- // TODO implement AMQP 1.0 heartbeating
- _amqpConnection.getNetwork().setMaxReadIdle(0);
- _amqpConnection.getNetwork().setMaxWriteIdle(0);
-
- _vhost = ((AmqpPort)_port).getVirtualHost(host);
- if(_vhost == null)
+
_amqpConnection.getNetwork().setMaxReadIdleMillis(_connectionEndpoint.getDesiredIdleTimeout());
+ long idleTimeout = _connectionEndpoint.getIdleTimeout();
+ if(idleTimeout != 0L && idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT)
{
- final Error err = new Error();
- err.setCondition(AmqpError.NOT_FOUND);
- err.setDescription("Unknown hostname in connection open: '" + host
+ "'");
- _connectionEndpoint.close(err);
+ _connectionEndpoint.close(new Error(ConnectionError.CONNECTION_FORCED,
+ "Requested idle timeout of "
+ + idleTimeout
+ + " is too low. The minimum supported timeout is"
+ + MINIMUM_SUPPORTED_IDLE_TIMEOUT));
+ _amqpConnection.close();
_closedOnOpen = true;
}
else
{
- final Principal user = _connectionEndpoint.getUser();
- if(user != null)
- {
- setUserPrincipal(user);
- }
-
_amqpConnection.getSubject().getPrincipals().add(_vhost.getPrincipal());
- _amqpConnection.updateAccessControllerContext();
-
if(AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject())
== null)
+ _amqpConnection.getNetwork().setMaxWriteIdleMillis(idleTimeout /
2L);
+
+ _vhost = ((AmqpPort) _port).getVirtualHost(host);
+ if (_vhost == null)
{
final Error err = new Error();
- err.setCondition(AmqpError.NOT_ALLOWED);
- err.setDescription("Connection has not been authenticated");
+ err.setCondition(AmqpError.NOT_FOUND);
+ err.setDescription("Unknown hostname in connection open: '" +
host + "'");
_connectionEndpoint.close(err);
+ _amqpConnection.close();
+
_closedOnOpen = true;
}
else
{
- _amqpConnection.virtualHostAssociated();
+ final Principal user = _connectionEndpoint.getUser();
+ if (user != null)
+ {
+ setUserPrincipal(user);
+ }
+
_amqpConnection.getSubject().getPrincipals().add(_vhost.getPrincipal());
+ _amqpConnection.updateAccessControllerContext();
+ if
(AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(_amqpConnection.getSubject())
+ == null)
+ {
+ final Error err = new Error();
+ err.setCondition(AmqpError.NOT_ALLOWED);
+ err.setDescription("Connection has not been authenticated");
+ _connectionEndpoint.close(err);
+ _amqpConnection.close();
+ _closedOnOpen = true;
+ }
+ else
+ {
+ _amqpConnection.virtualHostAssociated();
+ }
}
}
}
Modified:
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
(original)
+++
qpid/java/trunk/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Mon Nov 16 09:50:41 2015
@@ -318,8 +318,8 @@ class WebSocketProvider implements Accep
private final AtomicLong _usedOutboundMessageSpace = new AtomicLong();
private Certificate _certificate;
- private int _maxWriteIdle;
- private int _maxReadIdle;
+ private long _maxWriteIdleMillis;
+ private long _maxReadIdleMillis;
public ConnectionWrapper(final WebSocket.Connection connection,
final SocketAddress localAddress,
@@ -378,15 +378,15 @@ class WebSocketProvider implements Accep
}
@Override
- public void setMaxWriteIdle(final int sec)
+ public void setMaxWriteIdleMillis(final long millis)
{
- _maxWriteIdle = sec;
+ _maxWriteIdleMillis = millis;
}
@Override
- public void setMaxReadIdle(final int sec)
+ public void setMaxReadIdleMillis(final long millis)
{
- _maxReadIdle = sec;
+ _maxReadIdleMillis = millis;
}
@Override
@@ -402,15 +402,15 @@ class WebSocketProvider implements Accep
}
@Override
- public int getMaxReadIdle()
+ public long getMaxReadIdleMillis()
{
- return _maxReadIdle;
+ return _maxReadIdleMillis;
}
@Override
- public int getMaxWriteIdle()
+ public long getMaxWriteIdleMillis()
{
- return _maxWriteIdle;
+ return _maxWriteIdleMillis;
}
@Override
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
(original)
+++
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Mon Nov 16 09:50:41 2015
@@ -154,9 +154,9 @@ public class AMQProtocolSession implemen
if (delay > 0)
{
NetworkConnection network =
getProtocolHandler().getNetworkConnection();
- network.setMaxWriteIdle(delay);
+ network.setMaxWriteIdleMillis(1000L*delay);
int readerIdle = (int)(delay * timeoutFactor);
- network.setMaxReadIdle(readerIdle);
+ network.setMaxReadIdleMillis(1000L * readerIdle);
}
}
Modified:
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
(original)
+++
qpid/java/trunk/client/src/test/java/org/apache/qpid/client/transport/TestNetworkConnection.java
Mon Nov 16 09:50:41 2015
@@ -58,11 +58,6 @@ public class TestNetworkConnection imple
return (_remoteAddress != null) ? _remoteAddress : new
InetSocketAddress(_remoteHost, _port);
}
- public void setMaxReadIdle(int idleTime)
- {
-
- }
-
@Override
public Principal getPeerPrincipal()
{
@@ -76,18 +71,25 @@ public class TestNetworkConnection imple
}
@Override
- public int getMaxReadIdle()
+ public long getMaxReadIdleMillis()
+ {
+ return 0L;
+ }
+
+ @Override
+ public long getMaxWriteIdleMillis()
{
- return 0;
+ return 0L;
}
@Override
- public int getMaxWriteIdle()
+ public void setMaxWriteIdleMillis(final long millis)
{
- return 0;
+
}
- public void setMaxWriteIdle(int idleTime)
+ @Override
+ public void setMaxReadIdleMillis(final long millis)
{
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
Mon Nov 16 09:50:41 2015
@@ -153,8 +153,8 @@ public class ClientDelegate extends Conn
maxFrameSize,
actualHeartbeatInterval);
-
conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
- conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
+ conn.getNetworkConnection().setMaxReadIdleMillis((long) (1000L *
(actualHeartbeatInterval * heartbeatTimeoutFactor)));
+ conn.getNetworkConnection().setMaxWriteIdleMillis(1000L
*actualHeartbeatInterval);
conn.setMaxFrameSize(maxFrameSize == 0 ? 0xffff : maxFrameSize);
int channelMax = tune.getChannelMax();
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/NetworkConnection.java
Mon Nov 16 09:50:41 2015
@@ -44,15 +44,16 @@ public interface NetworkConnection
*/
SocketAddress getLocalAddress();
- void setMaxWriteIdle(int sec);
+ void setMaxWriteIdleMillis(long millis);
+ void setMaxReadIdleMillis(long millis);
- void setMaxReadIdle(int sec);
Principal getPeerPrincipal();
Certificate getPeerCertificate();
- int getMaxReadIdle();
+ long getMaxReadIdleMillis();
+
+ long getMaxWriteIdleMillis();
- int getMaxWriteIdle();
}
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IdleTimeoutTicker.java
Mon Nov 16 09:50:41 2015
@@ -41,14 +41,14 @@ public class IdleTimeoutTicker implement
public int getTimeToNextTick(long currentTime)
{
long nextTime = -1;
- final long maxReadIdle = 1000l * _connection.getMaxReadIdle();
+ final long maxReadIdle = _connection.getMaxReadIdleMillis();
if(maxReadIdle > 0)
{
nextTime = _transport.getLastReadTime() + maxReadIdle;
}
- long maxWriteIdle = 1000l * _connection.getMaxWriteIdle();
+ long maxWriteIdle = _connection.getMaxWriteIdleMillis();
if(maxWriteIdle > 0)
{
@@ -65,13 +65,13 @@ public class IdleTimeoutTicker implement
public int tick(long currentTime)
{
// writer Idle
- long maxWriteIdle = 1000l * _connection.getMaxWriteIdle();
+ long maxWriteIdle = _connection.getMaxWriteIdleMillis();
if(maxWriteIdle > 0 && maxWriteIdle+ _transport.getLastWriteTime() <=
currentTime)
{
_transport.writerIdle();
}
// reader Idle
- final long maxReadIdle = 1000l * _connection.getMaxReadIdle();
+ final long maxReadIdle = _connection.getMaxReadIdleMillis();
if(maxReadIdle > 0 && maxReadIdle+ _transport.getLastReadTime() <=
currentTime)
{
Modified:
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
(original)
+++
qpid/java/trunk/common/src/main/java/org/apache/qpid/transport/network/io/IoNetworkConnection.java
Mon Nov 16 09:50:41 2015
@@ -43,12 +43,12 @@ public class IoNetworkConnection impleme
private final long _timeout;
private final IoSender _ioSender;
private final IoReceiver _ioReceiver;
- private int _maxReadIdle;
- private int _maxWriteIdle;
private Principal _principal;
private boolean _principalChecked;
private final Object _lock = new Object();
private Certificate _certificate;
+ private long _maxWriteIdleMillis;
+ private long _maxReadIdleMillis;
public IoNetworkConnection(Socket socket,
ExceptionHandlingByteBufferReceiver delegate,
int sendBufferSize, int receiveBufferSize, long timeout, Ticker
ticker)
@@ -98,14 +98,16 @@ public class IoNetworkConnection impleme
return _socket.getLocalSocketAddress();
}
- public void setMaxWriteIdle(int sec)
+ @Override
+ public void setMaxWriteIdleMillis(final long millis)
{
- _maxWriteIdle = sec;
+ _maxWriteIdleMillis = millis;
}
- public void setMaxReadIdle(int sec)
+ @Override
+ public void setMaxReadIdleMillis(final long millis)
{
- _maxReadIdle = sec;
+ _maxReadIdleMillis = millis;
}
@Override
@@ -154,14 +156,14 @@ public class IoNetworkConnection impleme
}
@Override
- public int getMaxReadIdle()
+ public long getMaxReadIdleMillis()
{
- return _maxReadIdle;
+ return _maxReadIdleMillis;
}
@Override
- public int getMaxWriteIdle()
+ public long getMaxWriteIdleMillis()
{
- return _maxWriteIdle;
+ return _maxWriteIdleMillis;
}
}
Modified:
qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
(original)
+++
qpid/java/trunk/common/src/test/java/org/apache/qpid/transport/network/io/IdleTimeoutTickerTest.java
Mon Nov 16 09:50:41 2015
@@ -37,10 +37,10 @@ public class IdleTimeoutTickerTest exten
private long _lastReadTime;
private long _lastWriteTime;
private long _currentTime;
- private int _maxWriteIdle;
- private int _maxReadIdle;
private boolean _readerIdle;
private boolean _writerIdle;
+ private long _maxReadIdleMillis;
+ private long _maxWriteIdleMillis;
@Override
public void setUp() throws Exception
@@ -52,14 +52,14 @@ public class IdleTimeoutTickerTest exten
_writerIdle = false;
_lastReadTime = 0l;
_lastWriteTime = 0l;
- _maxReadIdle = 0;
- _maxWriteIdle = 0;
+ _maxReadIdleMillis = 0;
+ _maxWriteIdleMillis = 0;
}
public void testNoIdle() throws Exception
{
- _maxReadIdle = 4;
- _maxWriteIdle = 2;
+ _maxReadIdleMillis = 4000;
+ _maxWriteIdleMillis = 2000;
_lastReadTime = 0;
_lastWriteTime = 1500;
_currentTime = 3000;
@@ -82,13 +82,13 @@ public class IdleTimeoutTickerTest exten
assertFalse("Incorrectly caused reader idle", _readerIdle);
assertFalse("Incorrectly caused writer idle", _writerIdle);
- _maxReadIdle = 0;
+ _maxReadIdleMillis = 0;
nextTime = _ticker.tick(_currentTime);
assertEquals("Incorrect next tick calculation", 1700l, nextTime);
assertFalse("Incorrectly caused reader idle", _readerIdle);
assertFalse("Incorrectly caused writer idle", _writerIdle);
- _maxWriteIdle = 0;
+ _maxWriteIdleMillis = 0;
nextTime = _ticker.tick(_currentTime);
assertEquals("Incorrect next tick calculation", DEFAULT_TIMEOUT,
nextTime);
assertFalse("Incorrectly caused reader idle", _readerIdle);
@@ -98,8 +98,8 @@ public class IdleTimeoutTickerTest exten
public void testReaderIdle() throws Exception
{
- _maxReadIdle = 4;
- _maxWriteIdle = 0;
+ _maxReadIdleMillis = 4000;
+ _maxWriteIdleMillis = 0;
_lastReadTime = 0;
_lastWriteTime = 2500;
_currentTime = 4000;
@@ -113,7 +113,7 @@ public class IdleTimeoutTickerTest exten
_readerIdle = false;
// last write = 2.5s, max write idle = 2s, should check in 0.5s
- _maxWriteIdle = 2;
+ _maxWriteIdleMillis = 2000;
nextTime = _ticker.tick(_currentTime);
assertTrue(_readerIdle);
assertFalse(_writerIdle);
@@ -131,8 +131,8 @@ public class IdleTimeoutTickerTest exten
public void testWriterIdle() throws Exception
{
- _maxReadIdle = 0;
- _maxWriteIdle = 2;
+ _maxReadIdleMillis = 0;
+ _maxWriteIdleMillis = 2000;
_lastReadTime = 0;
_lastWriteTime = 1500;
_currentTime = 4000;
@@ -146,7 +146,7 @@ public class IdleTimeoutTickerTest exten
_writerIdle = false;
_lastWriteTime = 1500;
- _maxReadIdle = 5;
+ _maxReadIdleMillis = 5000;
nextTime = _ticker.tick(_currentTime);
@@ -219,15 +219,15 @@ public class IdleTimeoutTickerTest exten
}
@Override
- public void setMaxWriteIdle(int sec)
+ public void setMaxWriteIdleMillis(final long millis)
{
- _maxWriteIdle = sec;
+ _maxWriteIdleMillis = millis;
}
@Override
- public void setMaxReadIdle(int sec)
+ public void setMaxReadIdleMillis(final long millis)
{
- _maxReadIdle = sec;
+ _maxReadIdleMillis = millis;
}
@Override
@@ -243,14 +243,14 @@ public class IdleTimeoutTickerTest exten
}
@Override
- public int getMaxReadIdle()
+ public long getMaxReadIdleMillis()
{
- return _maxReadIdle;
+ return _maxReadIdleMillis;
}
@Override
- public int getMaxWriteIdle()
+ public long getMaxWriteIdleMillis()
{
- return _maxWriteIdle;
+ return _maxWriteIdleMillis;
}
}
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1714530&r1=1714529&r2=1714530&view=diff
==============================================================================
---
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
(original)
+++
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
Mon Nov 16 09:50:41 2015
@@ -367,8 +367,8 @@ public class MaxFrameSizeTest extends Qp
_maxFrameSize,
actualHeartbeatInterval);
-
conn.getNetworkConnection().setMaxReadIdle((int)(actualHeartbeatInterval*heartbeatTimeoutFactor));
-
conn.getNetworkConnection().setMaxWriteIdle(actualHeartbeatInterval);
+ conn.getNetworkConnection().setMaxReadIdleMillis((long)(1000L *
actualHeartbeatInterval*heartbeatTimeoutFactor));
+ conn.getNetworkConnection().setMaxWriteIdleMillis(1000L *
actualHeartbeatInterval);
conn.setMaxFrameSize(_maxFrameSize);
int channelMax = tune.getChannelMax();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]