Author: fhanik
Date: Wed Aug  9 10:12:37 2006
New Revision: 430097

URL: http://svn.apache.org/viewvc?rev=430097&view=rev
Log:
Added in a cache for byte buffers

Modified:
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
    tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java?rev=430097&r1=430096&r2=430097&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioChannel.java Wed 
Aug  9 10:12:37 2006
@@ -46,6 +46,11 @@
         this.sc = channel;
         this.bufHandler = bufHandler;
     }
+    
+    public void reset() throws IOException {
+        bufHandler.getReadBuffer().clear();
+        bufHandler.getWriteBuffer().clear();
+    }
 
     /**
      * returns true if the network buffer has 
@@ -119,7 +124,6 @@
     public Poller getPoller() {
         return poller;
     }
-
     /**
      * getIOChannel
      *
@@ -156,6 +160,14 @@
 
     public void setPoller(Poller poller) {
         this.poller = poller;
+    }
+
+    public void setIOChannel(SocketChannel IOChannel) {
+        this.sc = IOChannel;
+    }
+
+    public String toString() {
+        return super.toString()+":"+this.sc.toString();
     }
 
 }

Modified: tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=430097&r1=430096&r2=430097&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/NioEndpoint.java Wed 
Aug  9 10:12:37 2006
@@ -45,6 +45,7 @@
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.net.Socket;
 
 /**
  * NIO tailored thread pool, providing the following services:
@@ -149,11 +150,13 @@
     protected ServerSocketChannel serverSock = null;
 
 
-    /**
-     * APR memory pool for the server socket.
-     */
-    protected long serverSockPool = 0;
-
+    protected ConcurrentLinkedQueue<NioChannel> nioChannels = new 
ConcurrentLinkedQueue<NioChannel>() {
+        public boolean offer(NioChannel o) {
+            //avoid over growing our cache or add after we have stopped
+            if ( running && (size() < curThreads) ) return super.offer(o);
+            else return false;
+        }
+    };
 
     
 
@@ -581,6 +584,7 @@
             }
             pollers = null;
         }
+        nioChannels.clear();
     }
 
 
@@ -597,6 +601,7 @@
         serverSock = null;
         sslContext = null;
         initialized = false;
+        nioChannels.clear();
     }
 
 
