Author: kwall
Date: Tue Dec 9 17:36:33 2014
New Revision: 1644128
URL: http://svn.apache.org/r1644128
Log:
Stop reading bytes from the wire once the transport is closed. Also prevent
further ByteBuffers being queued to be sent once the transport is closed too
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
Modified:
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
URL:
http://svn.apache.org/viewvc/qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java?rev=1644128&r1=1644127&r2=1644128&view=diff
==============================================================================
---
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
(original)
+++
qpid/branches/QPID-6262-JavaBrokerNIO/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/NonBlockingSenderReceiver.java
Tue Dec 9 17:36:33 2014
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.SenderClosedException;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.network.Ticker;
@@ -104,6 +105,10 @@ public class NonBlockingSenderReceiver
@Override
public void send(final ByteBuffer msg)
{
+ if (_closed.get())
+ {
+ throw new SenderClosedException("I/O for thread " +
_remoteSocketAddress + " is already closed");
+ }
// append to list and do selector wakeup
_buffers.add(msg);
_selector.wakeup();
@@ -223,8 +228,8 @@ public class NonBlockingSenderReceiver
private void doRead() throws IOException
{
- int remaining;
- do
+ int remaining = 0;
+ while (remaining == 0 && !_closed.get())
{
if(_currentBuffer == null || _currentBuffer.remaining() == 0)
{
@@ -241,7 +246,5 @@ public class NonBlockingSenderReceiver
_currentBuffer = _currentBuffer.slice();
_receiver.received(dup);
}
- while (remaining == 0);
-
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]