Author: rgodfrey
Date: Wed Nov 11 19:41:09 2015
New Revision: 1713921

URL: http://svn.apache.org/viewvc?rev=1713921&view=rev
Log:
QPID-6794 : Update running count for all tasks executed by the network connection scheduler

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java?rev=1713921&r1=1713920&r2=1713921&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/NetworkConnectionScheduler.java Wed Nov 11 19:41:09 2015
@@ -103,7 +103,7 @@ public class NetworkConnectionScheduler
         Thread.currentThread().setName( connection.getThreadName() );
         try
         {
-            _running.incrementAndGet();
+            incrementRunningCount();
             boolean rerun;
             do
             {
@@ -149,11 +149,21 @@ public class NetworkConnectionScheduler
         }
         finally
         {
-            _running.decrementAndGet();
+            decrementRunningCount();
         }
 
     }
 
+    void decrementRunningCount()
+    {
+        _running.decrementAndGet();
+    }
+
+    void incrementRunningCount()
+    {
+        _running.incrementAndGet();
+    }
+
     public void close()
     {
         if(_selectorThread != null)

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java?rev=1713921&r1=1713920&r2=1713921&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/transport/SelectorThread.java Wed Nov 11 19:41:09 2015
@@ -186,6 +186,7 @@ class SelectorThread extends Thread
                         {
                             try
                             {
+                                _scheduler.incrementRunningCount();
                                 transport.acceptSocketChannel(channel);
                             }
                             finally
@@ -200,6 +201,10 @@ class SelectorThread extends Thread
                                     LOGGER.error("Failed to register selector on accepting port {}",
                                                  localSocketAddress, e);
                                 }
+                                finally
+                                {
+                                    _scheduler.decrementRunningCount();
+                                }
                             }
                         }
                     });
@@ -260,85 +265,93 @@ class SelectorThread extends Thread
 
         private void performSelect()
         {
-            while(!_closed.get())
+            _scheduler.incrementRunningCount();
+            try
             {
-                if(acquireSelecting())
+                while (!_closed.get())
                 {
-                    List<ConnectionProcessor> connections = new ArrayList<>();
-                    try
+                    if (acquireSelecting())
                     {
-                        if (!_closed.get())
+                        List<ConnectionProcessor> connections = new ArrayList<>();
+                        try
                         {
-                            Thread.currentThread().setName("Selector-" + _scheduler.getName());
-                            _inSelect.set(true);
-                            try
-                            {
-                                if(_wakeups.getAndSet(0) > 0)
-                                {
-                                    _selector.selectNow();
-                                }
-                                else
-                                {
-                                    _selector.select(_nextTimeout);
-                                }
-                            }
-                            catch (IOException e)
-                            {
-                                // TODO Inform the model object
-                                LOGGER.error("Failed to trying to select()", e);
-                                closeSelector();
-                                return;
-                            }
-                            finally
-                            {
-                                _inSelect.set(false);
-                            }
-                            runTasks();
-                            for (NonBlockingConnection connection : processSelectionKeys())
-                            {
-                                if(connection.setScheduled())
-                                {
-                                    connections.add(new ConnectionProcessor(_scheduler, connection));
-                                }
-                            }
-                            for (NonBlockingConnection connection : reregisterUnregisteredConnections())
+                            if (!_closed.get())
                             {
-                                if(connection.setScheduled())
-                                {
-                                    connections.add(new ConnectionProcessor(_scheduler, connection));
-                                }
-                            }
-                            for (NonBlockingConnection connection : processUnscheduledConnections())
-                            {
-                                if(connection.setScheduled())
+                                Thread.currentThread().setName("Selector-" + _scheduler.getName());
+                                _inSelect.set(true);
+                                try
                                 {
-                                    connections.add(new ConnectionProcessor(_scheduler, connection));
+                                    if (_wakeups.getAndSet(0) > 0)
+                                    {
+                                        _selector.selectNow();
+                                    }
+                                    else
+                                    {
+                                        _selector.select(_nextTimeout);
+                                    }
+                                }
+                                catch (IOException e)
+                                {
+                                    // TODO Inform the model object
+                                    LOGGER.error("Failed to trying to select()", e);
+                                    closeSelector();
+                                    return;
+                                }
+                                finally
+                                {
+                                    _inSelect.set(false);
+                                }
+                                runTasks();
+                                for (NonBlockingConnection connection : processSelectionKeys())
+                                {
+                                    if (connection.setScheduled())
+                                    {
+                                        connections.add(new ConnectionProcessor(_scheduler, connection));
+                                    }
+                                }
+                                for (NonBlockingConnection connection : reregisterUnregisteredConnections())
+                                {
+                                    if (connection.setScheduled())
+                                    {
+                                        connections.add(new ConnectionProcessor(_scheduler, connection));
+                                    }
+                                }
+                                for (NonBlockingConnection connection : processUnscheduledConnections())
+                                {
+                                    if (connection.setScheduled())
+                                    {
+                                        connections.add(new ConnectionProcessor(_scheduler, connection));
+                                    }
                                 }
-                            }
 
+                            }
                         }
+                        finally
+                        {
+                            clearSelecting();
+                        }
+                        _workQueue.addAll(connections);
+                        _workQueue.add(this);
+                        for (ConnectionProcessor connectionProcessor : connections)
+                        {
+                            connectionProcessor.run();
+                        }
+
                     }
-                    finally
-                    {
-                        clearSelecting();
-                    }
-                    _workQueue.add(this);
-                    _workQueue.addAll(connections);
-                    for(ConnectionProcessor connectionProcessor : connections)
+                    else
                     {
-                        connectionProcessor.run();
+                        break;
                     }
-
                 }
-                else
+
+                if (_closed.get() && acquireSelecting())
                 {
-                    break;
+                    closeSelector();
                 }
             }
-
-            if(_closed.get() && acquireSelecting())
+            finally
             {
-                closeSelector();
+                _scheduler.decrementRunningCount();
             }
         }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to