Author: fhanik
Date: Mon Oct 23 10:39:28 2006
New Revision: 467044

URL: http://svn.apache.org/viewvc?view=rev&rev=467044
Log:
Added blocking logic to the NIO connector. This logic ensures that if there 
is a slow client, we will not waste CPU cycles spinning endlessly.
Credit for the ideas behind this implementation goes to the Tribes implementation, 
where we have a pool of selectors so that each sending thread uses its own 
selector, and to Jeanfrancois Arcand's blog about wrapping a NIO channel in a 
blocking input stream.

Added:
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
Modified:
    
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
    tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
    
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
    
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
--- 
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
 (original)
+++ 
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/transport/nio/NioSender.java
 Mon Oct 23 10:39:28 2006
@@ -31,6 +31,7 @@
 import org.apache.catalina.tribes.transport.AbstractSender;
 import org.apache.catalina.tribes.transport.DataSender;
 import org.apache.catalina.tribes.RemoteProcessException;
+import java.io.EOFException;
 
 /**
  * This class is NOT thread safe and should never be used with more than one 
thread at a time
@@ -177,6 +178,7 @@
                 //weve written everything, or we are starting a new package
                 //protect against buffer overwrite
                 int byteswritten = socketChannel.write(writebuf);
+                if (byteswritten == -1 ) throw new EOFException();
                 remaining -= byteswritten;
                 //if the entire message was written from the buffer
                 //reset the position counter

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/Http11NioProcessor.java 
Mon Oct 23 10:39:28 2006
@@ -100,7 +100,7 @@
 
         response = new Response();
         response.setHook(this);
-        outputBuffer = new InternalNioOutputBuffer(response, headerBufferSize);
+        outputBuffer = new InternalNioOutputBuffer(response, 
headerBufferSize,readTimeout);
         response.setOutputBuffer(outputBuffer);
         request.setResponse(response);
 
@@ -806,6 +806,8 @@
         this.socket = socket;
         inputBuffer.setSocket(socket);
         outputBuffer.setSocket(socket);
+        inputBuffer.setSelectorPool(endpoint.getSelectorPool());
+        outputBuffer.setSelectorPool(endpoint.getSelectorPool());
 
         // Error flag
         error = false;

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
--- 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java 
(original)
+++ 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioInputBuffer.java 
Mon Oct 23 10:39:28 2006
@@ -33,6 +33,8 @@
 import org.apache.tomcat.util.net.NioEndpoint.Poller;
 import org.apache.tomcat.util.res.StringManager;
 import org.apache.tomcat.util.net.NioChannel;
+import org.apache.tomcat.util.net.NioSelectorPool;
+import java.nio.channels.Selector;
 
 /**
  * Implementation of InputBuffer which provides HTTP request header parsing as
@@ -157,7 +159,12 @@
      * Underlying socket.
      */
     protected NioChannel socket;
-
+    
+    /**
+     * Selector pool, for blocking reads and blocking writes
+     */
+    protected NioSelectorPool pool;
+    
 
     /**
      * Underlying input buffer.
@@ -199,8 +206,7 @@
     public void setSocket(NioChannel socket) {
         this.socket = socket;
     }
-
-
+    
     /**
      * Get the underlying socket input stream.
      */
@@ -208,6 +214,15 @@
         return socket;
     }
 
+    public void setSelectorPool(NioSelectorPool pool) { 
+        this.pool = pool;
+    }
+    
+    public NioSelectorPool getSelectorPool() {
+        return pool;
+    }
+
+
     /**
      * Add an input filter to the filter library.
      */
@@ -549,47 +564,34 @@
      */
     private boolean readSocket(boolean timeout, boolean block) throws 
