Author: kwall
Date: Fri Mar 25 10:26:35 2016
New Revision: 1736551

URL: http://svn.apache.org/viewvc?rev=1736551&view=rev
Log:
QPID-7033: [Java Broker] Busy IO thread pools may cause client connections to 
be unfairly closed

Merged with command:

svn merge -c 1732184,1732452,1732461,1732812    ^/qpid/java/trunk



Added:
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
      - copied unchanged from r1732184, 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleReadTimeoutTicker.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
      - copied unchanged from r1732184, 
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/ServerIdleWriteTimeoutTicker.java
Modified:
    qpid/java/branches/6.0.x/   (props changed)
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
    
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
    
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
    
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
    
qpid/java/branches/6.0.x/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
    
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
    
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java

Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 25 10:26:35 2016
@@ -9,5 +9,5 @@
 /qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
 /qpid/branches/java-network-refactor/qpid/java:805429-821809
 /qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
 
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732525,1734452
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
 
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732525,1734452
 /qpid/trunk/qpid:796646-796653

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/protocol/ConnectionClosingTicker.java
 Fri Mar 25 10:26:35 2016
@@ -20,15 +20,15 @@
  */
 package org.apache.qpid.server.protocol;
 
-import org.apache.qpid.transport.network.NetworkConnection;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.transport.network.Ticker;
 
 public class ConnectionClosingTicker implements Ticker
 {
     private final long _timeoutTime;
-    private final NetworkConnection _network;
+    private final ServerNetworkConnection _network;
 
-    public ConnectionClosingTicker(final long timeoutTime, final 
NetworkConnection network)
+    public ConnectionClosingTicker(final long timeoutTime, final 
ServerNetworkConnection network)
     {
         _timeoutTime = timeoutTime;
         _network = network;
@@ -37,6 +37,11 @@ public class ConnectionClosingTicker imp
     @Override
     public int getTimeToNextTick(final long currentTime)
     {
+        if (_network.getScheduledTime() > 0)
+        {
+            return (int) (_timeoutTime - _network.getScheduledTime());
+        }
+
         return (int) (_timeoutTime - currentTime);
     }
 

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/queue/SuspendedConsumerLoggingTicker.java
 Fri Mar 25 10:26:35 2016
@@ -24,8 +24,8 @@ import org.apache.qpid.transport.network
 
 abstract public class SuspendedConsumerLoggingTicker implements Ticker
 {
-    private long _nextTick;
-    private long _startTime;
+    private volatile long _nextTick;
+    private volatile long _startTime;
     private final long _repeatPeriod;
 
     public SuspendedConsumerLoggingTicker(final long repeatPeriod)

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
 Fri Mar 25 10:26:35 2016
@@ -105,6 +105,8 @@ public abstract class AbstractAMQPConnec
     private volatile AccessControlContext _accessControllerContext;
     private volatile Thread _ioThread;
 
+    private volatile SlowConnectionOpenTicker _slowConnectionOpenTicker;
+
     public AbstractAMQPConnection(Broker<?> broker,
                                   ServerNetworkConnection network,
                                   AmqpPort<?> port,
@@ -166,7 +168,8 @@ public abstract class AbstractAMQPConnec
     {
         super.onOpen();
         long maxAuthDelay = _port.getContextValue(Long.class, 
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
-        _aggregateTicker.addTicker(new SlowConnectionOpenTicker(maxAuthDelay));
+        _slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
+        _aggregateTicker.addTicker(_slowConnectionOpenTicker);
         _lastReadTime = _lastWriteTime = getCreatedTime();
 
     }
@@ -176,7 +179,7 @@ public abstract class AbstractAMQPConnec
         return _broker;
     }
 
-    public final NetworkConnection getNetwork()
+    public final ServerNetworkConnection getNetwork()
     {
         return _network;
     }
@@ -673,9 +676,21 @@ public abstract class AbstractAMQPConnec
 
     protected abstract EventLogger getEventLogger();
 
+    @Override
+    public void processingStarted(final long currentTime)
+    {
+        SlowConnectionOpenTicker ticker = _slowConnectionOpenTicker;
+        long scheduledTime = _network.getScheduledTime();
+        if (ticker != null && scheduledTime > 0)
+        {
+            ticker.addSchedulingDelay(currentTime - scheduledTime);
+        }
+    }
+
     private class SlowConnectionOpenTicker implements Ticker
     {
         private final long _allowedTime;
+        private volatile long _accumulatedDelay;
 
         public SlowConnectionOpenTicker(long timeoutTime)
         {
@@ -685,7 +700,7 @@ public abstract class AbstractAMQPConnec
         @Override
         public int getTimeToNextTick(final long currentTime)
         {
-            final int timeToNextTick = (int) (getCreatedTime() + _allowedTime 
- currentTime);
+            final int timeToNextTick = (int) (getCreatedTime() + _allowedTime 
+ _accumulatedDelay - currentTime);
             return timeToNextTick;
         }
 
@@ -705,10 +720,15 @@ public abstract class AbstractAMQPConnec
                 else
                 {
                     _aggregateTicker.removeTicker(this);
+                    _slowConnectionOpenTicker = null;
                 }
             }
             return nextTick;
         }
-    }
 
+        public void addSchedulingDelay(final long delay)
+        {
+            _accumulatedDelay += delay;
+        }
+    }
 }

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
 Fri Mar 25 10:26:35 2016
@@ -148,6 +148,12 @@ public class MultiVersionProtocolEngine
         _delegate.setIOThread(ioThread);
     }
 
