Author: rgodfrey
Date: Thu Jan 29 19:28:45 2015
New Revision: 1655825

URL: http://svn.apache.org/r1655825
Log:
Correctly deregister connections from the selector, and also optimise the case
where work can be rescheduled on the same thread

Modified:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1655825&r1=1655824&r2=1655825&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
 Thu Jan 29 19:28:45 2015
@@ -165,11 +165,11 @@ public class NonBlockingSenderReceiver
                 }
 
                 _receiver.setTransportBlockedForWriting(!doWrite());
-                doRead();
+                boolean dataRead = doRead();
                 _fullyWritten = doWrite();
                 _receiver.setTransportBlockedForWriting(!_fullyWritten);
 
-                if(_workDone && _netInputBuffer != null && 
_netInputBuffer.position() != 0)
+                if(dataRead || (_workDone && _netInputBuffer != null && 
_netInputBuffer.position() != 0))
                 {
                     _stateChanged.set(true);
                 }
@@ -202,64 +202,6 @@ public class NonBlockingSenderReceiver
 
     }
 
-
-/*   public void run()
-    {
-        LOGGER.debug("I/O for thread " + _remoteSocketAddress + " started");
-
-
-        while (!_closed.get())
-        {
-
-            try
-            {
-                long currentTime = System.currentTimeMillis();
-                int tick = _ticker.getTimeToNextTick(currentTime);
-                if(tick <= 0)
-                {
-                    tick = _ticker.tick(currentTime);
-                }
-
-                _selector.select(tick <= 0 ? 1 : tick);
-                Set<SelectionKey> selectionKeys = _selector.selectedKeys();
-                selectionKeys.clear();
-
-                _receiver.setTransportBlockedForWriting(!doWrite());
-                doRead();
-                _fullyWritten = doWrite();
-                _receiver.setTransportBlockedForWriting(!_fullyWritten);
-
-                _socketChannel.register(_selector,
-                                        _fullyWritten
-                                                ? SelectionKey.OP_READ
-                                                : (SelectionKey.OP_WRITE | 
SelectionKey.OP_READ));
-
-            }
-            catch (IOException e)
-            {
-                LOGGER.info("Exception performing I/O for thread '" + 
_remoteSocketAddress + "': " + e);
-                close();
-            }
-        }
-
-        try(Selector selector = _selector; SocketChannel channel = 
_socketChannel)
-        {
-            while(!doWrite())
-            {
-            }
-
-            _receiver.closed();
-        }
-        catch (IOException e)
-        {
-            LOGGER.info("Exception performing final write/close for thread '" 
+ _remoteSocketAddress + "': " + e);
-        }
-        finally
-        {
-            LOGGER.debug("Shutting down IO thread for " + 
_remoteSocketAddress);
-        }
-    }*/
-
     @Override
     public void flush()
     {
@@ -368,9 +310,9 @@ public class NonBlockingSenderReceiver
         }
     }
 
-    private void doRead() throws IOException
+    private boolean doRead() throws IOException
     {
-
+        boolean readData = false;
         if(_transportEncryption == TransportEncryption.NONE)
         {
             int remaining = 0;
@@ -381,6 +323,10 @@ public class NonBlockingSenderReceiver
                     _currentBuffer = ByteBuffer.allocate(_receiveBufSize);
                 }
                 int read = _socketChannel.read(_currentBuffer);
+                if(read > 0)
+                {
+                    readData = true;
+                }
                 if (read == -1)
                 {
                     _closed.set(true);
@@ -406,7 +352,10 @@ public class NonBlockingSenderReceiver
                 {
                     _closed.set(true);
                 }
-
+                else if(read > 0)
+                {
+                    readData = true;
+                }
                 if (LOGGER.isDebugEnabled())
                 {
                     LOGGER.debug("Read " + read + " encrypted bytes ");
@@ -427,6 +376,10 @@ public class NonBlockingSenderReceiver
 
                     appInputBuffer.flip();
                     unwrapped = appInputBuffer.remaining();
+                    if(unwrapped > 0)
+                    {
+                        readData = true;
+                    }
                     _receiver.received(appInputBuffer);
                 }
                 while(unwrapped > 0 || tasksRun);
@@ -470,12 +423,13 @@ public class NonBlockingSenderReceiver
                     {
                         _onTransportEncryptionAction.run();
                         _netInputBuffer.compact();
-                        doRead();
+                        readData = doRead();
                     }
                     break;
                 }
             }
         }
+        return readData;
     }
 
     private boolean runSSLEngineTasks(final SSLEngineResult status)

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java?rev=1655825&r1=1655824&r2=1655825&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/SelectorThread.java
 Thu Jan 29 19:28:45 2015
@@ -31,6 +31,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
 * Created by keith on 28/01/2015.
