Author: costin Date: Fri Nov 25 21:43:33 2005 New Revision: 349085 URL: http://svn.apache.org/viewcvs?rev=349085&view=rev Log: The nio endpoint. Uses the thread pool. Only accept is implemented - the polling of keep alive needs merging some code in the http11protocol.
Added: tomcat/sandbox/java/org/apache/tomcat/util/net/NioEndpoint.java Added: tomcat/sandbox/java/org/apache/tomcat/util/net/NioEndpoint.java URL: http://svn.apache.org/viewcvs/tomcat/sandbox/java/org/apache/tomcat/util/net/NioEndpoint.java?rev=349085&view=auto ============================================================================== --- tomcat/sandbox/java/org/apache/tomcat/util/net/NioEndpoint.java (added) +++ tomcat/sandbox/java/org/apache/tomcat/util/net/NioEndpoint.java Fri Nov 25 21:43:33 2005 @@ -0,0 +1,316 @@ +/* + * Copyright 1999-2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tomcat.util.net; + +import java.io.IOException; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.nio.channels.ClosedChannelException; +import java.nio.channels.SelectableChannel; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.Iterator; +import java.util.Set; + +import org.apache.tomcat.util.threads.ThreadPool; +import org.apache.tomcat.util.threads.ThreadPoolRunnable; +import org.apache.tomcat.util.threads.ThreadWithAttributes; + + +/** All threads blocked in accept(). New thread created on demand. + * No use of ThreadPool or ServerSocketFactory. + * + * + */ +public class NioEndpoint extends SimpleEndpoint { + + private final Object threadSync = new Object(); + + // active acceptors + private int acceptors=0; + + ThreadPool tp; + + public NioEndpoint() { + tp=new ThreadPool(); + tp.setMinSpareThreads(2); + tp.setMaxSpareThreads(8); + } + + // -------------------- Configuration -------------------- + // -------------------- Thread pool -------------------- + + public void setMaxThreads(int maxThreads) { + if( maxThreads > 0) + tp.setMaxThreads(maxThreads); + } + + public int getMaxThreads() { + return tp.getMaxThreads(); + } + + public void setMaxSpareThreads(int maxThreads) { + if(maxThreads > 0) + tp.setMaxSpareThreads(maxThreads); + } + + public int getMaxSpareThreads() { + return tp.getMaxSpareThreads(); + } + + public void setMinSpareThreads(int minThreads) { + if(minThreads > 0) + tp.setMinSpareThreads(minThreads); + } + + public int getMinSpareThreads() { + return tp.getMinSpareThreads(); + } + + public void setThreadPriority(int threadPriority) { + tp.setThreadPriority(threadPriority); + } + + public int getThreadPriority() { + return tp.getThreadPriority(); + } + + public void setDaemon(boolean b) { + daemon=b; + tp.setDaemon( b ); + } + + public boolean getDaemon() { + return tp.getDaemon(); + } + + public String getName() { + return tp.getName(); + } + + public void setName(String name) { + tp.setName(name); + } + + + // ---------------------- + public String getStrategy() { + return "nio"; + } + + public int getCurrentThreadsBusy() { + return curThreads; + } + + // -------------------- Public methods -------------------- + + public void initEndpoint() throws IOException, InstantiationException { + try { + if(serverSocket==null) { + try { + ServerSocketChannel ssc=ServerSocketChannel.open(); + serverSocket = ssc.socket(); + SocketAddress sa = null; + if (inet == null) { + sa = new InetSocketAddress( port ); + } else { + sa = new InetSocketAddress(inet, port); + } + serverSocket.bind( sa , backlog); + } catch ( BindException be ) { + throw new BindException(be.getMessage() + ":" + port); + } + } + if( serverTimeout >= 0 ) + serverSocket.setSoTimeout( serverTimeout ); + + } catch( IOException ex ) { + throw ex; + } + initialized = true; + } + + public void startEndpoint() throws IOException, InstantiationException { + if (!initialized) { + initEndpoint(); + } + if( maxSpareThreads == minSpareThreads ) { + maxSpareThreads = minSpareThreads + 4; + } + running = true; + paused = false; + + tp.start(); + try { + selector = Selector.open(); + } catch (IOException e) { + e.printStackTrace(); + } + addSocketAccept( serverSocket, new SocketDispatch()); + Thread poller = new Thread( new PollerThread()); + poller.start(); + } + + + // -------------------------------------------------- Master Slave Methods + + + + + public boolean getPolling() { + return true; + } + + public void addPolling(Socket s, Object context ) { + + } + + + public void run() { + // nothing + } + + + public void addSocketRead(Socket s, Object o) throws IOException { + s.getChannel().register( selector, SelectionKey.OP_READ, o); + } + + public void addSocketAccept( ServerSocket ss, Object o) throws IOException { + ServerSocketChannel ssc=ss.getChannel(); + ssc.configureBlocking(false); + ssc.register( selector, SelectionKey.OP_ACCEPT, o); + } + + Selector selector; + + /** Uses NIO to implment selection. + * In addition to sockets, you can add other kind of objects. + * + * @author Costin Manolache + */ + class PollerThread implements Runnable { + + public PollerThread() { + } + + public void run() { + while( running ) { + + try { + int selRes = selector.select(); + + if( selRes == 0 ) { + System.err.println("Select with 0 keys " + + selector.keys().size() ); + for( SelectionKey k : selector.keys() ) { + System.err.println("K " + k.interestOps() + + " " + k.readyOps() + " " + k.toString() + " " + + k.isValid() ); + } + continue; + } + + Set selected = selector.selectedKeys(); + Iterator selI = selected.iterator(); + + while( selI.hasNext() ) { + SelectionKey sk = (SelectionKey)selI.next(); + selI.remove(); + Object skAt = sk.attachment(); + + int readyOps = sk.readyOps(); + SelectableChannel sc = sk.channel(); + + // TODO: use the attachment to decide what's to do. + if( sk.isAcceptable() ) { + ServerSocketChannel ssc=(ServerSocketChannel)sc; + SocketChannel sockC = ssc.accept(); + + + // process the connection in the thread pool + if( skAt instanceof ThreadPoolRunnable ) { + tp.runIt( (ThreadPoolRunnable) skAt, sockC); + } + //sk.interestOps( sk.interestOps() | + // SelectionKey.OP_ACCEPT ); + System.err.println( sk.interestOps() ); + + continue; + } + + // TODO: this is for keep alive + if( sk.isReadable() ) { + SocketChannel sockC = (SocketChannel)sc; + + // Incoming data on keep-alive connection. + continue; + } + + // dispatch the socket to a pool thread + System.err.println("Select: " + readyOps); + } + + } catch (IOException e) { + e.printStackTrace(); + } + + } + + } + + } + + class SocketDispatch implements ThreadPoolRunnable { + + public Object[] getInitData() { + // no synchronization overhead, but 2 array access + Object obj[]=new Object[2]; + obj[1]= getConnectionHandler().init(); + obj[0]=new TcpConnection(); + return obj; + } + + public void runIt(Object perThrData[]) { + ThreadWithAttributes t=(ThreadWithAttributes)Thread.currentThread(); + + SocketChannel sc=(SocketChannel)t.getParam(tp); + if (isRunning()) { + // Loop if endpoint is paused + while (isPaused()) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // Ignore + } + } + + if (null != sc) { + processSocket(sc.socket(), (TcpConnection) perThrData[0], + (Object[]) perThrData[1]); + } + + } + } + + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]