IOException {
         int nRead = 0;
-        long start = System.currentTimeMillis();
-        boolean timedOut = false;
-        do {
-            
-            socket.getBufHandler().getReadBuffer().clear();
+        long rto = timeout?this.readTimeout:-1;
+        socket.getBufHandler().getReadBuffer().clear();
+        if ( block ) {
+            Selector selector = null;
+            try { selector = getSelectorPool().get(); }catch ( IOException x ) 
{}
+            try {
+                nRead = 
getSelectorPool().read(socket.getBufHandler().getReadBuffer(),socket.getIOChannel(),selector,rto);
+            } catch ( EOFException eof ) {
+                nRead = -1;
+            } finally { 
+                if ( selector != null ) getSelectorPool().put(selector);
+            }
+        } else {
             nRead = socket.read(socket.getBufHandler().getReadBuffer());
-            if (nRead > 0) {
-                socket.getBufHandler().getReadBuffer().flip();
-                socket.getBufHandler().getReadBuffer().limit(nRead);
-                expand(nRead + pos);
-                socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
-                lastValid = pos + nRead;
-                return true;
-            } else if (nRead == -1) {
-                //return false;
-                throw new EOFException(sm.getString("iib.eof.error"));
-            } else if ( !block ) {
-                return false;
-            } else {
-                timedOut = timeout && (readTimeout != -1) && 
((System.currentTimeMillis()-start)>readTimeout);
-                if ( !timedOut && nRead == 0 )  {
-                    try {
-                        final SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
-                        final KeyAttachment att = 
(KeyAttachment)key.attachment();
-                        //to do, add in a check, we might have just timed out 
on the wait,
-                        //so there is no need to register us again.
-                        boolean addToQueue = false;
-                        try { addToQueue = 
((att.interestOps()&SelectionKey.OP_READ) != SelectionKey.OP_READ); } catch ( 
CancelledKeyException ckx ){ throw new IOException("Socket key cancelled.");}
-                        if ( addToQueue ) {
-                            synchronized (att.getMutex()) {
-                                addToReadQueue(key, att);
-                                att.getMutex().wait(readTimeout);
-                            }
-                        }//end if
-                    }catch ( Exception x ) {}
-                }
-             }
-        }while ( nRead == 0 && (!timedOut) );
-        //else throw new IOException(sm.getString("iib.failedread"));
-        //return false; //timeout
-        throw new IOException("read timed out.");
+        }
+        if (nRead > 0) {
+            socket.getBufHandler().getReadBuffer().flip();
+            socket.getBufHandler().getReadBuffer().limit(nRead);
+            expand(nRead + pos);
+            socket.getBufHandler().getReadBuffer().get(buf, pos, nRead);
+            lastValid = pos + nRead;
+            return true;
+        } else if (nRead == -1) {
+            //return false;
+            throw new EOFException(sm.getString("iib.eof.error"));
+        } else {
+            return false;
+        }
     }
 
     private void addToReadQueue(final SelectionKey key, final KeyAttachment 
att) {

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
--- 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 
(original)
+++ 
tomcat/tc6.0.x/trunk/java/org/apache/coyote/http11/InternalNioOutputBuffer.java 
Mon Oct 23 10:39:28 2006
@@ -17,9 +17,11 @@
 
 package org.apache.coyote.http11;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
 
 import org.apache.coyote.ActionCode;
 import org.apache.coyote.OutputBuffer;
@@ -31,6 +33,7 @@
 import org.apache.tomcat.util.http.MimeHeaders;
 import org.apache.tomcat.util.net.NioChannel;
 import org.apache.tomcat.util.net.NioEndpoint;
+import org.apache.tomcat.util.net.NioSelectorPool;
 import org.apache.tomcat.util.res.StringManager;
 
 /**
@@ -54,14 +57,14 @@
      * Default constructor.
      */
     public InternalNioOutputBuffer(Response response) {
-        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE);
+        this(response, Constants.DEFAULT_HTTP_HEADER_BUFFER_SIZE, 10000);
     }
 
 
     /**
      * Alternate constructor.
      */
