Author: rgodfrey
Date: Wed Nov 11 19:41:09 2015
New Revision: 1713921
URL: http://svn.apache.org/viewvc?rev=1713921&view=rev
Log:
QPID-6794 : Update running count for all tasks executed by the network
connection scheduler
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1713921&r1=1713920&r2=1713921&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Wed Nov 11 19:41:09 2015
@@ -103,7 +103,7 @@ public class NetworkConnectionScheduler
Thread.currentThread().setName( connection.getThreadName() );
try
{
- _running.incrementAndGet();
+ incrementRunningCount();
boolean rerun;
do
{
@@ -149,11 +149,21 @@ public class NetworkConnectionScheduler
}
finally
{
- _running.decrementAndGet();
+ decrementRunningCount();
}
}
+ void decrementRunningCount()
+ {
+ _running.decrementAndGet();
+ }
+
+ void incrementRunningCount()
+ {
+ _running.incrementAndGet();
+ }
+
public void close()
{
if(_selectorThread != null)
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1713921&r1=1713920&r2=1713921&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Wed Nov 11 19:41:09 2015
@@ -186,6 +186,7 @@ class SelectorThread extends Thread
{
try
{
+ _scheduler.incrementRunningCount();
transport.acceptSocketChannel(channel);
}
finally
@@ -200,6 +201,10 @@ class SelectorThread extends Thread
LOGGER.error("Failed to register selector on accepting port {}",
localSocketAddress, e);
}
+ finally
+ {
+ _scheduler.decrementRunningCount();
+ }
}
}
});
@@ -260,85 +265,93 @@ class SelectorThread extends Thread
private void performSelect()
{
- while(!_closed.get())
+ _scheduler.incrementRunningCount();
+ try
{
- if(acquireSelecting())
+ while (!_closed.get())
{
- List<ConnectionProcessor> connections = new ArrayList<>();
- try
+ if (acquireSelecting())
{
- if (!_closed.get())
+ List<ConnectionProcessor> connections = new ArrayList<>();
+ try
{
- Thread.currentThread().setName("Selector-" + _scheduler.getName());
- _inSelect.set(true);
- try
- {
- if(_wakeups.getAndSet(0) > 0)
- {
- _selector.selectNow();
- }
- else
- {
- _selector.select(_nextTimeout);
- }
- }
- catch (IOException e)
- {
- // TODO Inform the model object
- LOGGER.error("Failed to trying to select()", e);
- closeSelector();
- return;
- }
- finally
- {
- _inSelect.set(false);
- }
- runTasks();
- for (NonBlockingConnection connection : processSelectionKeys())
- {
- if(connection.setScheduled())
- {
- connections.add(new ConnectionProcessor(_scheduler, connection));
- }
- }
- for (NonBlockingConnection connection : reregisterUnregisteredConnections())
+ if (!_closed.get())
{
- if(connection.setScheduled())
- {
- connections.add(new ConnectionProcessor(_scheduler, connection));
- }
- }
- for (NonBlockingConnection connection : processUnscheduledConnections())
- {
- if(connection.setScheduled())
+ Thread.currentThread().setName("Selector-" + _scheduler.getName());
+ _inSelect.set(true);
+ try
{
- connections.add(new ConnectionProcessor(_scheduler, connection));
+ if (_wakeups.getAndSet(0) > 0)
+ {
+ _selector.selectNow();
+ }
+ else
+ {
+ _selector.select(_nextTimeout);
+ }
+ }
+ catch (IOException e)
+ {
+ // TODO Inform the model object
+ LOGGER.error("Failed to trying to select()", e);
+ closeSelector();
+ return;
+ }
+ finally
+ {
+ _inSelect.set(false);
+ }
+ runTasks();
+ for (NonBlockingConnection connection : processSelectionKeys())
+ {
+ if (connection.setScheduled())
+ {
+ connections.add(new ConnectionProcessor(_scheduler, connection));
+ }
+ }
+ for (NonBlockingConnection connection : reregisterUnregisteredConnections())
+ {
+ if (connection.setScheduled())
+ {
+ connections.add(new ConnectionProcessor(_scheduler, connection));
+ }
+ }
+ for (NonBlockingConnection connection : processUnscheduledConnections())
+ {
+ if (connection.setScheduled())
+ {
+ connections.add(new ConnectionProcessor(_scheduler, connection));
+ }
}
- }
+ }
}
+ finally
+ {
+ clearSelecting();
+ }
+ _workQueue.addAll(connections);
+ _workQueue.add(this);
+ for (ConnectionProcessor connectionProcessor : connections)
+ {
+ connectionProcessor.run();
+ }
+
}
- finally
- {
- clearSelecting();
- }
- _workQueue.add(this);
- _workQueue.addAll(connections);
- for(ConnectionProcessor connectionProcessor : connections)
+ else
{
- connectionProcessor.run();
+ break;
}
-
}
- else
+
+ if (_closed.get() && acquireSelecting())
{
- break;
+ closeSelector();
}
}
-
- if(_closed.get() && acquireSelecting())
+ finally
{
- closeSelector();
+ _scheduler.decrementRunningCount();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]