+    @Override
+    public void processingStarted(final long currentTime)
+    {
+        _delegate.processingStarted(currentTime);
+    }
+
     public long getConnectionId()
     {
         return _id;
@@ -294,6 +300,12 @@ public class MultiVersionProtocolEngine
         }
 
         @Override
+        public void processingStarted(final long currentTime)
+        {
+
+        }
+
+        @Override
         public void closed()
         {
 
@@ -539,6 +551,11 @@ public class MultiVersionProtocolEngine
 
         }
 
+        @Override
+        public void processingStarted(final long currentTime)
+        {
+
+        }
 
         @Override
         public Subject getSubject()

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
 Fri Mar 25 10:26:35 2016
@@ -66,6 +66,7 @@ public class NonBlockingConnection imple
 
     private final AmqpPort _port;
     private final AtomicBoolean _scheduled = new AtomicBoolean();
+    private volatile long _scheduledTime;
     private volatile boolean _unexpectedByteBufferSizeReported;
     private final String _threadName;
     private volatile SelectorThread.SelectionTask _selectionTask;
@@ -241,11 +242,13 @@ public class NonBlockingConnection imple
             try
             {
                 long currentTime = System.currentTimeMillis();
+                _protocolEngine.processingStarted(currentTime);
                 int tick = getTicker().getTimeToNextTick(currentTime);
                 if (tick <= 0)
                 {
                     getTicker().tick(currentTime);
                 }
+                _scheduledTime = 0;
 
                 _protocolEngine.setIOThread(Thread.currentThread());
                 _protocolEngine.setMessageAssignmentSuspended(true, true);
@@ -591,7 +594,12 @@ public class NonBlockingConnection imple
 
     public boolean setScheduled()
     {
-        return _scheduled.compareAndSet(false,true);
+        final boolean scheduled = _scheduled.compareAndSet(false, true);
+        if (scheduled)
+        {
+            _scheduledTime = System.currentTimeMillis();
+        }
+        return scheduled;
     }
 
     public void clearScheduled()
@@ -599,6 +607,12 @@ public class NonBlockingConnection imple
         _scheduled.set(false);
     }
 
