Author: markt
Date: Thu Jan  8 13:10:46 2015
New Revision: 1650278

URL: http://svn.apache.org/r1650278
Log:
Untested first pass at pushing down NIO2 writes.

Modified:
    tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
    tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
    tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java

Modified: 
tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java?rev=1650278&r1=1650277&r2=1650278&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java 
(original)
+++ tomcat/trunk/java/org/apache/coyote/http11/InternalNio2OutputBuffer.java 
Thu Jan  8 13:10:46 2015
@@ -17,20 +17,10 @@
 
 package org.apache.coyote.http11;
 
-import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.coyote.Response;
-import org.apache.tomcat.util.buf.ByteBufferHolder;
 import org.apache.tomcat.util.net.Nio2Channel;
-import org.apache.tomcat.util.net.Nio2Endpoint;
-import org.apache.tomcat.util.net.Nio2Endpoint.Nio2SocketWrapper;
 
 /**
  * Output buffer implementation for NIO2.
@@ -50,178 +40,16 @@ public class InternalNio2OutputBuffer ex
     // ------------------------------------------------------ Protected Methods
 
     @Override
-    protected void addToBB(byte[] buf, int offset, int length)
-            throws IOException {
-
-        if (length == 0)
-            return;
-        if (socketWrapper == null || socketWrapper.getSocket() == null)
-            return;
-
-        if (isBlocking()) {
-            while (length > 0) {
-                int thisTime = transfer(buf, offset, length, 
socketWrapper.socketWriteBuffer);
-                length = length - thisTime;
-                offset = offset + thisTime;
-                if (socketWrapper.socketWriteBuffer.remaining() == 0) {
-                    flushBuffer(true);
-                }
-            }
-        } else {
-            // FIXME: Possible new behavior:
-            // If there's non blocking abuse (like a test writing 1MB in a 
single
-            // "non blocking" write), then block until the previous write is
-            // done rather than continue buffering
-            // Also allows doing autoblocking
-            // Could be "smart" with coordination with the main 
CoyoteOutputStream to
-            // indicate the end of a write
-            // Uses: if (writePending.tryAcquire(socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS))
-            if (((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire()) {
-                synchronized 
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
-                    // No pending completion handler, so writing to the main 
buffer
-                    // is possible
-                    int thisTime = transfer(buf, offset, length, 
socketWrapper.socketWriteBuffer);
-                    length = length - thisTime;
-                    offset = offset + thisTime;
-                    if (length > 0) {
-                        // Remaining data must be buffered
-                        addToBuffers(buf, offset, length);
-                    }
-                    flushBufferInternal(false, true);
-                }
-            } else {
-                synchronized 
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
-                    addToBuffers(buf, offset, length);
-                }
-            }
-        }
-    }
-
-
-    private void addToBuffers(byte[] buf, int offset, int length) {
-        ByteBuffer buffer = ByteBuffer.allocate(length);
-        buffer.put(buf, offset, length);
-        socketWrapper.bufferedWrites.add(new ByteBufferHolder(buffer, false));
+    protected void addToBB(byte[] buf, int offset, int length) throws 
IOException {
+        socketWrapper.write(isBlocking(), buf, offset, length);
     }
 
 
-    /**
-     * Callback to write data from the buffer.
-     */
     @Override
     protected boolean flushBuffer(boolean block) throws IOException {
-        if (socketWrapper.getError() != null) {
-            throw socketWrapper.getError();
-        }
-        return flushBufferInternal(block, false);
+        return socketWrapper.flush(block);
     }
 
