Author: kwall
Date: Tue Dec  9 17:36:33 2014
New Revision: 1644128

URL: http://svn.apache.org/r1644128
Log:
Stop reading bytes from the wire once the transport is closed.  Also prevent 
further ByteBuffers from being queued for sending once the transport is closed.

Modified:
    
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java

Modified: 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL: 
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1644128&r1=1644127&r2=1644128&view=diff
==============================================================================
--- 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
 (original)
+++ 
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
 Tue Dec  9 17:36:33 2014
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.thread.Threading;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
 import org.apache.qpid.transport.SenderException;
 import org.apache.qpid.transport.network.Ticker;
 
@@ -104,6 +105,10 @@ public class NonBlockingSenderReceiver
     @Override
     public void send(final ByteBuffer msg)
     {
+        if (_closed.get())
+        {
+            throw new SenderClosedException("I/O for thread " + 
_remoteSocketAddress + " is already closed");
+        }
         // append to list and do selector wakeup
         _buffers.add(msg);
         _selector.wakeup();
@@ -223,8 +228,8 @@ public class NonBlockingSenderReceiver
     private void doRead() throws IOException
     {
 
-        int remaining;
-        do
+        int remaining = 0;
+        while (remaining == 0 && !_closed.get())
         {
             if(_currentBuffer == null || _currentBuffer.remaining() == 0)
             {
@@ -241,7 +246,5 @@ public class NonBlockingSenderReceiver
             _currentBuffer = _currentBuffer.slice();
             _receiver.received(dup);
         }
-        while (remaining == 0);
-
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to