+    @Override
+    public long getScheduledTime()
+    {
+        return _scheduledTime;
+    }
+
     void reportUnexpectedByteBufferSizeUsage()
     {
         if (!_unexpectedByteBufferSizeReported)

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingNetworkTransport.java
 Fri Mar 25 10:26:35 2016
@@ -36,8 +36,8 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.configuration.CommonProperties;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.AggregateTicker;
+import org.apache.qpid.transport.network.Ticker;
 import org.apache.qpid.transport.network.TransportEncryption;
-import org.apache.qpid.transport.network.io.IdleTimeoutTicker;
 
 import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
 
@@ -149,10 +149,7 @@ public class NonBlockingNetworkTransport
 
                     socketChannel.configureBlocking(false);
 
-                    AggregateTicker aggregateTicker = 
engine.getAggregateTicker();
 
-                    final IdleTimeoutTicker idleTimeoutTicker = new 
IdleTimeoutTicker(engine, _timeout);
-                    aggregateTicker.addTicker(idleTimeoutTicker);
 
                     NonBlockingConnection connection =
                             new NonBlockingConnection(socketChannel,
@@ -170,11 +167,16 @@ public class NonBlockingNetworkTransport
                                                       _scheduler,
                                                       _port);
 
+                    AggregateTicker aggregateTicker = 
engine.getAggregateTicker();
+
+                    Ticker writeIdleTimeoutTicker = new 
ServerIdleWriteTimeoutTicker(connection, engine, _timeout);
+                    Ticker readIdleTimeoutTicker = new 
ServerIdleReadTimeoutTicker(connection, engine, _timeout);
+                    aggregateTicker.addTicker(writeIdleTimeoutTicker);
+                    aggregateTicker.addTicker(readIdleTimeoutTicker);
+
                     engine.setNetworkConnection(connection);
                     connection.setMaxReadIdleMillis(1000L * HANDSHAKE_TIMEOUT);
 
-                    idleTimeoutTicker.setConnection(connection);
-
                     connection.start();
 
                     _scheduler.addConnection(connection);

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
 Fri Mar 25 10:26:35 2016
@@ -75,4 +75,6 @@ public interface ProtocolEngine extends
     void received(QpidByteBuffer msg);
 
     void setIOThread(Thread ioThread);
+
+    void processingStarted(long currentTime);
 }

Modified: 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
 Fri Mar 25 10:26:35 2016
@@ -26,4 +26,6 @@ public interface ServerNetworkConnection
     void reserveOutboundMessageSpace(long size);
 
     String getTransportInfo();
+
+    long getScheduledTime();
 }

Modified: 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
 Fri Mar 25 10:26:35 2016
@@ -54,6 +54,7 @@ import org.apache.qpid.server.model.Tran
 import org.apache.qpid.server.model.port.AmqpPort;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
+import org.apache.qpid.server.transport.ServerNetworkConnection;
 import org.apache.qpid.server.util.Action;
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -147,7 +148,7 @@ public class ServerConnection extends Co
 
         if(state == State.CLOSING)
         {
-            getAmqpConnection().getAggregateTicker().addTicker(new 
ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, 
getNetworkConnection()));
+            getAmqpConnection().getAggregateTicker().addTicker(new 
ConnectionClosingTicker(System.currentTimeMillis() + CLOSE_OK_TIMEOUT, 
(ServerNetworkConnection) getNetworkConnection()));
         }
     }
 

Modified: 
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
 (original)
+++ 
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
 Fri Mar 25 10:26:35 2016
@@ -445,6 +445,12 @@ class WebSocketProvider implements Accep
             return _connection.getProtocol();
         }
 