@@ -66,70 +67,64 @@ public class SelectorThread extends Thre
 
         try
         {
-            try
+            while (!_closed.get())
             {
-                while (!_closed.get())
-                {
 
-                    _selector.select(nextTimeout);
+                _selector.select(nextTimeout);
 
-                    List<NonBlockingConnection> toBeScheduled = new 
ArrayList<>();
+                List<NonBlockingConnection> toBeScheduled = new ArrayList<>();
 
 
-                    Set<SelectionKey> selectionKeys = _selector.selectedKeys();
-                    for (SelectionKey key : selectionKeys)
-                    {
-                        NonBlockingConnection connection = 
(NonBlockingConnection) key.attachment();
+                Set<SelectionKey> selectionKeys = _selector.selectedKeys();
+                for (SelectionKey key : selectionKeys)
+                {
+                    NonBlockingConnection connection = (NonBlockingConnection) 
key.attachment();
 
-                        key.channel().register(_selector, 0);
+                    key.channel().register(_selector, 0);
 
-                        toBeScheduled.add(connection);
-                        _unscheduledConnections.remove(connection);
+                    toBeScheduled.add(connection);
+                    _unscheduledConnections.remove(connection);
 
-                    }
-                    selectionKeys.clear();
+                }
+                selectionKeys.clear();
 
-                    while (_unregisteredConnections.peek() != null)
-                    {
-                        NonBlockingConnection unregisteredConnection = 
_unregisteredConnections.poll();
-                        _unscheduledConnections.add(unregisteredConnection);
+                while (_unregisteredConnections.peek() != null)
+                {
+                    NonBlockingConnection unregisteredConnection = 
_unregisteredConnections.poll();
+                    _unscheduledConnections.add(unregisteredConnection);
 
 
-                        final int ops = (unregisteredConnection.canRead() ? 
SelectionKey.OP_READ : 0)
-                                        | 
(unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
-                        
unregisteredConnection.getSocketChannel().register(_selector, ops, 
unregisteredConnection);
+                    final int ops = (unregisteredConnection.canRead() ? 
SelectionKey.OP_READ : 0)
+                                    | 
(unregisteredConnection.waitingForWrite() ? SelectionKey.OP_WRITE : 0);
+                    
unregisteredConnection.getSocketChannel().register(_selector, ops, 
unregisteredConnection);
 
-                    }
+                }
 
-                    long currentTime = System.currentTimeMillis();
-                    Iterator<NonBlockingConnection> iterator = 
_unscheduledConnections.iterator();
-                    nextTimeout = Integer.MAX_VALUE;
-                    while (iterator.hasNext())
-                    {
-                        NonBlockingConnection connection = iterator.next();
+                long currentTime = System.currentTimeMillis();
+                Iterator<NonBlockingConnection> iterator = 
_unscheduledConnections.iterator();
+                nextTimeout = Integer.MAX_VALUE;
+                while (iterator.hasNext())
+                {
+                    NonBlockingConnection connection = iterator.next();
 
-                        int period = 
connection.getTicker().getTimeToNextTick(currentTime);
-                        if (period < 0 || connection.isStateChanged())
-                        {
-                            toBeScheduled.add(connection);
-                            iterator.remove();
-                        }
-                        else
-                        {
-                            nextTimeout = Math.min(period, nextTimeout);
-                        }
+                    int period = 
connection.getTicker().getTimeToNextTick(currentTime);
+                    if (period < 0 || connection.isStateChanged())
+                    {
+                        toBeScheduled.add(connection);
+                        connection.getSocketChannel().register(_selector, 
0).cancel();
+                        iterator.remove();
                     }
-
-                    for (NonBlockingConnection connection : toBeScheduled)
+                    else
                     {
-                        _scheduler.schedule(connection);
+                        nextTimeout = Math.min(period, nextTimeout);
                     }
+                }
 
+                for (NonBlockingConnection connection : toBeScheduled)
+                {
+                    _scheduler.schedule(connection);
                 }
-            }
-            finally
-            {
-                _selector.close();
+
             }
         }
         catch (IOException e)
@@ -137,6 +132,18 @@ public class SelectorThread extends Thre
             //TODO
             e.printStackTrace();
         }
+        finally
+        {
+            try
+            {
+                _selector.close();
+            }
+            catch (IOException e)
+            {
+                e.printStackTrace();
+            }
+        }
+
 
 
 
@@ -164,26 +171,52 @@ public class SelectorThread extends Thre
     private class NetworkConnectionScheduler
     {
         private final ScheduledThreadPoolExecutor _executor;
+        private final AtomicInteger _running = new AtomicInteger();
+        private final int _poolSize;
 
         private NetworkConnectionScheduler()
         {
-            _executor = new 
ScheduledThreadPoolExecutor(Runtime.getRuntime().availableProcessors());
+            _poolSize = Runtime.getRuntime().availableProcessors();
+            _executor = new ScheduledThreadPoolExecutor(_poolSize);
+            _executor.prestartAllCoreThreads();
         }
 
         public void processConnection(final NonBlockingConnection connection)
         {
-            boolean closed = connection.doWork();
-
-            if (!closed)
+            try
             {
-                if (connection.isStateChanged())
+                _running.incrementAndGet();
+                boolean rerun;
+                do
                 {
-                    schedule(connection);
-                }
-                else
-                {
-                    SelectorThread.this.addConnection(connection);
-                }
+                    rerun = false;
+                    boolean closed = connection.doWork();
+
+                    if (!closed)
+                    {
+
+                        if (connection.isStateChanged())
+                        {
+                            if (_running.get() == _poolSize)
+                            {
+                                schedule(connection);
+                            }
+                            else
+                            {
+                                rerun = true;
+                            }
+                        }
+                        else
+                        {
+                            SelectorThread.this.addConnection(connection);
+                        }
+                    }
+
+                } while (rerun);
+            }
+            finally
+            {
+                _running.decrementAndGet();
             }
         }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to