Author: kwall
Date: Fri Mar 25 10:26:35 2016
New Revision: 1736551
URL: http://svn.apache.org/viewvc?rev=1736551&view=rev
Log:
QPID-7033: [Java Broker] Busy IO thread pools may cause client connections to
be unfairly closed
Merged with command:
svn merge -c 1732184,1732452,1732461,1732812 ^/qpid/java/trunk
Added:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
- copied unchanged from r1732184,
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
- copied unchanged from r1732184,
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
Modified:
qpid/java/branches/6.0.x/ (props changed)
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/java/branches/6.0.x/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 25 10:26:35 2016
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732525,1734452
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732525,1734452
/qpid/trunk/qpid:796646-796653
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
Fri Mar 25 10:26:35 2016
@@ -20,15 +20,15 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.transport.network.Ticker;
public class ConnectionClosingTicker implements Ticker
{
private final long _timeoutTime;
- private final NetworkConnection _network;
+ private final ServerNetworkConnection _network;
- public ConnectionClosingTicker(final long timeoutTime, final
NetworkConnection network)
+ public ConnectionClosingTicker(final long timeoutTime, final
ServerNetworkConnection network)
{
_timeoutTime = timeoutTime;
_network = network;
@@ -37,6 +37,11 @@ public class ConnectionClosingTicker imp
@Override
public int getTimeToNextTick(final long currentTime)
{
+ if (_network.getScheduledTime() > 0)
+ {
+ return (int) (_timeoutTime - _network.getScheduledTime());
+ }
+
return (int) (_timeoutTime - currentTime);
}
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
Fri Mar 25 10:26:35 2016
@@ -24,8 +24,8 @@ import org.apache.qpid.transport.network
abstract public class SuspendedConsumerLoggingTicker implements Ticker
{
- private long _nextTick;
- private long _startTime;
+ private volatile long _nextTick;
+ private volatile long _startTime;
private final long _repeatPeriod;
public SuspendedConsumerLoggingTicker(final long repeatPeriod)
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
Fri Mar 25 10:26:35 2016
@@ -105,6 +105,8 @@ public abstract class AbstractAMQPConnec
private volatile AccessControlContext _accessControllerContext;
private volatile Thread _ioThread;
+ private volatile SlowConnectionOpenTicker _slowConnectionOpenTicker;
+
public AbstractAMQPConnection(Broker<?> broker,
ServerNetworkConnection network,
AmqpPort<?> port,
@@ -166,7 +168,8 @@ public abstract class AbstractAMQPConnec
{
super.onOpen();
long maxAuthDelay = _port.getContextValue(Long.class,
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
- _aggregateTicker.addTicker(new SlowConnectionOpenTicker(maxAuthDelay));
+ _slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
+ _aggregateTicker.addTicker(_slowConnectionOpenTicker);
_lastReadTime = _lastWriteTime = getCreatedTime();
}
@@ -176,7 +179,7 @@ public abstract class AbstractAMQPConnec
return _broker;
}
- public final NetworkConnection getNetwork()
+ public final ServerNetworkConnection getNetwork()
{
return _network;
}
@@ -673,9 +676,21 @@ public abstract class AbstractAMQPConnec
protected abstract EventLogger getEventLogger();
+ @Override
+ public void processingStarted(final long currentTime)
+ {
+ SlowConnectionOpenTicker ticker = _slowConnectionOpenTicker;
+ long scheduledTime = _network.getScheduledTime();
+ if (ticker != null && scheduledTime > 0)
+ {
+ ticker.addSchedulingDelay(currentTime - scheduledTime);
+ }
+ }
+
private class SlowConnectionOpenTicker implements Ticker
{
private final long _allowedTime;
+ private volatile long _accumulatedDelay;
public SlowConnectionOpenTicker(long timeoutTime)
{
@@ -685,7 +700,7 @@ public abstract class AbstractAMQPConnec
@Override
public int getTimeToNextTick(final long currentTime)
{
- final int timeToNextTick = (int) (getCreatedTime() + _allowedTime
- currentTime);
+ final int timeToNextTick = (int) (getCreatedTime() + _allowedTime
+ _accumulatedDelay - currentTime);
return timeToNextTick;
}
@@ -705,10 +720,15 @@ public abstract class AbstractAMQPConnec
else
{
_aggregateTicker.removeTicker(this);
+ _slowConnectionOpenTicker = null;
}
}
return nextTick;
}
- }
+ public void addSchedulingDelay(final long delay)
+ {
+ _accumulatedDelay += delay;
+ }
+ }
}
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Fri Mar 25 10:26:35 2016
@@ -148,6 +148,12 @@ public class MultiVersionProtocolEngine
_delegate.setIOThread(ioThread);
}
+ @Override
+ public void processingStarted(final long currentTime)
+ {
+ _delegate.processingStarted(currentTime);
+ }
+
public long getConnectionId()
{
return _id;
@@ -294,6 +300,12 @@ public class MultiVersionProtocolEngine
}
@Override
+ public void processingStarted(final long currentTime)
+ {
+
+ }
+
+ @Override
public void closed()
{
@@ -539,6 +551,11 @@ public class MultiVersionProtocolEngine
}
+ @Override
+ public void processingStarted(final long currentTime)
+ {
+
+ }
@Override
public Subject getSubject()
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
Fri Mar 25 10:26:35 2016
@@ -66,6 +66,7 @@ public class NonBlockingConnection imple
private final AmqpPort _port;
private final AtomicBoolean _scheduled = new AtomicBoolean();
+ private volatile long _scheduledTime;
private volatile boolean _unexpectedByteBufferSizeReported;
private final String _threadName;
private volatile SelectorThread.SelectionTask _selectionTask;
@@ -241,11 +242,13 @@ public class NonBlockingConnection imple
try
{
long currentTime = System.currentTimeMillis();
+ _protocolEngine.processingStarted(currentTime);
int tick = getTicker().getTimeToNextTick(currentTime);
if (tick <= 0)
{
getTicker().tick(currentTime);
}
+ _scheduledTime = 0;
_protocolEngine.setIOThread(Thread.currentThread());
_protocolEngine.setMessageAssignmentSuspended(true, true);
@@ -591,7 +594,12 @@ public class NonBlockingConnection imple
public boolean setScheduled()
{
- return _scheduled.compareAndSet(false,true);
+ final boolean scheduled = _scheduled.compareAndSet(false, true);
+ if (scheduled)
+ {
+ _scheduledTime = System.currentTimeMillis();
+ }
+ return scheduled;
}
public void clearScheduled()
@@ -599,6 +607,12 @@ public class NonBlockingConnection imple
_scheduled.set(false);
}
+ @Override
+ public long getScheduledTime()
+ {
+ return _scheduledTime;
+ }
+
void reportUnexpectedByteBufferSizeUsage()
{
if (!_unexpectedByteBufferSizeReported)
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
Fri Mar 25 10:26:35 2016
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.configuration.CommonProperties;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.Ticker;
import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
@@ -149,10 +149,7 @@ public class NonBlockingNetworkTransport
socketChannel.configureBlocking(false);
- AggregateTicker aggregateTicker =
engine.getAggregateTicker();
- final IdleTimeoutTicker idleTimeoutTicker = new
IdleTimeoutTicker(engine, _timeout);
- aggregateTicker.addTicker(idleTimeoutTicker);
NonBlockingConnection connection =
new NonBlockingConnection(socketChannel,
@@ -170,11 +167,16 @@ public class NonBlockingNetworkTransport
_scheduler,
_port);
+ AggregateTicker aggregateTicker =
engine.getAggregateTicker();
+
+ Ticker writeIdleTimeoutTicker = new
ServerIdleWriteTimeoutTicker(connection, engine, _timeout);
+ Ticker readIdleTimeoutTicker = new
ServerIdleReadTimeoutTicker(connection, engine, _timeout);
+ aggregateTicker.addTicker(writeIdleTimeoutTicker);
+ aggregateTicker.addTicker(readIdleTimeoutTicker);
+
engine.setNetworkConnection(connection);
connection.setMaxReadIdleMillis(1000L * HANDSHAKE_TIMEOUT);
- idleTimeoutTicker.setConnection(connection);
-
connection.start();
_scheduler.addConnection(connection);
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
Fri Mar 25 10:26:35 2016
@@ -75,4 +75,6 @@ public interface ProtocolEngine extends
void received(QpidByteBuffer msg);
void setIOThread(Thread ioThread);
+
+ void processingStarted(long currentTime);
}
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
Fri Mar 25 10:26:35 2016
@@ -26,4 +26,6 @@ public interface ServerNetworkConnection
void reserveOutboundMessageSpace(long size);
String getTransportInfo();
+
+ long getScheduledTime();
}
Modified:
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
(original)
+++
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
Fri Mar 25 10:26:35 2016
@@ -54,6 +54,7 @@ import org.apache.qpid.server.model.Tran
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -147,7 +148,7 @@ public class ServerConnection extends Co
if(state == State.CLOSING)
{
- getAmqpConnection().getAggregateTicker().addTicker(new
ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT,
getNetworkConnection()));
+ getAmqpConnection().getAggregateTicker().addTicker(new
ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT,
(ServerNetworkConnection) getNetworkConnection()));
}
}
Modified:
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
(original)
+++
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Fri Mar 25 10:26:35 2016
@@ -445,6 +445,12 @@ class WebSocketProvider implements Accep
return _connection.getProtocol();
}
+ @Override
+ public long getScheduledTime()
+ {
+ return 0;
+ }
+
void setPeerCertificate(final Certificate certificate)
{
_certificate = certificate;
Modified:
qpid/java/branches/6.0.x/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
(original)
+++
qpid/java/branches/6.0.x/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
Fri Mar 25 10:26:35 2016
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
* A basic implementation of TCP traffic forwarder between ports.
* It is intended to use in tests.
*/
-public class TCPTunneler
+public class TCPTunneler implements AutoCloseable
{
private static final Logger LOGGER =
LoggerFactory.getLogger(TCPTunneler.class);
@@ -61,6 +61,11 @@ public class TCPTunneler
_tcpWorker.start();
}
+ public void stopClientToServerForwarding(final InetSocketAddress
clientAddress)
+ {
+ _tcpWorker.stopClientToServerForwarding(clientAddress);
+ }
+
public void stop()
{
try
@@ -92,6 +97,12 @@ public class TCPTunneler
}
}
+ @Override
+ public void close() throws Exception
+ {
+ stop();
+ }
+
public interface TunnelListener
{
void clientConnected(InetSocketAddress clientAddress);
@@ -111,6 +122,7 @@ public class TCPTunneler
private final TunnelListener _notifyingListener;
private volatile ServerSocket _serverSocket;
private volatile ExecutorService _executor;
+ private int _actualLocalPort;
public TCPWorker(final int localPort,
final String remoteHost,
@@ -180,20 +192,22 @@ public class TCPTunneler
public void start()
{
- LOGGER.info("Starting TCPTunneler forwarding from port {} to {}",
_localPort, _remoteHostPort);
+ _actualLocalPort = _localPort;
try
{
_serverSocket = new ServerSocket(_localPort);
+ _actualLocalPort = _serverSocket.getLocalPort();
+ LOGGER.info ("Starting
TCPTunneler forwarding from port {} to {}",
+ _actualLocalPort, _remoteHostPort);
_serverSocket.setReuseAddress(true);
}
catch (IOException e)
{
- throw new RuntimeException("Cannot start TCPTunneler on port "
+ _localPort, e);
+ throw new RuntimeException("Cannot start TCPTunneler on port "
+ _actualLocalPort, e);
}
if (_serverSocket != null)
{
- LOGGER.info("Listening on port {}", _localPort);
try
{
_executor.execute(this);
@@ -206,7 +220,7 @@ public class TCPTunneler
}
finally
{
- throw new RuntimeException("Cannot start acceptor
thread for TCPTunneler on port " + _localPort,
+ throw new RuntimeException("Cannot start acceptor
thread for TCPTunneler on port " + _actualLocalPort,
e);
}
}
@@ -218,7 +232,7 @@ public class TCPTunneler
if (_closed.compareAndSet(false, true))
{
LOGGER.info("Stopping TCPTunneler forwarding from port {} to
{}",
- _localPort,
+ _actualLocalPort,
_remoteHostPort);
try
{
@@ -233,7 +247,7 @@ public class TCPTunneler
}
LOGGER.info("TCPTunneler forwarding from port {} to {} is
stopped",
- _localPort,
+ _actualLocalPort,
_remoteHostPort);
}
}
@@ -326,6 +340,28 @@ public class TCPTunneler
}
}
+ public void stopClientToServerForwarding(final InetSocketAddress
clientAddress)
+ {
+ SocketTunnel target = null;
+ for (SocketTunnel tunnel : _tunnels)
+ {
+ if (tunnel.getClientAddress().equals(clientAddress))
+ {
+ target = tunnel;
+ break;
+ }
+ }
+ if (target != null)
+ {
+ LOGGER.debug("Stopping forwarding from client {} to server",
clientAddress);
+ target.stopClientToServerForwarding();
+ }
+ else
+ {
+ throw new IllegalArgumentException("Could not find tunnel for
address " + clientAddress);
+ }
+ }
+
private void closeServerSocket()
{
if (_serverSocket != null)
@@ -345,7 +381,6 @@ public class TCPTunneler
}
}
-
private SocketTunnel removeTunnel(final InetSocketAddress
clientAddress)
{
SocketTunnel client = null;
@@ -372,8 +407,8 @@ public class TCPTunneler
private final Socket _serverSocket;
private final TunnelListener _tunnelListener;
private final AtomicBoolean _closed;
- private final AutoClosingStreamForwarder _upStreamForwarder;
- private final AutoClosingStreamForwarder _downStreamForwarder;
+ private final AutoClosingStreamForwarder _clientToServer;
+ private final AutoClosingStreamForwarder _serverToClient;
private final InetSocketAddress _clientSocketAddress;
public SocketTunnel(final Socket clientSocket,
@@ -388,8 +423,8 @@ public class TCPTunneler
_tunnelListener = tunnelListener;
_clientSocket.setKeepAlive(true);
_serverSocket.setKeepAlive(true);
- _upStreamForwarder = new AutoClosingStreamForwarder(new
StreamForwarder(_clientSocket, _serverSocket));
- _downStreamForwarder = new AutoClosingStreamForwarder(new
StreamForwarder(_serverSocket, _clientSocket));
+ _clientToServer = new AutoClosingStreamForwarder(new
StreamForwarder(_clientSocket, _serverSocket));
+ _serverToClient = new AutoClosingStreamForwarder(new
StreamForwarder(_serverSocket, _clientSocket));
}
public void close()
@@ -410,11 +445,16 @@ public class TCPTunneler
public void start(Executor executor) throws IOException
{
- executor.execute(_upStreamForwarder);
- executor.execute(_downStreamForwarder);
+ executor.execute(_clientToServer);
+ executor.execute(_serverToClient);
_tunnelListener.clientConnected(getClientAddress());
}
+ public void stopClientToServerForwarding()
+ {
+ _clientToServer.stopForwarding();
+ }
+
public boolean isClosed()
{
return _closed.get();
@@ -430,7 +470,6 @@ public class TCPTunneler
return _clientSocketAddress;
}
-
private static void closeSocket(Socket socket)
{
if (socket != null)
@@ -472,6 +511,11 @@ public class TCPTunneler
currentThread.setName(originalThreadName);
}
}
+
+ public void stopForwarding()
+ {
+ _streamForwarder.stopForwarding();
+ }
}
}
@@ -482,13 +526,13 @@ public class TCPTunneler
private final InputStream _inputStream;
private final OutputStream _outputStream;
private final String _name;
+ private AtomicBoolean _stopForwarding = new AtomicBoolean();
public StreamForwarder(Socket input, Socket output) throws IOException
{
_inputStream = input.getInputStream();
_outputStream = output.getOutputStream();
- _name = "Forwarder-" + input.getInetAddress().getHostName() + ":"
+ input.getPort() + "->"
- + output.getInetAddress().getHostName() + ":" +
output.getPort();
+ _name = "Forwarder-" + input.getLocalSocketAddress() + "->" +
output.getRemoteSocketAddress();
}
@Override
@@ -500,8 +544,16 @@ public class TCPTunneler
{
while ((bytesRead = _inputStream.read(buffer)) != -1)
{
- _outputStream.write(buffer, 0, bytesRead);
- _outputStream.flush();
+ if (!_stopForwarding.get())
+ {
+ _outputStream.write(buffer, 0, bytesRead);
+ _outputStream.flush();
+ LOGGER.debug("Forwarded {} byte(s)", bytesRead);
+ }
+ else
+ {
+ LOGGER.debug("Discarded {} byte(s)", bytesRead);
+ }
}
}
catch (IOException e)
@@ -535,5 +587,11 @@ public class TCPTunneler
{
return _name;
}
+
+ public void stopForwarding()
+ {
+ _stopForwarding.set(true);
+ }
+
}
}
Modified:
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
(original)
+++
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
Fri Mar 25 10:26:35 2016
@@ -20,11 +20,15 @@ package org.apache.qpid.client;
import static
org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL;
+import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@@ -34,6 +38,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TCPTunneler;
public class HeartbeatTest extends QpidBrokerTestCase
{
@@ -192,6 +197,56 @@ public class HeartbeatTest extends QpidB
conn.close();
}
+ public void testClientStopsSendingHeartbeats_BrokerClosesConnection()
throws Exception
+ {
+ try(TCPTunneler tcpTunneler = new TCPTunneler(getFailingPort(),
"localhost", getPort(), 1))
+ {
+ tcpTunneler.start();
+
+ final AtomicReference<InetSocketAddress> clientAddressRef = new
AtomicReference<>();
+ tcpTunneler.addClientListener(new TCPTunneler.TunnelListener()
+ {
+ @Override
+ public void clientConnected(final InetSocketAddress
clientAddress)
+ {
+ clientAddressRef.set(clientAddress);
+ }
+
+ @Override
+ public void clientDisconnected(final InetSocketAddress
clientAddress)
+ {
+ }
+ });
+
+ final CountDownLatch exceptionLatch = new CountDownLatch(1);
+ final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT,
getFailingPort(), 1);
+ AMQConnection conn = (AMQConnection) getConnection(new
AMQConnectionURL(url));
+ conn.setHeartbeatListener(_listener);
+ conn.setExceptionListener(new ExceptionListener()
+ {
+ @Override
+ public void onException(final JMSException exception)
+ {
+ LOGGER.debug("Exception listener got exception",
exception);
+ exceptionLatch.countDown();
+ }
+ });
+ conn.start();
+
+ assertNotNull(clientAddressRef.get());
+
+ _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+ assertTrue("Too few heartbeats received:
"+_listener.getHeartbeatsReceived() +" (expected at least 2)",
_listener.getHeartbeatsReceived() >=2);
+ assertTrue("Too few heartbeats sent
"+_listener.getHeartbeatsSent() +" (expected at least 2)",
_listener.getHeartbeatsSent() >=2);
+
+ tcpTunneler.stopClientToServerForwarding(clientAddressRef.get());
+
+ exceptionLatch.await(5, TimeUnit.SECONDS);
+ assertTrue("Connection should be disconnected within timeout",
conn.isConnected());
+ }
+ }
+
private class TestListener implements HeartbeatListener
{
private final String _name;
Modified:
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
(original)
+++
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
Fri Mar 25 10:26:35 2016
@@ -145,11 +145,12 @@ public class ProtocolNegotiationTest ext
boolean brokenPipe = false;
while(timeout > System.currentTimeMillis())
{
- if (!writeHeartbeat(dataOutputStream));
+ if (!writeHeartbeat(dataOutputStream))
{
brokenPipe = true;
break;
}
+ Thread.sleep(100);
}
assertTrue("Expected pipe to become broken within "
+ Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY + "
timeout", brokenPipe);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]