+        @Override
+        public long getScheduledTime()
+        {
+            return 0;
+        }
+
         void setPeerCertificate(final Certificate certificate)
         {
             _certificate = certificate;

Modified: 
qpid/java/branches/6.0.x/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
 (original)
+++ 
qpid/java/branches/6.0.x/qpid-test-utils/src/main/java/org/apache/qpid/test/utils/TCPTunneler.java
 Fri Mar 25 10:26:35 2016
@@ -41,7 +41,7 @@ import org.slf4j.LoggerFactory;
  * A basic implementation of TCP traffic forwarder between ports.
  * It is intended to use in tests.
  */
-public class TCPTunneler
+public class TCPTunneler implements AutoCloseable
 {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(TCPTunneler.class);
 
@@ -61,6 +61,11 @@ public class TCPTunneler
         _tcpWorker.start();
     }
 
+    public void stopClientToServerForwarding(final InetSocketAddress 
clientAddress)
+    {
+        _tcpWorker.stopClientToServerForwarding(clientAddress);
+    }
+
     public void stop()
     {
         try
@@ -92,6 +97,12 @@ public class TCPTunneler
         }
     }
 
+    @Override
+    public void close() throws Exception
+    {
+        stop();
+    }
+
     public interface TunnelListener
     {
         void clientConnected(InetSocketAddress clientAddress);
@@ -111,6 +122,7 @@ public class TCPTunneler
         private final TunnelListener _notifyingListener;
         private volatile ServerSocket _serverSocket;
         private volatile ExecutorService _executor;
+        private int _actualLocalPort;
 
         public TCPWorker(final int localPort,
                          final String remoteHost,
@@ -180,20 +192,22 @@ public class TCPTunneler
 
         public void start()
         {
-            LOGGER.info("Starting TCPTunneler forwarding from port {} to {}", 
_localPort, _remoteHostPort);
+            _actualLocalPort = _localPort;
             try
             {
                 _serverSocket = new ServerSocket(_localPort);
+                _actualLocalPort = _serverSocket.getLocalPort();
+                LOGGER.info                                  ("Starting 
TCPTunneler forwarding from port {} to {}",
+                            _actualLocalPort, _remoteHostPort);
                 _serverSocket.setReuseAddress(true);
             }
             catch (IOException e)
             {
-                throw new RuntimeException("Cannot start TCPTunneler on port " 
+ _localPort, e);
+                throw new RuntimeException("Cannot start TCPTunneler on port " 
+ _actualLocalPort, e);
             }
 
             if (_serverSocket != null)
             {
-                LOGGER.info("Listening on port {}", _localPort);
                 try
                 {
                     _executor.execute(this);
@@ -206,7 +220,7 @@ public class TCPTunneler
                     }
                     finally
                     {
-                        throw new RuntimeException("Cannot start acceptor 
thread for TCPTunneler on port " + _localPort,
+                        throw new RuntimeException("Cannot start acceptor 
thread for TCPTunneler on port " + _actualLocalPort,
                                                    e);
                     }
                 }
@@ -218,7 +232,7 @@ public class TCPTunneler
             if (_closed.compareAndSet(false, true))
             {
                 LOGGER.info("Stopping TCPTunneler forwarding from port {} to 
{}",
-                            _localPort,
+                            _actualLocalPort,
                             _remoteHostPort);
                 try
                 {
@@ -233,7 +247,7 @@ public class TCPTunneler
                 }
 
                 LOGGER.info("TCPTunneler forwarding from port {} to {} is 
stopped",
-                            _localPort,
+                            _actualLocalPort,
                             _remoteHostPort);
             }
         }
@@ -326,6 +340,28 @@ public class TCPTunneler
             }
         }
 
+        public void stopClientToServerForwarding(final InetSocketAddress 
clientAddress)
+        {
+            SocketTunnel target = null;
+            for (SocketTunnel tunnel : _tunnels)
+            {
+                if (tunnel.getClientAddress().equals(clientAddress))
+                {
+                    target = tunnel;
+                    break;
+                }
+            }
+            if (target != null)
+            {
+                LOGGER.debug("Stopping forwarding from client {} to server", 
clientAddress);
+                target.stopClientToServerForwarding();
+            }
+            else
+            {
+                throw new IllegalArgumentException("Could not find tunnel for 
address " + clientAddress);
+            }
+        }
+
         private void closeServerSocket()
         {
             if (_serverSocket != null)
@@ -345,7 +381,6 @@ public class TCPTunneler
             }
         }
 
-
         private SocketTunnel removeTunnel(final InetSocketAddress 
clientAddress)
         {
             SocketTunnel client = null;
@@ -372,8 +407,8 @@ public class TCPTunneler
         private final Socket _serverSocket;
         private final TunnelListener _tunnelListener;
         private final AtomicBoolean _closed;
-        private final AutoClosingStreamForwarder _upStreamForwarder;
-        private final AutoClosingStreamForwarder _downStreamForwarder;
+        private final AutoClosingStreamForwarder _clientToServer;
+        private final AutoClosingStreamForwarder _serverToClient;
         private final InetSocketAddress _clientSocketAddress;
 
         public SocketTunnel(final Socket clientSocket,
@@ -388,8 +423,8 @@ public class TCPTunneler
             _tunnelListener = tunnelListener;
             _clientSocket.setKeepAlive(true);
             _serverSocket.setKeepAlive(true);
-            _upStreamForwarder = new AutoClosingStreamForwarder(new 
StreamForwarder(_clientSocket, _serverSocket));
-            _downStreamForwarder = new AutoClosingStreamForwarder(new 
StreamForwarder(_serverSocket, _clientSocket));
+            _clientToServer = new AutoClosingStreamForwarder(new 
StreamForwarder(_clientSocket, _serverSocket));
+            _serverToClient = new AutoClosingStreamForwarder(new 
StreamForwarder(_serverSocket, _clientSocket));
         }
 
         public void close()
@@ -410,11 +445,16 @@ public class TCPTunneler
 
         public void start(Executor executor) throws IOException
         {
-            executor.execute(_upStreamForwarder);
-            executor.execute(_downStreamForwarder);
+            executor.execute(_clientToServer);
+            executor.execute(_serverToClient);
             _tunnelListener.clientConnected(getClientAddress());
         }
 
+        public void stopClientToServerForwarding()
+        {
+            _clientToServer.stopForwarding();
+        }
+
         public boolean isClosed()
         {
             return _closed.get();
@@ -430,7 +470,6 @@ public class TCPTunneler
             return _clientSocketAddress;
         }
 
-
         private static void closeSocket(Socket socket)
         {
             if (socket != null)
@@ -472,6 +511,11 @@ public class TCPTunneler
                     currentThread.setName(originalThreadName);
                 }
             }
+
+            public void stopForwarding()
+            {
+                _streamForwarder.stopForwarding();
+            }
         }
     }
 