-    private boolean flushBufferInternal(boolean block, boolean hasPermit) 
throws IOException {
-        if (socketWrapper == null || socketWrapper.getSocket() == null)
-            return false;
-
-        if (block) {
-            if (!isBlocking()) {
-                // The final flush is blocking, but the processing was using
-                // non blocking so wait until an async write is done
-                try {
-                    if 
(((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire(socketWrapper.getTimeout(),
 TimeUnit.MILLISECONDS)) {
-                        
((Nio2SocketWrapper)socketWrapper).writePending.release();
-                    }
-                } catch (InterruptedException e) {
-                    // Ignore timeout
-                }
-            }
-            try {
-                if (socketWrapper.bufferedWrites.size() > 0) {
-                    for (ByteBufferHolder holder : 
socketWrapper.bufferedWrites) {
-                        holder.flip();
-                        ByteBuffer buffer = holder.getBuf();
-                        while (buffer.hasRemaining()) {
-                            if 
(socketWrapper.getSocket().write(buffer).get(socketWrapper.getTimeout(), 
TimeUnit.MILLISECONDS).intValue() < 0) {
-                                throw new 
EOFException(sm.getString("iob.failedwrite"));
-                            }
-                        }
-                    }
-                    socketWrapper.bufferedWrites.clear();
-                }
-                if (!socketWrapper.writeBufferFlipped) {
-                    socketWrapper.socketWriteBuffer.flip();
-                    socketWrapper.writeBufferFlipped = true;
-                }
-                while (socketWrapper.socketWriteBuffer.hasRemaining()) {
-                    if 
(socketWrapper.getSocket().write(socketWrapper.socketWriteBuffer).get(socketWrapper.getTimeout(),
 TimeUnit.MILLISECONDS).intValue() < 0) {
-                        throw new 
EOFException(sm.getString("iob.failedwrite"));
-                    }
-                }
-            } catch (ExecutionException e) {
-                if (e.getCause() instanceof IOException) {
-                    throw (IOException) e.getCause();
-                } else {
-                    throw new IOException(e);
-                }
-            } catch (InterruptedException e) {
-                throw new IOException(e);
-            } catch (TimeoutException e) {
-                throw new SocketTimeoutException();
-            }
-            socketWrapper.socketWriteBuffer.clear();
-            socketWrapper.writeBufferFlipped = false;
-            return false;
-        } else {
-            synchronized 
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
-                if (hasPermit || 
((Nio2SocketWrapper)socketWrapper).writePending.tryAcquire()) {
-                    if (!socketWrapper.writeBufferFlipped) {
-                        socketWrapper.socketWriteBuffer.flip();
-                        socketWrapper.writeBufferFlipped = true;
-                    }
-                    Nio2Endpoint.startInline();
-                    if (socketWrapper.bufferedWrites.size() > 0) {
-                        // Gathering write of the main buffer plus all 
leftovers
-                        ArrayList<ByteBuffer> arrayList = new ArrayList<>();
-                        if (socketWrapper.socketWriteBuffer.hasRemaining()) {
-                            arrayList.add(socketWrapper.socketWriteBuffer);
-                        }
-                        for (ByteBufferHolder buffer : 
socketWrapper.bufferedWrites) {
-                            buffer.flip();
-                            arrayList.add(buffer.getBuf());
-                        }
-                        socketWrapper.bufferedWrites.clear();
-                        ByteBuffer[] array = arrayList.toArray(new 
ByteBuffer[arrayList.size()]);
-                        socketWrapper.getSocket().write(array, 0, 
array.length, socketWrapper.getTimeout(),
-                                TimeUnit.MILLISECONDS, array, 
((Nio2SocketWrapper)socketWrapper).gatheringWriteCompletionHandler);
-                    } else if (socketWrapper.socketWriteBuffer.hasRemaining()) 
{
-                        // Regular write
-                        
socketWrapper.getSocket().write(socketWrapper.socketWriteBuffer, 
socketWrapper.getTimeout(),
-                                TimeUnit.MILLISECONDS, 
socketWrapper.socketWriteBuffer, 
((Nio2SocketWrapper)socketWrapper).writeCompletionHandler);
-                    } else {
-                        // Nothing was written
-                        
((Nio2SocketWrapper)socketWrapper).writePending.release();
-                    }
-                    Nio2Endpoint.endInline();
-                    if 
(((Nio2SocketWrapper)socketWrapper).writePending.availablePermits() > 0) {
-                        if (socketWrapper.socketWriteBuffer.remaining() == 0) {
-                            socketWrapper.socketWriteBuffer.clear();
-                            socketWrapper.writeBufferFlipped = false;
-                        }
-                    }
-                }
-                return socketWrapper.hasMoreDataToFlush() || hasBufferedData() 
|| socketWrapper.getError() != null;
-            }
-        }
-    }
-
-
-    @Override
-    public boolean hasDataToWrite() {
-        synchronized 
(((Nio2SocketWrapper)socketWrapper).writeCompletionHandler) {
-            return socketWrapper.hasMoreDataToFlush() || hasBufferedData() || 
socketWrapper.getError() != null;
-        }
-    }
-
-    protected boolean hasBufferedData() {
-        return socketWrapper.bufferedWrites.size() > 0;
-    }
 
     @Override
     protected void registerWriteInterest() {

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java?rev=1650278&r1=1650277&r2=1650278&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java (original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/Nio2Endpoint.java Thu Jan  8 
13:10:46 2015
@@ -722,16 +722,13 @@ public class Nio2Endpoint extends Abstra
         private volatile boolean readPending = false;
         private volatile boolean interest = true;
 
-        private final int maxWrite;
-        // TODO These are public for now to aid refactoring
-        public final CompletionHandler<Integer, ByteBuffer> 
writeCompletionHandler;
-        public final CompletionHandler<Long, ByteBuffer[]> 
gatheringWriteCompletionHandler;
-        public final Semaphore writePending = new Semaphore(1);
+        private final CompletionHandler<Integer, ByteBuffer> 
writeCompletionHandler;
+        private final CompletionHandler<Long, ByteBuffer[]> 
gatheringWriteCompletionHandler;
+        private final Semaphore writePending = new Semaphore(1);
 
 
         public Nio2SocketWrapper(Nio2Channel channel, Nio2Endpoint endpoint) {
             super(channel, endpoint);
-            maxWrite = channel.getBufHandler().getWriteBuffer().capacity();
 
             this.readCompletionHandler = new CompletionHandler<Integer, 
SocketWrapperBase<Nio2Channel>>() {
                 @Override
@@ -1091,57 +1088,105 @@ public class Nio2Endpoint extends Abstra
 
 
         @Override
-        public void write(boolean block, byte[] b, int off, int len) throws 
IOException {
-            int leftToWrite = len;
-            int offset = off;
-
-            while (leftToWrite > 0) {
-                int writeThisLoop;
-                int writtenThisLoop;
+        public void write(boolean block, byte[] buf, int off, int len) throws 
IOException {
+            if (len == 0)
+                return;
+            if (getSocket() == null)
+                return;
 
-                if (leftToWrite > maxWrite) {
-                    writeThisLoop = maxWrite;
-                } else {
-                    writeThisLoop = leftToWrite;
-                }
-
-                writtenThisLoop = writeInternal(block, b, offset, 
writeThisLoop);
-                if (writtenThisLoop < 0) {
-                    throw new EOFException();
-                }
-                if (!block && writePending.availablePermits() == 0) {
-                    // Prevent concurrent writes in non blocking mode,
-                    // leftover data has to be buffered
-                    return;
+            if (block) {
+                while (len > 0) {
+                    int thisTime = transfer(buf, off, len, socketWriteBuffer);
+                    len = len - thisTime;
+                    off = off + thisTime;
+                    if (socketWriteBuffer.remaining() == 0) {
+                        flush(true);
+                    }
                 }
-                offset += writtenThisLoop;
-                leftToWrite -= writtenThisLoop;
-
-                if (writtenThisLoop < writeThisLoop) {
-                    break;
+            } else {
+                // FIXME: Possible new behavior:
+                // If there's non blocking abuse (like a test writing 1MB in a 
single
+                // "non blocking" write), then block until the previous write 
is
+                // done rather than continue buffering
+                // Also allows doing autoblocking
+                // Could be "smart" with coordination with the main 
CoyoteOutputStream to
+                // indicate the end of a write
+                // Uses: if 
(writePending.tryAcquire(socketWrapper.getTimeout(), TimeUnit.MILLISECONDS))
+                if (writePending.tryAcquire()) {
+                    synchronized (writeCompletionHandler) {
+                        // No pending completion handler, so writing to the 
main buffer
+                        // is possible
+                        int thisTime = transfer(buf, off, len, 
socketWriteBuffer);
+                        len = len - thisTime;
+                        off = off + thisTime;
+                        if (len > 0) {
+                            // Remaining data must be buffered
+                            addToBuffers(buf, off, len);
+                        }
+                        flush(false, true);
+                    }
+                } else {
+                    synchronized (writeCompletionHandler) {
+                        addToBuffers(buf, off, len);
+                    }
                 }
             }
         }
 
 
         @Override
-        protected int doWrite(ByteBuffer buffer, boolean block, boolean flip)
-                throws IOException {
-            // TODO Auto-generated method stub
+        protected int doWrite(ByteBuffer buffer, boolean block, boolean flip) 
throws IOException {
+            // NO-OP for NIO2 since write() is over-ridden above.
             return 0;
         }
 
-        private int writeInternal(boolean block, byte[] b, int off, int len)
-                throws IOException {
-            ByteBuffer writeBuffer = 
getSocket().getBufHandler().getWriteBuffer();
-            int written = 0;
+
+        @Override
+        public boolean flush(boolean block) throws IOException {
+            if (getError() != null) {
+                throw getError();
+            }
+            return super.flush(block);
+        }
+
+
+        @Override
+        protected boolean flush(boolean block, boolean hasPermit) throws 
IOException {
+            if (getSocket() == null)
+                return false;
+
             if (block) {
-                writeBuffer.clear();
-                writeBuffer.put(b, off, len);
-                writeBuffer.flip();
-                writeBufferFlipped = true;
                 try {
-                    written = getSocket().write(writeBuffer).get(getTimeout(), 
TimeUnit.MILLISECONDS).intValue();
+                    if (writePending.tryAcquire(getTimeout(), 
TimeUnit.MILLISECONDS)) {
+                        writePending.release();
+                    } else {
+                        // TODO
+                    }
+                } catch (InterruptedException e) {
+                    // Ignore timeout
+                }
+                try {
+                    if (bufferedWrites.size() > 0) {
+                        for (ByteBufferHolder holder : bufferedWrites) {
+                            holder.flip();
+                            ByteBuffer buffer = holder.getBuf();
+                            while (buffer.hasRemaining()) {
+                                if 
(getSocket().write(buffer).get(getTimeout(), TimeUnit.MILLISECONDS).intValue() 
< 0) {
+                                    throw new 
EOFException(sm.getString("iob.failedwrite"));
+                                }
+                            }
+                        }
+                        bufferedWrites.clear();
+                    }
+                    if (!writeBufferFlipped) {
+                        socketWriteBuffer.flip();
+                        writeBufferFlipped = true;
+                    }
+                    while (socketWriteBuffer.hasRemaining()) {
+                        if 
(getSocket().write(socketWriteBuffer).get(getTimeout(), 
TimeUnit.MILLISECONDS).intValue() < 0) {
+                            throw new 
EOFException(sm.getString("iob.failedwrite"));
+                        }
+                    }
                 } catch (ExecutionException e) {
                     if (e.getCause() instanceof IOException) {
                         throw (IOException) e.getCause();
@@ -1151,22 +1196,60 @@ public class Nio2Endpoint extends Abstra
                 } catch (InterruptedException e) {
                     throw new IOException(e);
                 } catch (TimeoutException e) {
-                    SocketTimeoutException ex = new SocketTimeoutException();
-                    throw ex;
+                    throw new SocketTimeoutException();
                 }
+                socketWriteBuffer.clear();
+                writeBufferFlipped = false;
+                return false;
             } else {
-                if (writePending.tryAcquire()) {
-                    writeBuffer.clear();
-                    writeBuffer.put(b, off, len);
-                    writeBuffer.flip();
-                    writeBufferFlipped = true;
-                    Nio2Endpoint.startInline();
-                    getSocket().write(writeBuffer, getTimeout(), 
TimeUnit.MILLISECONDS, writeBuffer, writeCompletionHandler);
-                    Nio2Endpoint.endInline();
-                    written = len;
+                synchronized (writeCompletionHandler) {
+                    if (hasPermit || writePending.tryAcquire()) {
+                        if (!writeBufferFlipped) {
+                            socketWriteBuffer.flip();
+                            writeBufferFlipped = true;
+                        }
+                        Nio2Endpoint.startInline();
+                        if (bufferedWrites.size() > 0) {
+                            // Gathering write of the main buffer plus all 
leftovers
+                            ArrayList<ByteBuffer> arrayList = new 
ArrayList<>();
+                            if (socketWriteBuffer.hasRemaining()) {
+                                arrayList.add(socketWriteBuffer);
+                            }
+                            for (ByteBufferHolder buffer : bufferedWrites) {
+                                buffer.flip();
+                                arrayList.add(buffer.getBuf());
+                            }
+                            bufferedWrites.clear();
+                            ByteBuffer[] array = arrayList.toArray(new 
ByteBuffer[arrayList.size()]);
+                            getSocket().write(array, 0, array.length, 
getTimeout(),
+                                    TimeUnit.MILLISECONDS, array, 
gatheringWriteCompletionHandler);
+                        } else if (socketWriteBuffer.hasRemaining()) {
+                            // Regular write
+                            getSocket().write(socketWriteBuffer, getTimeout(),
+                                    TimeUnit.MILLISECONDS, socketWriteBuffer, 
writeCompletionHandler);
+                        } else {
+                            // Nothing was written
+                            writePending.release();
+                        }
+                        Nio2Endpoint.endInline();
+                        if (writePending.availablePermits() > 0) {
+                            if (socketWriteBuffer.remaining() == 0) {
+                                socketWriteBuffer.clear();
+                                writeBufferFlipped = false;
+                            }
+                        }
+                    }
+                    return hasMoreDataToFlush() || hasBufferedData() || 
getError() != null;
                 }
             }
-            return written;
+        }
+
+
+        @Override
+        public boolean hasDataToWrite() {
+            synchronized (writeCompletionHandler) {
+                return hasMoreDataToFlush() || hasBufferedData() || getError() 
!= null;
+            }
         }
 
 

Modified: tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java
URL: 
http://svn.apache.org/viewvc/tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java?rev=1650278&r1=1650277&r2=1650278&view=diff
==============================================================================
--- tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java 
(original)
+++ tomcat/trunk/java/org/apache/tomcat/util/net/SocketWrapperBase.java Thu Jan 
 8 13:10:46 2015
@@ -70,9 +70,8 @@ public abstract class SocketWrapperBase<
      */
     private final Object writeThreadLock = new Object();
 
-    // TODO These being public is a temporary hack to simplify refactoring
-    public volatile ByteBuffer socketWriteBuffer;
-    public volatile boolean writeBufferFlipped;
+    protected volatile ByteBuffer socketWriteBuffer;
+    protected volatile boolean writeBufferFlipped;
 
     /**
      * For "non-blocking" writes use an external set of buffers. Although the
@@ -80,8 +79,7 @@ public abstract class SocketWrapperBase<
      * the possible need to write HTTP headers, there may be more than one 
write
      * to the OutputBuffer.
      */
-    // TODO This being public is a temporary hack to simplify refactoring
-    public final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites =
+    protected final LinkedBlockingDeque<ByteBufferHolder> bufferedWrites =
             new LinkedBlockingDeque<>();
 
     /**
@@ -180,8 +178,7 @@ public abstract class SocketWrapperBase<
     }
     public Object getWriteThreadLock() { return writeThreadLock; }
 
-    // TODO This being public is a temporary hack to simplify refactoring
-    public boolean hasMoreDataToFlush() {
+    protected boolean hasMoreDataToFlush() {
         return (writeBufferFlipped && socketWriteBuffer.remaining() > 0) ||
         (!writeBufferFlipped && socketWriteBuffer.position() > 0);
     }



---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@tomcat.apache.org
For additional commands, e-mail: dev-h...@tomcat.apache.org

Reply via email to