Author: kwall
Date: Mon Nov 14 11:23:36 2016
New Revision: 1769597
URL: http://svn.apache.org/viewvc?rev=1769597&view=rev
Log:
QPID-7508: [Java Broker] Ensure that returning a connection to the selector
pays attention to a change in ticker state.
Also prevents unnecessary spinning of the selector.
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java?rev=1769597&r1=1769596&r2=1769597&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/AggregateTicker.java
Mon Nov 14 11:23:36 2016
@@ -20,6 +20,7 @@
package org.apache.qpid.server.transport;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.qpid.transport.network.Ticker;
@@ -27,6 +28,7 @@ public class AggregateTicker implements
{
private final CopyOnWriteArrayList<Ticker> _tickers = new CopyOnWriteArrayList<>();
+ private final AtomicBoolean _modified = new AtomicBoolean();
@Override
public int getTimeToNextTick(final long currentTime)
@@ -58,19 +60,26 @@ public class AggregateTicker implements
return nextTick;
}
- public CopyOnWriteArrayList<Ticker> getTickers()
- {
- return _tickers;
- }
-
public void addTicker(Ticker ticker)
{
_tickers.add(ticker);
+ _modified.set(true);
}
public void removeTicker(Ticker ticker)
{
_tickers.remove(ticker);
+ _modified.set(true);
+ }
+
+ public boolean getModified()
+ {
+ return _modified.get();
+ }
+
+ public void resetModified()
+ {
+ _modified.set(false);
}
@Override
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java?rev=1769597&r1=1769596&r2=1769597&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NonBlockingConnection.java
Mon Nov 14 11:23:36 2016
@@ -133,7 +133,7 @@ public class NonBlockingConnection imple
return _partialRead;
}
- Ticker getTicker()
+ AggregateTicker getTicker()
{
return _protocolEngine.getAggregateTicker();
}
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1769597&r1=1769596&r2=1769597&view=diff
==============================================================================
---
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
(original)
+++
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
Mon Nov 14 11:23:36 2016
@@ -138,7 +138,9 @@ class SelectorThread extends Thread
{
NonBlockingConnection connection = iterator.next();
- int period = connection.getTicker().getTimeToNextTick(currentTime);
+ final AggregateTicker ticker = connection.getTicker();
+ int period = ticker.getTimeToNextTick(currentTime);
+ ticker.resetModified();
if (period <= 0 || connection.isStateChanged())
{
@@ -616,13 +618,14 @@ class SelectorThread extends Thread
public void returnConnectionToSelector(final NonBlockingConnection connection)
{
- if(selectionInterestRequiresUpdate(connection))
+ SelectionTask selectionTask = connection.getSelectionTask();
+ if(selectionTask == null)
+ {
+ throw new IllegalStateException("returnConnectionToSelector should only be called with connections that are currently assigned a selector task");
+ }
+
+ if (selectionInterestRequiresUpdate(connection) || connection.getTicker().getModified())
{
- SelectionTask selectionTask = connection.getSelectionTask();
- if(selectionTask == null)
- {
- throw new IllegalStateException("returnConnectionToSelector should only be called with connections that are currently assigned a selector task");
- }
selectionTask.getUnregisteredConnections().add(connection);
selectionTask.wakeup();
}
@@ -698,10 +701,5 @@ class SelectorThread extends Thread
{
_workQueue.add(new ConnectionProcessor(_scheduler, connection));
}
- SelectionTask selectionTask = connection.getSelectionTask();
- if (selectionTask != null)
- {
- selectionTask.wakeup();
- }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]