@@ -482,13 +526,13 @@ public class TCPTunneler
         private final InputStream _inputStream;
         private final OutputStream _outputStream;
         private final String _name;
+        private AtomicBoolean _stopForwarding = new AtomicBoolean();
 
         public StreamForwarder(Socket input, Socket output) throws IOException
         {
             _inputStream = input.getInputStream();
             _outputStream = output.getOutputStream();
-            _name = "Forwarder-" + input.getInetAddress().getHostName() + ":" 
+ input.getPort() + "->"
-                    + output.getInetAddress().getHostName() + ":" + 
output.getPort();
+            _name = "Forwarder-" + input.getLocalSocketAddress() + "->" + 
output.getRemoteSocketAddress();
         }
 
         @Override
@@ -500,8 +544,16 @@ public class TCPTunneler
             {
                 while ((bytesRead = _inputStream.read(buffer)) != -1)
                 {
-                    _outputStream.write(buffer, 0, bytesRead);
-                    _outputStream.flush();
+                    if (!_stopForwarding.get())
+                    {
+                        _outputStream.write(buffer, 0, bytesRead);
+                        _outputStream.flush();
+                        LOGGER.debug("Forwarded {} byte(s)", bytesRead);
+                    }
+                    else
+                    {
+                        LOGGER.debug("Discarded {} byte(s)", bytesRead);
+                    }
                 }
             }
             catch (IOException e)
