Author: kwall
Date: Fri Mar 25 11:13:08 2016
New Revision: 1736560
URL: http://svn.apache.org/viewvc?rev=1736560&view=rev
Log:
QPID-7033: [Java Broker] Busy IO thread pools may cause client connections to
be unfairly closed
Should have been part of previous commit.
Merged with command:
svn merge -c 1732452,1732461,1732812 ^/qpid/java/trunk
Added:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulingDelayNotificationListener.java
- copied unchanged from r1732452,
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SchedulingDelayNotificationListener.java
Modified:
qpid/java/branches/6.0.x/ (props changed)
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
qpid/java/branches/6.0.x/test-profiles/CPPExcludes (contents, props changed)
Propchange: qpid/java/branches/6.0.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 25 11:13:08 2016
@@ -9,5 +9,5 @@
/qpid/branches/java-broker-vhost-refactor/java:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java:805429-821809
/qpid/branches/qpid-2935/qpid/java:1061302-1072333
-/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732525,1734452
+/qpid/java/trunk:1715445-1715447,1715586,1715940,1716086-1716087,1716127-1716128,1716141,1716153,1716155,1716194,1716204,1716209,1716227,1716277,1716357,1716368,1716370,1716374,1716432,1716444-1716445,1716455,1716461,1716474,1716489,1716497,1716515,1716555,1716602,1716606-1716610,1716619,1716636,1717269,1717299,1717401,1717446,1717449,1717626,1717691,1717735,1717780,1718744,1718889,1718893,1718918,1718922,1719026,1719028,1719033,1719037,1719047,1719051,1720340,1720664,1721151,1721198,1722019-1722020,1722246,1722339,1722416,1722674,1722678,1722683,1722711,1723064,1723194,1723563,1724216,1724251,1724257,1724292,1724375,1724397,1724432,1724582,1724603,1724780,1724843-1724844,1725295,1725569,1725760,1726176,1726244-1726246,1726249,1726358,1726436,1726449,1726456,1726646,1726653,1726755,1726778,1727532,1727555,1727608,1727951,1727954,1728089,1728167,1728302,1728497,1728501,1728524,1728639,1728651,1728772,1729215,1729297,1729347,1729356,1729406,1729408,1729412,1729515,1729638,1729656-1729
657,1729783,1729828,1729832,1729841,1729851,1729886,1729904,1729973,1730019,1730025,1730052,1730072,1730088,1730494,1730499,1730547,1730559,1730567,1730578,1730585,1730651,1730697,1730712-1730713,1730805,1731029,1731110,1731210,1731225,1731444,1731551,1731612,1732184,1732452,1732461,1732525,1732812,1734452
/qpid/trunk/qpid:796646-796653
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/AbstractAMQPConnection.java
Fri Mar 25 11:13:08 2016
@@ -105,8 +105,6 @@ public abstract class AbstractAMQPConnec
private volatile AccessControlContext _accessControllerContext;
private volatile Thread _ioThread;
- private volatile SlowConnectionOpenTicker _slowConnectionOpenTicker;
-
public AbstractAMQPConnection(Broker<?> broker,
ServerNetworkConnection network,
AmqpPort<?> port,
@@ -168,8 +166,9 @@ public abstract class AbstractAMQPConnec
{
super.onOpen();
long maxAuthDelay = _port.getContextValue(Long.class,
Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY);
- _slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
- _aggregateTicker.addTicker(_slowConnectionOpenTicker);
+ SlowConnectionOpenTicker slowConnectionOpenTicker = new SlowConnectionOpenTicker(maxAuthDelay);
+ _network.addSchedulingDelayNotificationListeners(slowConnectionOpenTicker);
+ _aggregateTicker.addTicker(slowConnectionOpenTicker);
_lastReadTime = _lastWriteTime = getCreatedTime();
}
@@ -676,21 +675,10 @@ public abstract class AbstractAMQPConnec
protected abstract EventLogger getEventLogger();
- @Override
- public void processingStarted(final long currentTime)
- {
- SlowConnectionOpenTicker ticker = _slowConnectionOpenTicker;
- long scheduledTime = _network.getScheduledTime();
- if (ticker != null && scheduledTime > 0)
- {
- ticker.addSchedulingDelay(currentTime - scheduledTime);
- }
- }
-
- private class SlowConnectionOpenTicker implements Ticker
+ private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener
{
private final long _allowedTime;
- private volatile long _accumulatedDelay;
+ private volatile long _accumulatedSchedulingDelay;
public SlowConnectionOpenTicker(long timeoutTime)
{
@@ -700,7 +688,8 @@ public abstract class AbstractAMQPConnec
@Override
public int getTimeToNextTick(final long currentTime)
{
- final int timeToNextTick = (int) (getCreatedTime() + _allowedTime + _accumulatedDelay - currentTime);
+ final int timeToNextTick = (int) (getCreatedTime() + _allowedTime + _accumulatedSchedulingDelay
+ - currentTime);
return timeToNextTick;
}
@@ -720,15 +709,19 @@ public abstract class AbstractAMQPConnec
else
{
_aggregateTicker.removeTicker(this);
- _slowConnectionOpenTicker = null;
+ _network.removeSchedulingDelayNotificationListeners(this);
}
}
return nextTick;
}
- public void addSchedulingDelay(final long delay)
+ @Override
+ public void notifySchedulingDelay(final long schedulingDelay)
{
- _accumulatedDelay += delay;
+ if (schedulingDelay > 0)
+ {
+ _accumulatedSchedulingDelay += schedulingDelay;
+ }
}
}
}
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/MultiVersionProtocolEngine.java
Fri Mar 25 11:13:08 2016
@@ -148,12 +148,6 @@ public class MultiVersionProtocolEngine
_delegate.setIOThread(ioThread);
}
- @Override
- public void processingStarted(final long currentTime)
- {
- _delegate.processingStarted(currentTime);
- }
-
public long getConnectionId()
{
return _id;
@@ -300,12 +294,6 @@ public class MultiVersionProtocolEngine
}
@Override
- public void processingStarted(final long currentTime)
- {
-
- }
-
- @Override
public void closed()
{
@@ -550,12 +538,6 @@ public class MultiVersionProtocolEngine
{
}
-
- @Override
- public void processingStarted(final long currentTime)
- {
-
- }
@Override
public Subject getSubject()
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
Fri Mar 25 11:13:08 2016
@@ -101,6 +101,7 @@ public class NetworkConnectionScheduler
void processConnection(final NonBlockingConnection connection)
{
Thread.currentThread().setName(connection.getThreadName());
+ connection.doPreWork();
boolean rerun;
do
{
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
Fri Mar 25 11:13:08 2016
@@ -27,8 +27,10 @@ import java.security.Principal;
import java.security.cert.Certificate;
import java.util.Collection;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -73,6 +75,7 @@ public class NonBlockingConnection imple
private Iterator<Runnable> _pendingIterator;
private final AtomicLong _maxWriteIdleMillis = new AtomicLong();
private final AtomicLong _maxReadIdleMillis = new AtomicLong();
+ private final List<SchedulingDelayNotificationListener> _schedulingDelayNotificationListeners = new CopyOnWriteArrayList<>();
public NonBlockingConnection(SocketChannel socketChannel,
ProtocolEngine protocolEngine,
@@ -234,6 +237,22 @@ public class NonBlockingConnection imple
return _protocolEngine.hasWork();
}
+ public void doPreWork()
+ {
+ if (!_closed.get())
+ {
+ long currentTime = System.currentTimeMillis();
+ long schedulingDelay = currentTime - getScheduledTime();
+ if (!_schedulingDelayNotificationListeners.isEmpty())
+ {
+ for (SchedulingDelayNotificationListener listener : _schedulingDelayNotificationListeners)
+ {
+ listener.notifySchedulingDelay(schedulingDelay);
+ }
+ }
+ }
+ }
+
public boolean doWork()
{
_protocolEngine.clearWork();
@@ -242,13 +261,11 @@ public class NonBlockingConnection imple
try
{
long currentTime = System.currentTimeMillis();
- _protocolEngine.processingStarted(currentTime);
int tick = getTicker().getTimeToNextTick(currentTime);
if (tick <= 0)
{
getTicker().tick(currentTime);
}
- _scheduledTime = 0;
_protocolEngine.setIOThread(Thread.currentThread());
_protocolEngine.setMessageAssignmentSuspended(true, true);
@@ -312,6 +329,18 @@ public class NonBlockingConnection imple
}
+ @Override
+ public void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+ {
+ _schedulingDelayNotificationListeners.add(listener);
+ }
+
+ @Override
+ public void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+ {
+ _schedulingDelayNotificationListeners.remove(listener);
+ }
+
private boolean processPending() throws IOException
{
if(_pendingIterator == null)
@@ -605,6 +634,7 @@ public class NonBlockingConnection imple
public void clearScheduled()
{
_scheduled.set(false);
+ _scheduledTime = 0;
}
@Override
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ProtocolEngine.java
Fri Mar 25 11:13:08 2016
@@ -75,6 +75,4 @@ public interface ProtocolEngine extends
void received(QpidByteBuffer msg);
void setIOThread(Thread ioThread);
-
- void processingStarted(long currentTime);
}
Modified:
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
(original)
+++
qpid/java/branches/6.0.x/broker-core/src/main/java/org/apache/qpid/server/transport/ServerNetworkConnection.java
Fri Mar 25 11:13:08 2016
@@ -28,4 +28,8 @@ public interface ServerNetworkConnection
String getTransportInfo();
long getScheduledTime();
+
+ void addSchedulingDelayNotificationListeners(SchedulingDelayNotificationListener listener);
+
+ void removeSchedulingDelayNotificationListeners(SchedulingDelayNotificationListener listener);
}
Modified:
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
---
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
(original)
+++
qpid/java/branches/6.0.x/broker-plugins/websocket/src/main/java/org/apache/qpid/server/transport/websocket/WebSocketProvider.java
Fri Mar 25 11:13:08 2016
@@ -61,6 +61,7 @@ import org.apache.qpid.server.model.port
import org.apache.qpid.server.transport.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.transport.AcceptingTransport;
import org.apache.qpid.server.transport.ProtocolEngine;
+import org.apache.qpid.server.transport.SchedulingDelayNotificationListener;
import org.apache.qpid.server.transport.ServerNetworkConnection;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
@@ -431,6 +432,16 @@ class WebSocketProvider implements Accep
}
@Override
+ public void addSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+ {
+ }
+
+ @Override
+ public void removeSchedulingDelayNotificationListeners(final SchedulingDelayNotificationListener listener)
+ {
+ }
+
+ @Override
public void reserveOutboundMessageSpace(final long size)
{
if (_usedOutboundMessageSpace.addAndGet(size) >
_outboundMessageBufferLimit)
Modified: qpid/java/branches/6.0.x/test-profiles/CPPExcludes
URL:
http://svn.apache.org/viewvc/qpid/java/branches/6.0.x/test-profiles/CPPExcludes?rev=1736560&r1=1736559&r2=1736560&view=diff
==============================================================================
--- qpid/java/branches/6.0.x/test-profiles/CPPExcludes (original)
+++ qpid/java/branches/6.0.x/test-profiles/CPPExcludes Fri Mar 25 11:13:08 2016
@@ -205,6 +205,7 @@ org.apache.qpid.transport.MaxFrameSizeTe
// CPP Broker does not timeout connections with no activity like the Java Broker
org.apache.qpid.transport.ProtocolNegotiationTest#testNoProtocolHeaderSent_BrokerClosesConnection
+org.apache.qpid.transport.ProtocolNegotiationTest#testNoConnectionOpenSent_BrokerClosesConnection
// QPID-6000 : Tests Java specific message compression functionality, and uses the REST API to test it
org.apache.qpid.systest.MessageCompressionTest#*
Propchange: qpid/java/branches/6.0.x/test-profiles/CPPExcludes
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Fri Mar 25 11:13:08 2016
@@ -6,4 +6,4 @@
/qpid/branches/java-broker-vhost-refactor/java/test-profiles/CPPExcludes:1493674-1494547
/qpid/branches/java-network-refactor/qpid/java/test-profiles/010Excludes:805429-821809
/qpid/branches/qpid-2935/qpid/java/test-profiles/CPPExcludes:1061302-1072333
-/qpid/java/trunk/test-profiles/CPPExcludes:1715446
+/qpid/java/trunk/test-profiles/CPPExcludes:1715446,1732812
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]