Author: costin
Date: Fri Nov 25 21:43:33 2005
New Revision: 349085

URL: http://svn.apache.org/viewcvs?rev=349085&view=rev
Log:
The nio endpoint. Uses the thread pool. Only accept is implemented - the
polling of keep alive needs merging some code in the http11protocol.

Added:
    tomcat/sandbox/java/org/apache/tomcat/util/net/NioEndpoint.java

Added: tomcat/sandbox/java/org/apache/tomcat/util/net/NioEndpoint.java
URL: 
http://svn.apache.org/viewcvs/tomcat/sandbox/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=349085&view=auto
==============================================================================
--- tomcat/sandbox/java/org/apache/tomcat/util/net/NioEndpoint.java (added)
+++ tomcat/sandbox/java/org/apache/tomcat/util/net/NioEndpoint.java Fri Nov 25 
21:43:33 2005
@@ -0,0 +1,316 @@
+/*
+ *  Copyright 1999-2004 The Apache Software Foundation
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.tomcat.util.net;
+
+import java.io.IOException;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.apache.tomcat.util.threads.ThreadPool;
+import org.apache.tomcat.util.threads.ThreadPoolRunnable;
+import org.apache.tomcat.util.threads.ThreadWithAttributes;
+
+
+/** All threads blocked in accept(). New thread created on demand.
+ * No use of ThreadPool or ServerSocketFactory.
+ * 
+ * 
+ */
+public class NioEndpoint extends SimpleEndpoint { 
+
+    private final Object threadSync = new Object();
+
+    // active acceptors
+    private int acceptors=0;
+    
+    ThreadPool tp;
+    
+    public NioEndpoint() {
+        tp=new ThreadPool();
+        tp.setMinSpareThreads(2);
+        tp.setMaxSpareThreads(8);
+    }
+
+    // -------------------- Configuration --------------------
+    // -------------------- Thread pool --------------------
+
+    public void setMaxThreads(int maxThreads) {
+        if( maxThreads > 0)
+            tp.setMaxThreads(maxThreads);
+    }
+
+    public int getMaxThreads() {
+        return tp.getMaxThreads();
+    }
+
+    public void setMaxSpareThreads(int maxThreads) {
+        if(maxThreads > 0) 
+            tp.setMaxSpareThreads(maxThreads);
+    }
+
+    public int getMaxSpareThreads() {
+        return tp.getMaxSpareThreads();
+    }
+
+    public void setMinSpareThreads(int minThreads) {
+        if(minThreads > 0) 
+            tp.setMinSpareThreads(minThreads);
+    }
+
+    public int getMinSpareThreads() {
+        return tp.getMinSpareThreads();
+    }
+
+    public void setThreadPriority(int threadPriority) {
+      tp.setThreadPriority(threadPriority);
+    }
+
+    public int getThreadPriority() {
+      return tp.getThreadPriority();
+    }
+
+    public void setDaemon(boolean b) {
+        daemon=b;
+        tp.setDaemon( b );
+    }
+    
+    public boolean getDaemon() {
+        return tp.getDaemon();
+    }
+
+    public String getName() {
+        return tp.getName();
+    }
+
+    public void setName(String name) {
+        tp.setName(name);
+    }
+
+    
+    // ---------------------- 
+    public String getStrategy() {
+        return "nio";
+    }
+    
+    public int getCurrentThreadsBusy() {
+        return curThreads;
+    }
+    
+    // -------------------- Public methods --------------------
+    
+    public void initEndpoint() throws IOException, InstantiationException {
+        try {
+            if(serverSocket==null) {
+                try {
+                    ServerSocketChannel ssc=ServerSocketChannel.open();
+                    serverSocket = ssc.socket();
+                    SocketAddress sa = null;
+                    if (inet == null) {
+                        sa = new InetSocketAddress( port );
+                    } else {
+                        sa = new InetSocketAddress(inet, port);
+                    }
+                    serverSocket.bind( sa , backlog);
+                } catch ( BindException be ) {
+                    throw new BindException(be.getMessage() + ":" + port);
+                }
+            }
+            if( serverTimeout >= 0 )
+                serverSocket.setSoTimeout( serverTimeout );
+            
+        } catch( IOException ex ) {
+            throw ex;
+        }
+        initialized = true;
+    }
+
+    public void startEndpoint() throws IOException, InstantiationException {
+        if (!initialized) {
+            initEndpoint();
+        }
+        if( maxSpareThreads == minSpareThreads ) {
+            maxSpareThreads = minSpareThreads + 4;
+        }
+        running = true;
+        paused = false;
+        
+        tp.start();
+        try {
+            selector = Selector.open();
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        addSocketAccept( serverSocket, new SocketDispatch());
+        Thread poller = new Thread( new PollerThread());
+        poller.start();
+    }
+
+
+    // -------------------------------------------------- Master Slave Methods
+
+    
+
+
+    public boolean getPolling() {
+        return true;
+    }
+    
+    public void addPolling(Socket s, Object context ) {
+        
+    }
+
+    
+    public void run() {
+        // nothing 
+    }
+    
+
+    public void addSocketRead(Socket s, Object o) throws IOException {
+        s.getChannel().register( selector, SelectionKey.OP_READ, o);        
+    }
+    
+    public void addSocketAccept( ServerSocket ss, Object o) throws IOException 
{
+        ServerSocketChannel ssc=ss.getChannel();
+        ssc.configureBlocking(false);
+        ssc.register( selector, SelectionKey.OP_ACCEPT, o);
+    }
+
+    Selector selector;
+    
+    /** Uses NIO to implment selection.
+     *  In addition to sockets, you can add other kind of objects.
+     *  
+     * @author Costin Manolache
+     */
+    class PollerThread implements Runnable {
+
+        public PollerThread() {
+        }
+               
+        public void run() {
+            while( running ) {
+                
+                try {
+                    int selRes = selector.select();
+
+                    if( selRes == 0 ) {
+                        System.err.println("Select with 0 keys " + 
+                                selector.keys().size() );
+                        for( SelectionKey k : selector.keys() ) {
+                            System.err.println("K " + k.interestOps() +
+                                    " " + k.readyOps() + " " + k.toString() + 
" "
+                                    + k.isValid() );
+                        }
+                        continue;
+                    }
+                    
+                    Set selected = selector.selectedKeys();
+                    Iterator selI = selected.iterator();
+                    
+                    while( selI.hasNext() ) {
+                        SelectionKey sk = (SelectionKey)selI.next();
+                        selI.remove();
+                        Object skAt = sk.attachment();
+                        
+                        int readyOps = sk.readyOps();
+                        SelectableChannel sc = sk.channel();
+                        
+                        // TODO: use the attachment to decide what's to do.
+                        if( sk.isAcceptable() ) {
+                            ServerSocketChannel ssc=(ServerSocketChannel)sc;
+                            SocketChannel sockC = ssc.accept();
+                            
+                            
+                            // process the connection in the thread pool
+                            if( skAt instanceof ThreadPoolRunnable ) {
+                                tp.runIt( (ThreadPoolRunnable) skAt, sockC);
+                            }
+                            //sk.interestOps( sk.interestOps() | 
+                            //        SelectionKey.OP_ACCEPT );
+                            System.err.println( sk.interestOps() ); 
+
+                            continue;
+                        }
+
+                        // TODO: this is for keep alive
+                        if( sk.isReadable() ) {
+                            SocketChannel sockC = (SocketChannel)sc;
+                            
+                            // Incoming data on keep-alive connection.
+                            continue;
+                        }
+                        
+                        // dispatch the socket to a pool thread
+                        System.err.println("Select: " + readyOps);
+                    }
+                    
+                } catch (IOException e) {
+                    e.printStackTrace();
+                }
+                
+            }
+            
+        }
+        
+    }
+    
+    class SocketDispatch implements ThreadPoolRunnable {
+
+        public Object[] getInitData() {
+            // no synchronization overhead, but 2 array access 
+            Object obj[]=new Object[2];
+            obj[1]= getConnectionHandler().init();
+            obj[0]=new TcpConnection();
+            return obj;
+        }
+        
+        public void runIt(Object perThrData[]) {
+            ThreadWithAttributes 
t=(ThreadWithAttributes)Thread.currentThread();
+            
+            SocketChannel sc=(SocketChannel)t.getParam(tp);
+            if (isRunning()) {
+                // Loop if endpoint is paused
+                while (isPaused()) {
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // Ignore
+                    }
+                }
+
+                if (null != sc) {
+                    processSocket(sc.socket(), (TcpConnection) perThrData[0], 
+                            (Object[]) perThrData[1]);
+                }
+
+            }
+        }
+        
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to