@@ -658,33 +663,36 @@
         try {
             //disable blocking, APR style, we are gonna be polling it
             socket.configureBlocking(false);
-
+            Socket sock = socket.socket();
             // 1: Set socket options: timeout, linger, etc
             if (soLinger >= 0)
-                socket.socket().setSoLinger(true,soLinger);
+                sock.setSoLinger(true,soLinger);
             if (tcpNoDelay)
-                socket.socket().setTcpNoDelay(true);
+                sock.setTcpNoDelay(true);
             if (soTimeout > 0)
-                socket.socket().setSoTimeout(soTimeout);
+                sock.setSoTimeout(soTimeout);
 
-            NioChannel channel = null;
-            // 2: SSL setup
-            step = 2;
-            if (sslContext != null) {
-                SSLEngine engine = sslContext.createSSLEngine();
-                engine.setNeedClientAuth(getClientAuth());
-                engine.setUseClientMode(false);
-                int appbufsize = 
engine.getSession().getApplicationBufferSize();
-                int bufsize = 
Math.max(Math.max(getReadBufSize(),getWriteBufSize()),appbufsize);
-                NioBufferHandler bufhandler = new 
NioBufferHandler(bufsize,bufsize);
-                channel = new SecureNioChannel(socket,engine,bufhandler);
-                
+            NioChannel channel = nioChannels.poll();
+            if ( channel == null ) {
+                // 2: SSL setup
+                step = 2;
+
+                if (sslContext != null) {
+                    SSLEngine engine = sslContext.createSSLEngine();
+                    engine.setNeedClientAuth(getClientAuth());
+                    engine.setUseClientMode(false);
+                    int appbufsize = 
engine.getSession().getApplicationBufferSize();
+                    int bufsize = Math.max(Math.max(getReadBufSize(), 
getWriteBufSize()), appbufsize);
+                    NioBufferHandler bufhandler = new 
NioBufferHandler(bufsize, bufsize);
+                    channel = new SecureNioChannel(socket, engine, bufhandler);
+                } else {
+                    NioBufferHandler bufhandler = new 
NioBufferHandler(getReadBufSize(), getWriteBufSize());
+                    channel = new NioChannel(socket, bufhandler);
+                }
             } else {
-                NioBufferHandler bufhandler = new 
NioBufferHandler(getReadBufSize(),getWriteBufSize());
-                channel = new NioChannel(socket,bufhandler);
+                channel.setIOChannel(socket);
+                channel.reset();
             }
-            
-            
             getPoller0().register(channel);
 
         } catch (Throwable t) {
@@ -779,6 +787,21 @@
     }
 
 
+    protected boolean processSocket(SocketChannel socket) {
+        try {
+            if (executor == null) {
+                getWorkerThread().assign(socket);
+            }  else {
+                executor.execute(new SocketOptionsProcessor(socket));
+            }
+        } catch (Throwable t) {
+            // This means we got an OOM or similar creating a thread, or that
+            // the pool and its queue are full
+            log.error(sm.getString("endpoint.process.fail"), t);
+            return false;
+        }
+        return true;
+    }
     /**
      * Process given socket.
      */
@@ -849,13 +872,14 @@
                 try {
                     // Accept the next incoming connection from the server 
socket
                     SocketChannel socket = serverSock.accept();
+                    processSocket(socket);
                     // Hand this socket off to an appropriate processor
-                    if(!setSocketOptions(socket))
-                    {
-                        // Close socket right away
-                        socket.socket().close();
-                        socket.close();
-                    }
+//                    if(!setSocketOptions(socket))
+//                    {
+//                        // Close socket right away
+//                        socket.socket().close();
+//                        socket.close();
+//                    }
                 } catch (Throwable t) {
                     log.error(sm.getString("endpoint.accept.fail"), t);
                 }
@@ -1187,7 +1211,7 @@
 
         protected Thread thread = null;
         protected boolean available = false;
-        protected NioChannel socket = null;
+        protected Object socket = null;
         protected boolean event = false;
         protected boolean error = false;
 
@@ -1201,7 +1225,7 @@
          *
          * @param socket TCP socket to process
          */
-        protected synchronized void assign(NioChannel socket) {
+        protected synchronized void assign(Object socket) {
 
             // Wait for the Processor to get the previous Socket
             while (available) {
@@ -1210,7 +1234,6 @@
                 } catch (InterruptedException e) {
                 }
             }
-
             // Store the newly available Socket and notify our thread
             this.socket = socket;
             event = false;
@@ -1221,7 +1244,7 @@
         }
 
 
-        protected synchronized void assign(NioChannel socket, boolean error) {
+        protected synchronized void assign(Object socket, boolean error) {
 
             // Wait for the Processor to get the previous Socket
             while (available) {
@@ -1244,7 +1267,7 @@
          * Await a newly assigned Socket from our Connector, or 
<code>null</code>
          * if we are supposed to shut down.
          */
-        protected synchronized NioChannel await() {
+        protected synchronized Object await() {
 
             // Wait for the Connector to provide a new Socket
             while (!available) {
@@ -1255,7 +1278,7 @@
             }
 
             // Notify the Connector that we have received this Socket
-            NioChannel socket = this.socket;
+            Object socket = this.socket;
             available = false;
             notifyAll();
 
@@ -1272,72 +1295,99 @@
 
             // Process requests until we receive a shutdown signal
             while (running) {
-                // Wait for the next socket to be assigned
-                NioChannel socket = await();
-                if (socket == null)
-                    continue;
-                SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
-                int handshake = -1;
                 try {
-                    handshake = socket.handshake(key.isReadable(), 
key.isWritable());
-                }catch ( IOException x ) {
-                    handshake = -1;
-                    log.error("Error during SSL handshake",x);
-                }catch ( CancelledKeyException ckx ) {
-                    handshake = -1;
-                }
-                if ( handshake == 0 ) {
-                    // Process the request from this socket
-                    if ((event) && (handler.event(socket, error) == 
Handler.SocketState.CLOSED)) {
-                        // Close socket and pool
-                        try {
-                            try {socket.close();}catch (Exception ignore){}
-                            if ( socket.isOpen() ) socket.close(true);
-                        }catch ( Exception x ) {
-                            log.error("",x);
+                    // Wait for the next socket to be assigned
+                    Object channel = await();
+                    if (channel == null)
+                        continue;
+
+                    if ( channel instanceof SocketChannel) {
+                        SocketChannel sc = (SocketChannel)channel;
+                        if ( !setSocketOptions(sc) ) {
+                            try {
+                                sc.socket().close();
+                                sc.close();
+                            }catch ( IOException ix ) {
+                                if ( log.isDebugEnabled() ) log.debug("",ix);
+                            }
+                        } else {
+                            //now we have it registered, remove it from the 
cache
+                            
                         }
-                    } else if ((!event) && (handler.process(socket) == 
Handler.SocketState.CLOSED)) {
-                        // Close socket and pool
+                    } else {
+                        
+                        NioChannel socket = (NioChannel)channel;
+
+                        SelectionKey key = 
socket.getIOChannel().keyFor(socket.getPoller().getSelector());
+                        int handshake = -1;
                         try {
-                            try {socket.close();}catch (Exception ignore){}
-                            if ( socket.isOpen() ) socket.close(true);
-                        }catch ( Exception x ) {
-                            log.error("",x);
+                            handshake = socket.handshake(key.isReadable(), 
key.isWritable());
+                        }catch ( IOException x ) {
+                            handshake = -1;
+                            log.error("Error during SSL handshake",x);
+                        }catch ( CancelledKeyException ckx ) {
+                            handshake = -1;
                         }
-                    }
-                } else if (handshake == -1 ) {
-                    if ( key.isValid() ) key.cancel();
-                    try {socket.close(true);}catch (IOException ignore){}
-                } else {
-                    final SelectionKey fk = key;
-                    final int intops = handshake;
-                    final KeyAttachment ka = (KeyAttachment)fk.attachment();
-                    //register for handshake ops
-                    Runnable r = new Runnable() {
-                        public void run() {
-                            try {
-                                fk.interestOps(intops);
-                                ka.interestOps(intops);
-                            } catch (CancelledKeyException ckx) {
+                        if ( handshake == 0 ) {
+                            // Process the request from this socket
+                            if ((event) && (handler.event(socket, error) == 
Handler.SocketState.CLOSED)) {
+                                // Close socket and pool
                                 try {
-                                    if ( fk != null && fk.attachment() != null 
) {
-                                        
-                                        ka.setError(true); //set to collect 
this socket immediately
-                                        try 
{ka.getChannel().getIOChannel().socket().close();}catch(Exception ignore){}
-                                        try 
{ka.getChannel().close();}catch(Exception ignore){}
-                                        ka.setWakeUp(false);
-                                    }
-                                } catch (Exception ignore) {}
+                                    
+                                    try {socket.close();}catch (Exception 
ignore){}
+                                    if ( socket.isOpen() ) socket.close(true);
+                                    nioChannels.offer(socket);
+                                }catch ( Exception x ) {
+                                    log.error("",x);
+                                }
+                            } else if ((!event) && (handler.process(socket) == 
Handler.SocketState.CLOSED)) {
+                                // Close socket and pool
+                                try {
+                                    
+                                    try {socket.close();}catch (Exception 
ignore){}
+                                    if ( socket.isOpen() ) socket.close(true);
+                                    nioChannels.offer(socket);
+                                }catch ( Exception x ) {
+                                    log.error("",x);
+                                }
                             }
+                        } else if (handshake == -1 ) {
+                            if ( key.isValid() ) key.cancel();
+                            try {socket.close(true);}catch (IOException 
ignore){}
+                            nioChannels.offer(socket);
+                        } else {
+                            final SelectionKey fk = key;
+                            final int intops = handshake;
+                            final KeyAttachment ka = 
(KeyAttachment)fk.attachment();
+                            //register for handshake ops
+                            Runnable r = new Runnable() {
+                                public void run() {
+                                    try {
+                                        fk.interestOps(intops);
+                                        ka.interestOps(intops);
+                                    } catch (CancelledKeyException ckx) {
+                                        try {
+                                            if ( fk != null && fk.attachment() 
!= null ) {
+
+                                                ka.setError(true); //set to 
collect this socket immediately
+                                                try 
{ka.getChannel().getIOChannel().socket().close();}catch(Exception ignore){}
+                                                try 
{ka.getChannel().close();}catch(Exception ignore){}
+                                                ka.setWakeUp(false);
+                                            }
+                                        } catch (Exception ignore) {}
+                                    }
 
+                                }
+                            };
+                            ka.getPoller().addEvent(r);
                         }
-                    };
-                    ka.getPoller().addEvent(r);
+                    }
+                } finally {
+                    //dereference socket to let GC do its job
+                    socket = null;
+                    // Finish up this request
+                    recycleWorkerThread(this);
                 }
-                //dereference socket to let GC do its job
-                socket = null;
-                // Finish up this request
-                recycleWorkerThread(this);
             }
         }
 
@@ -1446,6 +1496,32 @@
     }
 
 
+    // ---------------------------------------------- SocketOptionsProcessor 
Inner Class
+
+
+    /**
+     * This class is the equivalent of the Worker, but will simply use in an
+     * external Executor thread pool.
+     */
+    protected class SocketOptionsProcessor implements Runnable {
+
+        protected SocketChannel sc = null;
+
+        public SocketOptionsProcessor(SocketChannel socket) {
+            this.sc = socket;
+        }
+
+        public void run() {
+            if ( !setSocketOptions(sc) ) {
+                try {
+                    sc.socket().close();
+                    sc.close();
+                }catch ( IOException ix ) {
+                    if ( log.isDebugEnabled() ) log.debug("",ix);
+                }
+            }
+        }
+    }
     // ---------------------------------------------- SocketProcessor Inner 
Class
 
 

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java?rev=430097&r1=430096&r2=430097&view=diff
==============================================================================
--- tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java 
(original)
+++ tomcat/tc6.0.x/trunk/java/org/apache/tomcat/util/net/SecureNioChannel.java 
Wed Aug  9 10:12:37 2006
@@ -31,28 +31,31 @@
     
     public SecureNioChannel(SocketChannel channel, SSLEngine engine, 
ApplicationBufferHandler bufHandler) throws IOException {
         super(channel,bufHandler);
-
         this.sslEngine = engine;
-
-        
         int appBufSize = sslEngine.getSession().getApplicationBufferSize();
         int netBufSize = sslEngine.getSession().getPacketBufferSize();
-        
+        //allocate network buffers - TODO, add in optional direct non-direct 
buffers
+        if ( netInBuffer == null ) netInBuffer = 
ByteBuffer.allocateDirect(netBufSize);
+        if ( netOutBuffer == null ) netOutBuffer = 
ByteBuffer.allocateDirect(netBufSize);
+
         //ensure that the application has a large enough read/write buffers
         //by doing this, we should not encounter any buffer overflow errors
         bufHandler.expand(bufHandler.getReadBuffer(), appBufSize);
         bufHandler.expand(bufHandler.getWriteBuffer(), appBufSize);
-        //allocate network buffers - TODO, add in optional direct buffers
-        this.netInBuffer = ByteBuffer.allocate(netBufSize);
-        this.netOutBuffer = ByteBuffer.allocate(netBufSize);
-        this.netOutBuffer.position(0);
-        this.netOutBuffer.limit(0);
-        this.netInBuffer.position(0);
-        this.netInBuffer.limit(0);
+        reset();
+    }
+    
+    public void reset() throws IOException {
+        super.reset();
+        netOutBuffer.position(0);
+        netOutBuffer.limit(0);
+        netInBuffer.position(0);
+        netInBuffer.limit(0);
 
         //initiate handshake
         sslEngine.beginHandshake();
         initHandshakeStatus = sslEngine.getHandshakeStatus();
+        
     }
     
 
//===========================================================================================
    



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to