-    public InternalNioOutputBuffer(Response response, int headerBufferSize) {
+    public InternalNioOutputBuffer(Response response, int headerBufferSize, 
long writeTimeout) {
 
         this.response = response;
         headers = response.getMimeHeaders();
@@ -83,6 +86,8 @@
 
         committed = false;
         finished = false;
+        
+        this.writeTimeout = writeTimeout;
 
         // Cause loading of HttpMessages
         HttpMessages.getMessage(200);
@@ -143,6 +148,12 @@
      * Underlying socket.
      */
     protected NioChannel socket;
+    
+    /**
+     * Selector pool, for blocking reads and blocking writes
+     */
+    protected NioSelectorPool pool;
+
 
 
     /**
@@ -168,7 +179,11 @@
      * Index of the last active filter.
      */
     protected int lastActiveFilter;
-
+    
+    /**
+     * Write time out in milliseconds
+     */
+    protected long writeTimeout = -1;
 
 
     // ------------------------------------------------------------- Properties
@@ -181,12 +196,28 @@
         this.socket = socket;
     }
 
+    public void setWriteTimeout(long writeTimeout) {
+        this.writeTimeout = writeTimeout;
+    }
+
     /**
      * Get the underlying socket input stream.
      */
     public NioChannel getSocket() {
         return socket;
     }
+
+    public long getWriteTimeout() {
+        return writeTimeout;
+    }
+
+    public void setSelectorPool(NioSelectorPool pool) { 
+        this.pool = pool;
+    }
+
+    public NioSelectorPool getSelectorPool() {
+        return pool;
+    }    
     /**
      * Set the socket buffer size.
      */
@@ -392,14 +423,22 @@
     private synchronized void writeToSocket(ByteBuffer bytebuffer, boolean 
flip) throws IOException {
         //int limit = bytebuffer.position();
         if ( flip ) bytebuffer.flip();
-        while ( bytebuffer.hasRemaining() ) {
-            int written = socket.write(bytebuffer);
+        int written = 0;
+        Selector selector = null;
+        try {
+            selector = getSelectorPool().get();
+        } catch ( IOException x ) {
+            //ignore
+        }
+        try {
+            written = getSelectorPool().write(bytebuffer, 
socket.getIOChannel(), selector, writeTimeout);
+            //make sure we are flushed 
+            do {
+                if (socket.flush(selector)) break;
+            }while ( true );
+        }finally { 
+            if ( selector != null ) getSelectorPool().put(selector);
         }
-        //make sure we are flushed 
-        do {
-            if (socket.flush()) break;
-        }while ( true );
-        
         socket.getBufHandler().getWriteBuffer().clear();
         this.total = 0;
     } 

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java Mon 
Oct 23 10:39:28 2006
@@ -1,19 +1,21 @@
 /*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
  *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
+
+
 package org.apache.tomcat.util.net;
 
 import java.io.IOException;
@@ -23,6 +25,7 @@
 
 import org.apache.tomcat.util.net.NioEndpoint.Poller;
 import org.apache.tomcat.util.net.SecureNioChannel.ApplicationBufferHandler;
+import java.nio.channels.Selector;
 
 /**
  * 
@@ -34,20 +37,20 @@
  * @version 1.0
  */
 public class NioChannel implements ByteChannel{
-    
+
     protected static ByteBuffer emptyBuf = ByteBuffer.allocate(0);
 
     protected SocketChannel sc = null;
 
     protected ApplicationBufferHandler bufHandler;
-    
+
     protected Poller poller;
 
     public NioChannel(SocketChannel channel, ApplicationBufferHandler 
bufHandler) throws IOException {
         this.sc = channel;
         this.bufHandler = bufHandler;
     }
-    
+
     public void reset() throws IOException {
         bufHandler.getReadBuffer().clear();
         bufHandler.getWriteBuffer().clear();
@@ -58,7 +61,7 @@
      * been flushed out and is empty
      * @return boolean
      */
-    public boolean flush() throws IOException {
+    public boolean flush(Selector s) throws IOException {
         return true; //no network buffer in the regular channel
     }
 
@@ -154,7 +157,7 @@
     public boolean isInitHandshakeComplete() {
         return true;
     }
-    
+
     public int handshake(boolean read, boolean write) throws IOException {
         return 0;
     }
@@ -171,4 +174,4 @@
         return super.toString()+":"+this.sc.toString();
     }
 
-}
\ No newline at end of file
+}

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Mon 
Oct 23 10:39:28 2006
@@ -35,7 +35,6 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicLong;
-
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
@@ -143,6 +142,8 @@
     protected int readBufSize = 8192;
     protected int writeBufSize = 8192;
     
+    protected NioSelectorPool selectorPool = new NioSelectorPool();;
+    
     /**
      * Server socket "pointer".
      */
@@ -418,6 +419,10 @@
         this.readBufSize = readBufSize;
     }
 
+    public void setSelectorPool(NioSelectorPool selectorPool) {
+        this.selectorPool = selectorPool;
+    }
+
     protected SSLContext sslContext = null;
     public SSLContext getSSLContext() { return sslContext;}
     public void setSSLContext(SSLContext c) { sslContext = c;}
@@ -548,6 +553,9 @@
             running = true;
             paused = false;
             
+            selectorPool.setMaxSelectors(maxThreads);
+            selectorPool.setMaxSpareSelectors(-1);
+            selectorPool.open();
             
             // Create worker collection
             if (executor == null) {
@@ -611,6 +619,7 @@
             }
             pollers = null;
         }
