Author: costin Date: Fri Aug 29 21:33:15 2008 New Revision: 690460 URL: http://svn.apache.org/viewvc?rev=690460&view=rev Log: Initial support for plugging in SSL
Modified: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java Modified: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java?rev=690460&r1=690459&r2=690460&view=diff ============================================================================== --- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java (original) +++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorCallback.java Fri Aug 29 21:33:15 2008 @@ -31,12 +31,13 @@ * seems cleaner and it's nice to be able to use APR more portably. * ( older version used long - but non-blocking connect needs a second param ) */ -public class SelectorCallback { +public class SelectorCallback implements SelectorThread.IOThreadRunnable, + SelectorThread.DataEvents { /** * Called when the protocol is connected. */ - public void connected(SelectorData selThread) + public void connected(SelectorData sdata) throws IOException { } @@ -44,34 +45,34 @@ * It is possible to write data. * For both read and write - re-enable interest if you want more data. */ - public void dataWriteable(SelectorData selThread) throws IOException { + public void dataWriteable(SelectorData sdata) throws IOException { } /** * Data available for read. * For both read and write - re-enable interest if you want more data. */ - public void dataReceived(SelectorData selThread) throws IOException { + public void dataReceived(SelectorData sdata) throws IOException { } /** * nextTimeEvent reached. 
*/ - public void timeEvent(SelectorData selThread) { + public void timeEvent(SelectorData sdata) { } /** * @throws IOException * */ - public void ioThreadRun(SelectorData selThread) throws IOException { + public void ioThreadRun(SelectorData sdata) throws IOException { } /** * Close was detected, or an unhandled exception happened while processing * this callback. */ - public void channelClosed(SelectorData selThread, Throwable ex) { + public void channelClosed(SelectorData sdata, Throwable ex) { } /** @@ -83,7 +84,7 @@ * TODO: is there any case where something else besides registering read * interest on the new connection is needed ? Maybe it could read some data ? */ - public SelectorCallback connectionAccepted(SelectorData selThread, + public SelectorCallback connectionAccepted(SelectorData sdata, Channel sockC) { return null; } Modified: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java?rev=690460&r1=690459&r2=690460&view=diff ============================================================================== --- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java (original) +++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThread.java Fri Aug 29 21:33:15 2008 @@ -74,7 +74,11 @@ int lastReadResult; int zeroReads = 0; int lastWriteResult; + + public SelectorFilter ssl; + public int suspended = 0; + public String toString() { StringBuffer sb = new StringBuffer(); sb.append("SelData: ") @@ -85,6 +89,47 @@ return sb.toString(); } } + + public static interface IOThreadRunnable { + public void ioThreadRun(SelectorData selThread) throws IOException; + } + + public static interface DataEvents { + /** + * Called by selector thread when it can write data. + * SSL may consume sending data for negotiation. 
+ * @throws IOException + */ + public void dataWriteable(SelectorData sdata) throws IOException; + + public void dataReceived(SelectorData sdata) throws IOException; + } + + public static interface DataChannel { + public int writeNonBlocking(SelectorThread st, SelectorData sdata, + ByteBuffer bb) throws IOException; + + public int readNonBlocking(SelectorThread st, SelectorData sdata, + ByteBuffer bb) throws IOException; + + } + + /** + * Used for non-blocking SSL, gzip, etc + */ + public static interface SelectorFilter extends DataEvents, DataChannel { + } + + public static interface SslSupport extends SelectorFilter { + + // For CONNECT, etc + public void startSSL(); + + // To force a new handshake + public void handshake(); + + // TODO: get certs, etc + } // ----------- IO handling ----------- protected long inactivityTimeout = 5000; @@ -118,8 +163,8 @@ * @param sc * @param nextTimer time to call the timeEvent() callback */ - public void setTimerEventTime(SelectorData selectorData, long nextTimer) { - selectorData.nextTimeEvent = nextTimer; + public void setTimerEventTime(SelectorData sdata, long nextTimer) { + sdata.nextTimeEvent = nextTimer; } public int readNonBlocking(SelectorData sc, ByteBuffer bb) @@ -151,7 +196,10 @@ { } - public void ioThreadRun(SelectorData sdata) throws IOException { + public void runInSelectorThread(SelectorData sdata) throws IOException { + } + + public void runInSelectorThread(Runnable cb) throws IOException { } /** @@ -174,11 +222,11 @@ public void readInterest(SelectorData sc, boolean readInterest) throws IOException { } - public int getPort(SelectorData sd, boolean remote) { + public int getPort(SelectorData sdata, boolean remote) { return 0; } - public InetAddress getAddress(SelectorData sd, boolean remote) { + public InetAddress getAddress(SelectorData sdata, boolean remote) { return null; } } \ No newline at end of file Modified: tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java 
URL: http://svn.apache.org/viewvc/tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java?rev=690460&r1=690459&r2=690460&view=diff ============================================================================== --- tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java (original) +++ tomcat/sandbox/tomcat-lite/coyote-extensions/org/apache/tomcat/util/net/SelectorThreadNio.java Fri Aug 29 21:33:15 2008 @@ -39,7 +39,9 @@ import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.tomcat.util.modeler.Registry; +import org.apache.tomcat.util.ObjectManager; +import org.apache.tomcat.util.net.SelectorThread.IOThreadRunnable; +import org.apache.tomcat.util.net.SelectorThread.SelectorData; /** * NIO implementation. @@ -60,6 +62,8 @@ new ArrayList<SelectorData>(); ArrayList<SelectorData> runInterest = new ArrayList<SelectorData>(); + ArrayList<Runnable> runnableInterest = + new ArrayList<Runnable>(); AtomicInteger opened = new AtomicInteger(); AtomicInteger closed = new AtomicInteger(); @@ -69,7 +73,7 @@ // thread ArrayList<SelectorData> active = new ArrayList<SelectorData>(); - boolean debug = false; + boolean debug = true; boolean running = true; long lastWakeup = System.currentTimeMillis(); // last time we woke @@ -105,12 +109,8 @@ public void setName(String n) { selectorThread.setName(n); - Registry registry = Registry.getRegistry(null, null); - try { - registry.registerComponent(this, ":name=" + n, "SelectorThread"); - } catch (Exception e) { - e.printStackTrace(); - } + ObjectManager.get().registerObject(this, "SelectorThread-" + n, + "SelectorThread"); } /** @@ -213,8 +213,8 @@ //} //checkChannelKey(cstate); if (sdata.selKey != sk || sdata.channelData != sk.channel()) { - sdata.selKey = sk; - sdata.channelData = sk.channel(); + sdata.selKey = sk; + sdata.channelData = sk.channel(); } if (sk.isValid() && sk.isAcceptable()) { @@ -264,7 +264,11 @@ + sk.readyOps() + " " 
+ cstate + " " + sk); } - cstate.dataWriteable(sdata); + if(sdata.ssl != null) { + sdata.ssl.dataWriteable(sdata); + } else { + cstate.dataWriteable(sdata); + } if (sdata.lastWriteResult > 0 && sdata.writeInterest) { @@ -273,7 +277,7 @@ } } - if (sk.isReadable()) { + if (sk.isValid() && sk.isReadable()) { sdata.lastReadResult = 0; if (debug) { log.info("dataReceived " + selected @@ -282,12 +286,11 @@ + sk.readyOps() + " " + cstate + " " + sk); } - cstate.dataReceived(sdata); -// if (cstate.selectorData.lastReadResult > 0 && -// cstate.selectorData.readInterest) { -// log.warning("SelectorThread: read interest" + -// " after incomplete read"); -// } + if (sdata.ssl != null) { + sdata.ssl.dataReceived(sdata); + } else { + cstate.dataReceived(sdata); + } } } catch (Throwable t) { t.printStackTrace(); @@ -449,12 +452,22 @@ throws IOException { try { int off = bb.position(); - int done = ((SocketChannel) selectorData.channelData).read(bb); + + int done = 0; + + if (selectorData.ssl != null) { + done = selectorData.ssl.readNonBlocking(this, selectorData, bb); + } else { + done = ((SocketChannel) selectorData.channelData).read(bb); + } + + if (debug) { log.info("-------------readNB rd=" + done + " bb.limit=" + bb.limit() + " pos=" + bb.position() + " " + selectorData.callback); } if (done > 0) { + if (debug) { String s = new String(bb.array(), off, bb.position() - off); @@ -495,6 +508,11 @@ public int writeNonBlocking(SelectorData selectorData, ByteBuffer bb) throws IOException { try { + if (selectorData.suspended != 0) { + // for example some SSL negotiation is going on + log.info("Suspended - no write possible"); + return 0; + } if (debug) { log.info("writeNB pos=" + bb.position() + " len=" + (bb.limit() - bb.position()) + " " + selectorData.callback); @@ -504,7 +522,13 @@ bb.limit() - bb.position()); log.info("Data:\n" + s); } - int done = ((SocketChannel) selectorData.channelData).write(bb); + + int done = 0; + if (selectorData.ssl != null) { + done = 
selectorData.ssl.writeNonBlocking(this, selectorData, bb); + } else { + done = ((SocketChannel) selectorData.channelData).write(bb); + } selectorData.lastWriteResult = done; return done; } catch(IOException ex) { @@ -551,6 +575,8 @@ selectorData.channelData = socketChannel; selectorData.channelData = socketChannel; // no key + // TODO: add SSL filter + socketChannel.connect(new InetSocketAddress(host, port)); opened.incrementAndGet(); @@ -618,6 +644,7 @@ if( serverTimeout >= 0 ) { serverSocket.setSoTimeout( serverTimeout ); } + ssc.configureBlocking(false); @@ -625,7 +652,8 @@ selectorData.channelData = ssc; // no key yet selectorData.callback = cstate; // key will be set in pending - + + // TODO: add SSL here synchronized (connectAcceptInterest) { connectAcceptInterest.add(selectorData); @@ -633,7 +661,9 @@ selector.wakeup(); } - public void ioThreadRun(SelectorData sdata) throws IOException { + + @Override + public void runInSelectorThread(SelectorData sdata) throws IOException { if (isSelectorThread()) { sdata.callback.ioThreadRun(sdata); } else { @@ -643,6 +673,20 @@ selector.wakeup(); } } + + @Override + public void runInSelectorThread(Runnable cb) throws IOException { + if (isSelectorThread()) { + cb.run(); + } else { + synchronized (runnableInterest) { + runnableInterest.add(cb); + } + selector.wakeup(); + } + + } + /** * Example config: @@ -752,6 +796,8 @@ @Override public void writeInterest(SelectorData selectorData, boolean b) { + // TODO: suspended ? 
+ SelectionKey sk = (SelectionKey) selectorData.selKey; if (!sk.isValid()) { return; @@ -947,6 +993,23 @@ runInterest.clear(); } } + if (runnableInterest.size() > 0) { + synchronized (runnableInterest) { + Iterator<Runnable> ci = runnableInterest.iterator(); + while (ci.hasNext()) { + Runnable cstate = ci.next(); + try { + cstate.run(); + } catch (Throwable t) { + t.printStackTrace(); + } + if (debug) { + log.info("Run in selthread: " + cstate); + } + } + runnableInterest.clear(); + } + } processPendingUpdateCallback(); } @@ -980,4 +1043,11 @@ } } } + + // SSL support - TODO: move to separate class, make it pluggable + // This uses plain java APIs + + + + } \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]