@@ -535,5 +587,11 @@ public class TCPTunneler
         {
             return _name;
         }
+
+        public void stopForwarding()
+        {
+            _stopForwarding.set(true);
+        }
+
     }
 }

Modified: 
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
 (original)
+++ 
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/client/HeartbeatTest.java
 Fri Mar 25 10:26:35 2016
@@ -20,11 +20,15 @@ package org.apache.qpid.client;
 
 import static 
org.apache.qpid.configuration.ClientProperties.QPID_HEARTBEAT_INTERVAL;
 
+import java.net.InetSocketAddress;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -34,6 +38,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.test.utils.TCPTunneler;
 
 public class HeartbeatTest extends QpidBrokerTestCase
 {
@@ -192,6 +197,56 @@ public class HeartbeatTest extends QpidB
         conn.close();
     }
 
+    public void testClientStopsSendingHeartbeats_BrokerClosesConnection() 
throws Exception
+    {
+        try(TCPTunneler tcpTunneler = new TCPTunneler(getFailingPort(), 
"localhost", getPort(), 1))
+        {
+            tcpTunneler.start();
+
+            final AtomicReference<InetSocketAddress> clientAddressRef = new 
AtomicReference<>();
+            tcpTunneler.addClientListener(new TCPTunneler.TunnelListener()
+            {
+                @Override
+                public void clientConnected(final InetSocketAddress 
clientAddress)
+                {
+                    clientAddressRef.set(clientAddress);
+                }
+
+                @Override
+                public void clientDisconnected(final InetSocketAddress 
clientAddress)
+                {
+                }
+            });
+
+            final CountDownLatch exceptionLatch = new CountDownLatch(1);
+            final String url = String.format(CONNECTION_URL_WITH_HEARTBEAT,  
getFailingPort(), 1);
+            AMQConnection conn = (AMQConnection) getConnection(new 
AMQConnectionURL(url));
+            conn.setHeartbeatListener(_listener);
+            conn.setExceptionListener(new ExceptionListener()
+            {
+                @Override
+                public void onException(final JMSException exception)
+                {
+                    LOGGER.debug("Exception listener got exception", 
exception);
+                    exceptionLatch.countDown();
+                }
+            });
+            conn.start();
+
+            assertNotNull(clientAddressRef.get());
+
+            _listener.awaitExpectedHeartbeats(MAXIMUM_WAIT_TIME);
+
+            assertTrue("Too few heartbeats received: 
"+_listener.getHeartbeatsReceived() +" (expected at least 2)", 
_listener.getHeartbeatsReceived() >=2);
+            assertTrue("Too few heartbeats sent 
"+_listener.getHeartbeatsSent() +" (expected at least 2)", 
_listener.getHeartbeatsSent() >=2);
+
+            tcpTunneler.stopClientToServerForwarding(clientAddressRef.get());
+
+            exceptionLatch.await(5, TimeUnit.SECONDS);
+            assertTrue("Connection should be disconnected within timeout", 
conn.isConnected());
+        }
+    }
+
     private class TestListener implements HeartbeatListener
     {
         private final String _name;

Modified: 
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
URL: 
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java?rev=1736551&r1=1736550&r2=1736551&view=diff
==============================================================================
--- 
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
 (original)
+++ 
qpid/java/branches/6.0.x/systests/src/test/java/org/apache/qpid/transport/ProtocolNegotiationTest.java
 Fri Mar 25 10:26:35 2016
@@ -145,11 +145,12 @@ public class ProtocolNegotiationTest ext
             boolean brokenPipe = false;
             while(timeout > System.currentTimeMillis())
             {
-                if (!writeHeartbeat(dataOutputStream));
+                if (!writeHeartbeat(dataOutputStream))
                 {
                     brokenPipe = true;
                     break;
                 }
+                Thread.sleep(100);
             }
             assertTrue("Expected pipe to become broken within "
                        + Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY + " 
timeout", brokenPipe);



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to