+        try {selectorPool.close();}catch (IOException x){}
         nioChannels.clear();
     }
 
@@ -650,8 +659,12 @@
         return readBufSize;
     }
 
+    public NioSelectorPool getSelectorPool() {
+        return selectorPool;
+    }
+
     /**
-     * Unlock the server socket accept using a bugus connection.
+     * Unlock the server socket accept using a bogus connection.
      */
     protected void unlockAccept() {
         java.net.Socket s = null;
@@ -709,7 +722,7 @@
                     int appbufsize = 
engine.getSession().getApplicationBufferSize();
                     int bufsize = Math.max(Math.max(getReadBufSize(), 
getWriteBufSize()), appbufsize);
                     NioBufferHandler bufhandler = new 
NioBufferHandler(bufsize, bufsize);
-                    channel = new SecureNioChannel(socket, engine, bufhandler);
+                    channel = new SecureNioChannel(socket, engine, bufhandler, 
selectorPool);
                 } else {
                     NioBufferHandler bufhandler = new 
NioBufferHandler(getReadBufSize(), getWriteBufSize());
                     channel = new NioChannel(socket, bufhandler);

Added: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java?view=auto&rev=467044
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java 
(added)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioSelectorPool.java 
Mon Oct 23 10:39:28 2006
@@ -0,0 +1,200 @@
+/*
+ *  Copyright 2005-2006 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.tomcat.util.net;
+
+import java.util.LinkedList;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.nio.channels.Selector;
+import java.io.IOException;
+import java.util.NoSuchElementException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.io.EOFException;
+import java.net.SocketTimeoutException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * 
+ * Thread safe non blocking selector pool
+ * @author Filip Hanik
+ * @version 1.0
+ * @since 6.0
+ */
+
+public class NioSelectorPool {
+    protected int maxSelectors = 100;
+    protected int maxSpareSelectors = -1;
+    protected boolean enabled = true;
+    protected AtomicInteger active = new AtomicInteger(0);
+    protected AtomicInteger spare = new AtomicInteger(0);
+    //protected LinkedList<Selector> selectors = new LinkedList<Selector>();
+    protected ConcurrentLinkedQueue<Selector> selectors = new 
ConcurrentLinkedQueue<Selector>();
+    
+    public Selector get() throws IOException{
+        if ( (!enabled) || active.incrementAndGet() >= maxSelectors ) {
+            active.decrementAndGet();
+            return null;
+        }
+        Selector s = null;
+        try {
+            s = selectors.size()>0?selectors.poll():null;
+            if (s == null) s = Selector.open();
+            else spare.decrementAndGet();
+            
+        }catch (NoSuchElementException x ) {
+            try {s = Selector.open();}catch (IOException iox){}
+        } finally {
+            if ( s == null ) active.decrementAndGet();//we were unable to find 
a selector
+        }
+        return s;            
+    }
+    
+    
+    
+    public void put(Selector s) throws IOException {
+        active.decrementAndGet();
+        if ( enabled && (maxSpareSelectors==-1 || spare.get() < 
Math.min(maxSpareSelectors,maxSelectors)) ) {
+            spare.incrementAndGet();
+            selectors.offer(s);
+        }
+        else s.close();
+    }
+    
+    public void close() throws IOException {
+        enabled = false;
+        Selector s;
+        while ( (s = selectors.poll()) != null ) s.close();
+        spare.set(0);
+    }
+    
+    public void open(){
+        enabled = true;
+    }
+    
+    /**
+     * Performs a blocking write using the bytebuffer for data to be written 
and a selector to block.
+     * If the <code>selector</code> parameter is null, then it will perform a 
busy write that could
+     * take up a lot of CPU cycles.
+     * @param buf ByteBuffer - the buffer containing the data, we will write 
as long as <code>(buf.hasRemaining()==true)</code>
+     * @param socket SocketChannel - the socket to write data to
+     * @param selector Selector - the selector to use for blocking, if null 
then a busy write will be initiated
+     * @param writeTimeout long - the timeout for this write operation in 
milliseconds, -1 means no timeout
+     * @return int - returns the number of bytes written
+     * @throws EOFException if write returns -1
+     * @throws SocketTimeoutException if the write times out
+     * @throws IOException if an IO Exception occurs in the underlying socket 
logic
+     */
+    public int write(ByteBuffer buf, SocketChannel socket, Selector selector, 
long writeTimeout) throws IOException {
+        SelectionKey key = null;
+        int written = 0;
+        boolean timedout = false;
+        int keycount = 1; //assume we can write
+        long time = System.currentTimeMillis(); //start the timeout timer
+        try {
+            while ( (!timedout) && buf.hasRemaining() ) {
+                if ( keycount > 0 ) { //only write if we were registered for a 
write
+                    int cnt = socket.write(buf); //write the data
+                    if (cnt == -1) throw new EOFException();
+                    written += cnt;
+                    if (cnt > 0) {
+                        time = System.currentTimeMillis(); //reset our timeout 
timer
+                        continue; //we successfully wrote, try again without a 
selector
+                    }
+                }
+                if ( selector != null ) {
+                    //register OP_WRITE to the selector
+                    if (key==null) key = socket.register(selector, 
SelectionKey.OP_WRITE);
+                    else key.interestOps(SelectionKey.OP_WRITE);
+                    keycount = selector.select(writeTimeout);
+                }                
+                if (writeTimeout > 0 && (selector == null || keycount == 0) ) 
timedout = (System.currentTimeMillis()-time)>=writeTimeout;
+            }//while
+            if ( timedout ) throw new SocketTimeoutException();
+        } finally {
+            if (key != null) key.cancel();
+            if (selector != null) selector.selectNow();
+        }
+        return written;
+    }
+    
+    /**
+     * Performs a blocking read using the bytebuffer for data to be read and a 
selector to block.
+     * If the <code>selector</code> parameter is null, then it will perform a 
busy read that could
+     * take up a lot of CPU cycles.
+     * @param buf ByteBuffer - the buffer to read data into, we will read 
until we have read at least one byte or we timed out
+     * @param socket SocketChannel - the socket to read data from
+     * @param selector Selector - the selector to use for blocking, if null 
then a busy read will be initiated
+     * @param readTimeout long - the timeout for this read operation in 
milliseconds, -1 means no timeout
+     * @return int - returns the number of bytes read
+     * @throws EOFException if read returns -1
+     * @throws SocketTimeoutException if the read times out
+     * @throws IOException if an IO Exception occurs in the underlying socket 
logic
+     */
+    public int read(ByteBuffer buf, SocketChannel socket, Selector selector, 
long readTimeout) throws IOException {
+        SelectionKey key = null;
+        int read = 0;
+        boolean timedout = false;
+        int keycount = 1; //assume we can read
+        long time = System.currentTimeMillis(); //start the timeout timer
+        try {
+            while ( (!timedout) && read == 0 ) {
+                if ( keycount > 0 ) { //only read if we were registered for a 
read
+                    int cnt = socket.read(buf);
+                    if (cnt == -1) throw new EOFException();
+                    read += cnt;
+                    if (cnt > 0) break;
+                }
+                if ( selector != null ) {
+                    //register OP_READ to the selector
+                    if (key==null) key = socket.register(selector, 
SelectionKey.OP_READ);
+                    else key.interestOps(SelectionKey.OP_READ);
+                    keycount = selector.select(readTimeout);
+                }                
+                if (readTimeout > 0 && (selector == null || keycount == 0) ) 
timedout = (System.currentTimeMillis()-time)>=readTimeout;
+            }//while
+            if ( timedout ) throw new SocketTimeoutException();
+        } finally {
+            if (key != null) key.cancel();
+            if (selector != null) selector.selectNow();
+        }
+        return read;
+    }
+    
+    public void setMaxSelectors(int maxSelectors) {
+        this.maxSelectors = maxSelectors;
+    }
+
+    public void setMaxSpareSelectors(int maxSpareSelectors) {
+        this.maxSpareSelectors = maxSpareSelectors;
+    }
+
+    public void setEnabled(boolean enabled) {
+        this.enabled = enabled;
+    }
+
+    public int getMaxSelectors() {
+        return maxSelectors;
+    }
+
+    public int getMaxSpareSelectors() {
+        return maxSpareSelectors;
+    }
+
+    public boolean isEnabled() {
+        return enabled;
+    }
+}
\ No newline at end of file

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?view=diff&rev=467044&r1=467043&r2=467044
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java 
Mon Oct 23 10:39:28 2006
@@ -8,6 +8,7 @@
 import javax.net.ssl.SSLEngineResult;
 import javax.net.ssl.SSLEngineResult.HandshakeStatus;
 import javax.net.ssl.SSLEngineResult.Status;
+import java.nio.channels.Selector;
 
 /**
  * 
@@ -29,7 +30,10 @@
     protected boolean closed = false;
     protected boolean closing = false;
     
-    public SecureNioChannel(SocketChannel channel, SSLEngine engine, 
ApplicationBufferHandler bufHandler) throws IOException {
+    protected NioSelectorPool pool;
+    
+    public SecureNioChannel(SocketChannel channel, SSLEngine engine, 
+                            ApplicationBufferHandler bufHandler, 
NioSelectorPool pool) throws IOException {
         super(channel,bufHandler);
         this.sslEngine = engine;
         int appBufSize = sslEngine.getSession().getApplicationBufferSize();
@@ -37,7 +41,10 @@
         //allocate network buffers - TODO, add in optional direct non-direct 
buffers
         if ( netInBuffer == null ) netInBuffer = 
ByteBuffer.allocateDirect(netBufSize);
         if ( netOutBuffer == null ) netOutBuffer = 
ByteBuffer.allocateDirect(netBufSize);
-
+        
+        //selector pool for blocking operations
+        this.pool = pool;
+        
         //ensure that the application has a large enough read/write buffers
         //by doing this, we should not encounter any buffer overflow errors
         bufHandler.expand(bufHandler.getReadBuffer(), appBufSize);
@@ -72,12 +79,13 @@
      * been flushed out and is empty
      * @return boolean
      */
-    public boolean flush() throws IOException {
-        return flush(netOutBuffer);
+    public boolean flush(Selector s, long timeout) throws IOException {
+        pool.write(netOutBuffer,sc,s,timeout);
+        return !netOutBuffer.hasRemaining();
     }
     
     /**
-     * Flushes the buffer to the network
+     * Flushes the buffer to the network, non blocking
      * @param buf ByteBuffer
      * @return boolean true if the buffer has been emptied out, false otherwise
      * @throws IOException



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to