Repository: activemq
Updated Branches:
  refs/heads/trunk d43e70956 -> 210e39df8


https://issues.apache.org/jira/browse/AMQ-5144

Fix issue where the writer would go into a cycle of adding delay for
writes of large messages where each packet ended up as the same size.  

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/210e39df
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/210e39df
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/210e39df

Branch: refs/heads/trunk
Commit: 210e39df8b553f1b2f5e437ae49f7418e862c1d4
Parents: d43e709
Author: Timothy Bish <[email protected]>
Authored: Thu May 8 12:56:54 2014 -0400
Committer: Timothy Bish <[email protected]>
Committed: Thu May 8 12:56:54 2014 -0400

----------------------------------------------------------------------
 .../activemq/transport/nio/NIOOutputStream.java | 77 ++++++++++++--------
 1 file changed, 45 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/210e39df/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
----------------------------------------------------------------------
diff --git 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
index 643416f..621e956 100644
--- 
a/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
+++ 
b/activemq-client/src/main/java/org/apache/activemq/transport/nio/NIOOutputStream.java
@@ -28,11 +28,11 @@ import javax.net.ssl.SSLEngine;
 import org.apache.activemq.transport.tcp.TimeStampStream;
 
 /**
- * An optimized buffered outputstream for Tcp
+ * An optimized buffered OutputStream for TCP/IP
  */
 public class NIOOutputStream extends OutputStream implements TimeStampStream {
 
-    private static final int BUFFER_SIZE = 8192;
+    private static final int BUFFER_SIZE = 8196;
 
     private final WritableByteChannel out;
     private final byte[] buffer;
@@ -40,7 +40,7 @@ public class NIOOutputStream extends OutputStream implements 
TimeStampStream {
 
     private int count;
     private boolean closed;
-    private volatile long writeTimestamp = -1;//concurrent reads of this value
+    private volatile long writeTimestamp = -1; // concurrent reads of this 
value
 
     private SSLEngine engine;
 
@@ -48,6 +48,7 @@ public class NIOOutputStream extends OutputStream implements 
TimeStampStream {
      * Constructor
      *
      * @param out
+     *        the channel to write data to.
      */
     public NIOOutputStream(WritableByteChannel out) {
         this(out, BUFFER_SIZE);
@@ -57,8 +58,11 @@ public class NIOOutputStream extends OutputStream implements 
TimeStampStream {
      * Creates a new buffered output stream to write data to the specified
      * underlying output stream with the specified buffer size.
      *
-     * @param out the underlying output stream.
-     * @param size the buffer size.
+     * @param out
+     *        the underlying output stream.
+     * @param size
+     *        the buffer size.
+     *
      * @throws IllegalArgumentException if size <= 0.
      */
     public NIOOutputStream(WritableByteChannel out, int size) {
@@ -73,8 +77,10 @@ public class NIOOutputStream extends OutputStream implements 
TimeStampStream {
     /**
      * write a byte on to the stream
      *
-     * @param b - byte to write
-     * @throws IOException
+     * @param b
+     *        byte to write
+     *
+     * @throws IOException if an error occurs while writing the data.
      */
     @Override
     public void write(int b) throws IOException {
@@ -82,16 +88,20 @@ public class NIOOutputStream extends OutputStream 
implements TimeStampStream {
         if (availableBufferToWrite() < 1) {
             flush();
         }
-        buffer[count++] = (byte)b;
+        buffer[count++] = (byte) b;
     }
 
     /**
      * write a byte array to the stream
      *
-     * @param b the byte buffer
-     * @param off the offset into the buffer
-     * @param len the length of data to write
-     * @throws IOException
+     * @param b
+     *        the byte buffer
+     * @param off
+     *        the offset into the buffer
+     * @param len
+     *        the length of data to write
+     *
+     * @throws IOException if an error occurs while writing the data.
      */
     @Override
     public void write(byte b[], int off, int len) throws IOException {
@@ -109,10 +119,10 @@ public class NIOOutputStream extends OutputStream 
implements TimeStampStream {
 
     /**
      * flush the data to the output stream This doesn't call flush on the
-     * underlying outputstream, because Tcp is particularly efficent at doing
+     * underlying OutputStream, because TCP/IP is particularly efficient at 
doing
      * this itself ....
      *
-     * @throws IOException
+     * @throws IOException if an error occurs while writing the data.
      */
     @Override
     public void flush() throws IOException {
@@ -163,22 +173,22 @@ public class NIOOutputStream extends OutputStream 
implements TimeStampStream {
             plain.clear();
             engine.wrap(data, plain);
             plain.flip();
-        }  else {
+        } else {
             plain = data;
         }
 
         int remaining = plain.remaining();
-        int lastRemaining = remaining - 1;
         long delay = 1;
+        int lastWriteSize = -1;
         try {
             writeTimestamp = System.currentTimeMillis();
             while (remaining > 0) {
 
-                // We may need to do a little bit of sleeping to avoid a busy 
loop.
-                // Slow down if no data was written out..
-                if (remaining == lastRemaining) {
+                // We may need to do a little bit of sleeping to avoid a busy
+                // loop. Slow down if no data was written out..
+                if (lastWriteSize == 0) {
                     try {
-                        // Use exponential rollback to increase sleep time.
+                        // Use exponential growth to increase sleep time.
                         Thread.sleep(delay);
                         delay *= 2;
                         if (delay > 1000) {
@@ -190,16 +200,15 @@ public class NIOOutputStream extends OutputStream 
implements TimeStampStream {
                 } else {
                     delay = 1;
                 }
-                lastRemaining = remaining;
 
-                // Since the write is non-blocking, all the data may not have 
been
-                // written.
-                out.write(plain);
+                // Since the write is non-blocking, all the data may not have
+                // been written.
+                lastWriteSize = out.write(plain);
 
-                // if the data buffer was larger than the packet buffer we 
might need to
-                // wrap more packets until we reach the end of data, but only 
when plain
-                // has no more space since we are non-blocking and a write 
might not have
-                // written anything.
+                // if the data buffer was larger than the packet buffer we 
might
+                // need to wrap more packets until we reach the end of data, 
but only
+                // when plain has no more space since we are non-blocking and 
a write
+                // might not have written anything.
                 if (engine != null && data.hasRemaining() && 
!plain.hasRemaining()) {
                     plain.clear();
                     engine.wrap(data, plain);
@@ -213,8 +222,9 @@ public class NIOOutputStream extends OutputStream 
implements TimeStampStream {
         }
     }
 
-
-    /* (non-Javadoc)
+    /*
+     * (non-Javadoc)
+     *
      * @see org.apache.activemq.transport.tcp.TimeStampStream#isWriting()
      */
     @Override
@@ -222,8 +232,11 @@ public class NIOOutputStream extends OutputStream 
implements TimeStampStream {
         return writeTimestamp > 0;
     }
 
-    /* (non-Javadoc)
-     * @see 
org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
+    /*
+     * (non-Javadoc)
+     *
+     * @see
+     * org.apache.activemq.transport.tcp.TimeStampStream#getWriteTimestamp()
      */
     @Override
     public long getWriteTimestamp() {

Reply via email to