Author: trustin Date: Sat Dec 4 22:25:42 2004 New Revision: 109861 URL: http://svn.apache.org/viewcvs?view=rev&rev=109861 Log: * Optimized TcpIoProcessor by using Queue instead of ArrayList * Fixed: problem in high traffic situation * Changed the interface of SessionHandler * added dataWritten * removed markRemoved * passes the number of read/written bytes * Changed the interface of Session * removed flush(mark) * added isIdle(...)
TODO: * Implement TcpConnector * Fix 100% CPU consumption when read buffer is full and user don't clear it. Modified: incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java Modified: incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java&r2=109861 ============================================================================== --- incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java (original) +++ incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java Sat Dec 4 22:25:42 2004 @@ -34,15 +34,21 @@ cause.printStackTrace(System.out); } - public void dataRead(Session session, ByteBuffer buf) { - System.out.println(session.getRemoteAddress() + ": READ (" + buf.remaining() + " B)"); - session.getWriteBuffer().put(buf); - session.flush(); + public void dataRead(Session session, ByteBuffer buf, int readBytes) { + System.out.println(session.getRemoteAddress() + ": READ (" + readBytes + " B)"); + if (buf.remaining() <= session.getWriteBuffer().remaining()) { + session.getWriteBuffer().put(buf); + session.flush(); + } } - public void markRemoved(Session session, Object mark) { - } - - public void writeBufferAvailable(Session session) { + public void dataWritten(Session session, ByteBuffer buf, int writtenBytes) { + System.out.println(session.getRemoteAddress() + ": WRITTEN (" + writtenBytes + "B)"); + ByteBuffer readBuf = session.getReadBuffer(); + System.out.println(readBuf.remaining() + " " + buf.remaining()); + if (readBuf.remaining() <= buf.remaining()) { + buf.put(readBuf); + session.flush(); + } } } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r2=109861 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java Sat Dec 4 22:25:42 2004 @@ -26,18 +26,14 @@ * @author Trustin Lee (http://gleamynode.net/dev/) */ public class ByteBufferPool { - static final int DEFAULT_BUF_SIZE = 8192; + public static final int CAPACITY = 8192; private static Queue buffers = new Queue(16); - static { - buffers.open(); - } - public static synchronized ByteBuffer open() { ByteBuffer buf = (ByteBuffer) buffers.pop(); if (buf == null) { - buf = ByteBuffer.allocateDirect(DEFAULT_BUF_SIZE); + buf = ByteBuffer.allocateDirect(CAPACITY); } else { buf.clear(); } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r2=109861 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java Sat Dec 4 22:25:42 2004 @@ -40,7 +40,6 @@ private int first = 0; private int last = 0; private int size = 0; - private boolean open = false; /** * Construct a new, empty <code>Queue</code> with the specified initial @@ -50,20 +49,10 @@ items = new Object[initialCapacity]; } - public void open() { - clear(); - open = true; - } - - public void close() { - open = false; - clear(); - } - /** * Clears this queue. */ - private void clear() { + public void clear() { Arrays.fill(items, null); first = 0; last = 0; @@ -93,11 +82,7 @@ /** * Enqueue into this queue. */ - public boolean push(Object obj) { - if (!open) { - return false; - } - + public void push(Object obj) { if (size == items.length) { // expand queue final int oldLen = items.length; @@ -118,7 +103,6 @@ items[last] = obj; last = (last + 1) % items.length; size++; - return true; } /** @@ -128,10 +112,6 @@ * really <code>null</code>. */ public Object first() { - if (!open) { - return null; - } - if (size == 0) { return null; } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r2=109861 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java Sat Dec 4 22:25:42 2004 @@ -20,9 +20,9 @@ package org.apache.netty.downstream; import java.net.SocketAddress; - import java.nio.ByteBuffer; +import org.apache.netty.common.IdleStatus; import org.apache.netty.common.SessionConfig; @@ -42,10 +42,8 @@ ByteBuffer getReadBuffer(); ByteBuffer getWriteBuffer(); - - void flush(); - void flush(Object mark); + void flush(); boolean isConnected(); @@ -66,4 +64,6 @@ long getLastReadTime(); long getLastWriteTime(); + + boolean isIdle(IdleStatus status); } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r2=109861 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java Sat Dec 4 22:25:42 2004 @@ -39,9 +39,7 @@ void exceptionCaught(Session session, Throwable cause); - void dataRead(Session session, ByteBuffer buf); + void dataRead(Session session, ByteBuffer readBuf, int readBytes); - void markRemoved(Session session, Object mark); - - void writeBufferAvailable(Session session); + void dataWritten(Session session, ByteBuffer writeBuf, int writtenBytes); } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java&r2=109861 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java Sat Dec 4 22:25:42 2004 @@ -1,4 +1,20 @@ /* + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/* * @(#) $Id$ */ package org.apache.netty.downstream.impl.tcp; @@ -8,291 +24,384 @@ import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; -import java.util.ArrayList; import java.util.Iterator; -import java.util.List; import java.util.Set; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.netty.common.IdleStatus; +import org.apache.netty.common.SessionConfig; +import org.apache.netty.common.util.ByteBufferPool; +import org.apache.netty.common.util.Queue; + /** * TODO Document me. - * TODO Implement idleTime/bufferWritable/ + * TODO Implement markRemoved + * * @author Trustin Lee ([EMAIL PROTECTED]) - * @version $Rev$, $Date$, + * @version $Rev$, $Date$, */ class TcpIoProcessor { - private static final Log log = LogFactory.getLog(TcpIoProcessor.class); - private static final TcpIoProcessor instance; - - static { - TcpIoProcessor tmp; - try { - tmp = new TcpIoProcessor(); - } catch (IOException e) { - tmp = null; - } - - instance = tmp; - } - - public static TcpIoProcessor getInstance() throws IOException { - if (instance == null) - throw new IOException("Failed to open selector."); - return instance; - } - - private final Selector selector; - private final List newSessions = new ArrayList(); - private final List removingSessions = new ArrayList(); - private final List flushingSessions = new ArrayList(); - private Worker worker; - - private TcpIoProcessor() throws IOException { - selector = Selector.open(); - } - - public void addSession(TcpSession session) { - if (worker == null) { - synchronized (this) { - if (worker == null) { - worker = new Worker(); - worker.start(); - } - } - } - - synchronized (newSessions) { - newSessions.add(session); - } - selector.wakeup(); - } - - public void removeSession(TcpSession session) { - synchronized (removingSessions) { - removingSessions.add(session); - } - selector.wakeup(); - } - - public void flushSession(TcpSession session) { - synchronized (flushingSessions) { - flushingSessions.add(session); - } - selector.wakeup(); - } - - private class Worker extends Thread { - public Worker() { - super("TcpIoProcessor"); - setDaemon(true); - } - - public void run() { - for (;;) { - try { - int nKeys = selector.select(); - addSessions(); - if (nKeys > 0) { - processSessions(selector.selectedKeys()); - } - flushSessions(); - removeSessions(); - } catch (IOException e) { - log.error("Unexpected exception.", e); - try { - Thread.sleep(1000); - } catch (InterruptedException e1) { - } - } - } - } - } - - private void addSessions() { - if (newSessions.size() == 0) - return; - - synchronized (newSessions) { - Iterator it = newSessions.iterator(); - while (it.hasNext()) { - TcpSession session = (TcpSession) it.next(); - SocketChannel ch = session.getChannel(); - boolean registered; - try { - ch.configureBlocking(false); - session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session)); - registered = true; - } catch (IOException e) { - registered = false; - fireExceptionCaught(session, e); - } - - if (registered) { - fireSessionOpened(session); - } - } - - newSessions.clear(); - } - } - - private void removeSessions() { - if (removingSessions.size() == 0) - return; - - synchronized (removingSessions) { - Iterator it = removingSessions.iterator(); - while (it.hasNext()) { - TcpSession session = (TcpSession) it.next(); - SocketChannel ch = session.getChannel(); - session.getSelectionKey().cancel(); - session.dispose(); - try { - ch.close(); - } catch (IOException e) { - fireExceptionCaught(session, e); - } finally { - fireSessionClosed(session); - } - } - - removingSessions.clear(); - } - } - - private void processSessions(Set selectedKeys) { - Iterator it = selectedKeys.iterator(); - while (it.hasNext()) { - SelectionKey key = (SelectionKey) it.next(); - it.remove(); - TcpSession session = (TcpSession) key.attachment(); - if (key.isReadable()) { - read(session); - } else if (key.isWritable()) { - scheduleFlush(session); - } - } - selectedKeys.clear(); - } - - private void read(TcpSession session) { - ByteBuffer readBuf = session.getReadBuffer(); - SocketChannel ch = session.getChannel(); - try { - int readBytes = 0; - int ret; - - synchronized (readBuf) { - while ((ret = ch.read(readBuf)) > 0) { - readBytes += ret; - } - - if (readBytes > 0) { - session.increaseReadBytes(readBytes); - readBuf.flip(); - fireDataRead(session, readBuf); - readBuf.compact(); - } - } - - if (ret < 0) { - scheduleRemove(session); - } - } catch (IOException e) { - fireExceptionCaught(session, e); - } - } - - private void scheduleRemove(TcpSession session) { - synchronized (removingSessions) { - removingSessions.add(session); - } - } - - private void scheduleFlush(TcpSession session) { - session.getSelectionKey().interestOps(SelectionKey.OP_READ); - synchronized (flushingSessions) { - flushingSessions.add(session); - } - } - - private void flushSessions() { - if (flushingSessions.size() == 0) - return; - - synchronized (flushingSessions) { - Iterator it = flushingSessions.iterator(); - while (it.hasNext()) { - TcpSession session = (TcpSession) it.next(); - if (session.isClosed()) - continue; - - flush(session); - } - - flushingSessions.clear(); - } - } - - private void flush(TcpSession session) { - ByteBuffer writeBuf = session.getWriteBuffer(); - SocketChannel ch = session.getChannel(); - - try { - synchronized (writeBuf) { - writeBuf.flip(); - int nBytes = ch.write(writeBuf); - if (nBytes > 0) - session.increaseWrittenBytes(nBytes); - - int remaining = writeBuf.remaining(); - if (remaining > 0){ - // Kernel buffer is full - session.getSelectionKey().interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); - } - - writeBuf.compact(); - } - } catch (IOException e) { - fireExceptionCaught(session, e); - } - } - - private void fireSessionOpened(TcpSession session) { - try { - session.getHandler().sessionOpened(session); - } catch (Throwable e) { - fireExceptionCaught(session, e); - } - } - - private void fireSessionClosed(TcpSession session) { - try { - session.getHandler().sessionClosed(session); - } catch (Throwable e) { - fireExceptionCaught(session, e); - } - } - - private void fireDataRead(TcpSession session, ByteBuffer readBuf) { - try { - session.getHandler().dataRead(session, readBuf); - } catch (Throwable e) { - fireExceptionCaught(session, e); - } - } - - private void fireExceptionCaught(TcpSession session, Throwable cause) { - try { - session.getHandler().exceptionCaught(session, cause); - if (cause instanceof IOException) { - scheduleRemove(session); - } - } catch (Throwable t) { - log.error("Exception from excaptionCaught.", t); - } - } + private static final Log log = LogFactory.getLog(TcpIoProcessor.class); + private static final TcpIoProcessor instance; + + static { + TcpIoProcessor tmp; + try { + tmp = new TcpIoProcessor(); + } catch (IOException e) { + log.fatal("Failed to open selector.", e); + tmp = null; + } + + instance = tmp; + } + + private final Selector selector; + private final Queue newSessions = new Queue(16); + private final Queue removingSessions = new Queue(16); + private final Queue flushingSessions = new Queue(16); + private Worker worker; + private long lastIdleCheckTime = System.currentTimeMillis(); + + private TcpIoProcessor() throws IOException { + selector = Selector.open(); + } + + public static TcpIoProcessor getInstance() { + return instance; + } + + public void addSession(TcpSession session) { + if (worker == null) { + synchronized (this) { + if (worker == null) { + worker = new Worker(); + worker.start(); + } + } + } + + synchronized (newSessions) { + newSessions.push(session); + } + + selector.wakeup(); + } + + public void removeSession(TcpSession session) { + scheduleRemove(session); + selector.wakeup(); + } + + public void flushSession(TcpSession session) { + scheduleFlush(session); + selector.wakeup(); + } + + private void addSessions() { + if (newSessions.size() == 0) + return; + + TcpSession session; + + for (;;) { + synchronized (newSessions) { + session = (TcpSession) newSessions.pop(); + } + + if (session == null) + break; + + SocketChannel ch = session.getChannel(); + boolean registered; + + try { + ch.configureBlocking(false); + ch.socket().setSendBufferSize(ByteBufferPool.CAPACITY); + ch.socket().setReceiveBufferSize(ByteBufferPool.CAPACITY); + session.setSelectionKey(ch.register(selector, + SelectionKey.OP_READ, + session)); + registered = true; + } catch (IOException e) { + registered = false; + fireExceptionCaught(session, e); + } + + if (registered) { + fireSessionOpened(session); + } + } + } + + private void removeSessions() { + if (removingSessions.size() == 0) + return; + + for (;;) { + TcpSession session; + + synchronized (removingSessions) { + session = (TcpSession) removingSessions.pop(); + } + + if (session == null) + break; + + SocketChannel ch = session.getChannel(); + session.getSelectionKey().cancel(); + session.dispose(); + + try { + ch.close(); + } catch (IOException e) { + fireExceptionCaught(session, e); + } finally { + fireSessionClosed(session); + } + } + } + + private void processSessions(Set selectedKeys) { + Iterator it = selectedKeys.iterator(); + + while (it.hasNext()) { + SelectionKey key = (SelectionKey) it.next(); + TcpSession session = (TcpSession) key.attachment(); + + if (key.isReadable()) { + read(session); + } + + if (key.isWritable()) { + scheduleFlush(session); + } + } + + selectedKeys.clear(); + } + + private void read(TcpSession session) { + ByteBuffer readBuf = session.getReadBuffer(); + SocketChannel ch = session.getChannel(); + + try { + int readBytes = 0; + int ret; + + synchronized (readBuf) { + readBuf.compact(); + try { + while ((ret = ch.read(readBuf)) > 0) { + readBytes += ret; + } + } finally { + readBuf.flip(); + } + + session.increaseReadBytes(readBytes); + if (ret >= 0) + fireDataRead(session, readBuf, readBytes); + else + scheduleRemove(session); + } + } catch (Throwable e) { + fireExceptionCaught(session, e); + } + } + + private void scheduleRemove(TcpSession session) { + synchronized (removingSessions) { + removingSessions.push(session); + } + } + + private void scheduleFlush(TcpSession session) { + synchronized (flushingSessions) { + flushingSessions.push(session); + } + } + + private void notifyIdleSessions() { + Set keys = selector.keys(); + Iterator it; + TcpSession session; + + // process idle sessions + long currentTime = System.currentTimeMillis(); + + if ((keys != null) && ((currentTime - lastIdleCheckTime) >= 1000)) { + lastIdleCheckTime = currentTime; + it = keys.iterator(); + + while (it.hasNext()) { + SelectionKey key = (SelectionKey) it.next(); + session = (TcpSession) key.attachment(); + + notifyIdleSession(session, currentTime); + } + } + } + + private void notifyIdleSession(TcpSession session, long currentTime) { + SessionConfig config = session.getConfig(); + notifyIdleSession0(session, currentTime, + config.getIdleTimeInMillis(IdleStatus.BOTH_IDLE), + IdleStatus.BOTH_IDLE, session.getLastIoTime()); + notifyIdleSession0(session, currentTime, + config.getIdleTimeInMillis(IdleStatus.READER_IDLE), + IdleStatus.READER_IDLE, session.getLastReadTime()); + notifyIdleSession0(session, currentTime, + config.getIdleTimeInMillis(IdleStatus.WRITER_IDLE), + IdleStatus.WRITER_IDLE, session.getLastWriteTime()); + } + + private void notifyIdleSession0(TcpSession session, long currentTime, + long idleTime, IdleStatus status, + long lastIoTime) { + if (idleTime > 0 && session.isIdle(status) && + (currentTime - lastIoTime) >= idleTime) { + session.setIdle(status); + fireSessionIdle(session, status); + } + } + + private void flushSessions() { + if (flushingSessions.size() == 0) + return; + + for (;;) { + TcpSession session; + + synchronized (flushingSessions) { + session = (TcpSession) flushingSessions.pop(); + } + + if (session == null) + break; + + if (session.isClosed()) + continue; + + flush(session); + } + } + + private void flush(TcpSession session) { + ByteBuffer writeBuf = session.getWriteBuffer(); + SocketChannel ch = session.getChannel(); + + try { + synchronized (writeBuf) { + writeBuf.flip(); + int writtenBytes; + try { + writtenBytes = ch.write(writeBuf); + } finally { + if (writeBuf.hasRemaining()) { + // Kernel buffer is full + session.getSelectionKey().interestOps(SelectionKey.OP_READ | + SelectionKey.OP_WRITE); + } else { + session.getSelectionKey().interestOps(SelectionKey.OP_READ); + } + + writeBuf.compact(); + } + + if (writtenBytes > 0) { + session.increaseWrittenBytes(writtenBytes); + fireDataWritten(session, writeBuf, writtenBytes); + } + } + } catch (IOException e) { + fireExceptionCaught(session, e); + } + } + + private void fireSessionOpened(TcpSession session) { + try { + session.getHandler().sessionOpened(session); + } catch (Throwable e) { + fireExceptionCaught(session, e); + } + } + + private void fireSessionClosed(TcpSession session) { + try { + session.getHandler().sessionClosed(session); + } catch (Throwable e) { + fireExceptionCaught(session, e); + } + } + + private void fireSessionIdle(TcpSession session, IdleStatus status) { + try { + session.getHandler().sessionIdle(session, status); + } catch (Throwable e) { + fireExceptionCaught(session, e); + } + } + + private void fireDataRead(TcpSession session, ByteBuffer readBuf, int readBytes) { + try { + session.getHandler().dataRead(session, readBuf, readBytes); + } catch (Throwable e) { + fireExceptionCaught(session, e); + } + } + + private void fireDataWritten(TcpSession session, ByteBuffer writeBuf, int writtenBytes) { + try { + session.getHandler().dataWritten(session, writeBuf, writtenBytes); + } catch (Throwable e) { + fireExceptionCaught(session, e); + } + } + + private void fireExceptionCaught(TcpSession session, Throwable cause) { + try { + session.getHandler().exceptionCaught(session, cause); + + if (cause instanceof IOException) { + scheduleRemove(session); + } + } catch (Throwable t) { + log.error("Exception from excaptionCaught.", t); + } + } + + private class Worker extends Thread { + public Worker() { + super("TcpIoProcessor"); + setDaemon(true); + } + + public void run() { + for (;;) { + try { + int nKeys = selector.select(1000); + addSessions(); + + if (nKeys > 0) { + processSessions(selector.selectedKeys()); + } + + flushSessions(); + removeSessions(); + notifyIdleSessions(); + } catch (IOException e) { + log.error("Unexpected exception.", e); + + try { + Thread.sleep(1000); + } catch (InterruptedException e1) { + } + } + } + } + } } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r2=109861 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java Sat Dec 4 22:25:42 2004 @@ -3,13 +3,13 @@ */ package org.apache.netty.downstream.impl.tcp; -import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import org.apache.commons.lang.Validate; +import org.apache.netty.common.IdleStatus; import org.apache.netty.common.SessionConfig; import org.apache.netty.common.util.ByteBufferPool; import org.apache.netty.downstream.Session; @@ -34,6 +34,9 @@ private long writtenBytes; private long lastReadTime; private long lastWriteTime; + private boolean idleForBoth; + private boolean idleForRead; + private boolean idleForWrite; /** * Creates a new instance. @@ -41,7 +44,7 @@ TcpSession(SocketChannel ch, SessionHandler defaultHandler) { this.ch = ch; this.config = new TcpSessionConfig(ch); - this.readBuf = ByteBufferPool.open(); + this.readBuf = (ByteBuffer) ByteBufferPool.open().limit(0); this.writeBuf = ByteBufferPool.open(); this.handler = defaultHandler; } @@ -73,11 +76,7 @@ } public void close() { - try { - TcpIoProcessor.getInstance().removeSession(this); - } catch (IOException e) { - // This cannot happen - } + TcpIoProcessor.getInstance().removeSession(this); } public ByteBuffer getReadBuffer() { @@ -89,19 +88,7 @@ } public void flush() { - try { - TcpIoProcessor.getInstance().flushSession(this); - } catch (IOException e) { - // This cannot happen - } - } - - public void flush(Object mark) { - try { - TcpIoProcessor.getInstance().flushSession(this); - } catch (IOException e) { - // This cannot happen - } + TcpIoProcessor.getInstance().flushSession(this); } public boolean isConnected() { @@ -153,4 +140,28 @@ public long getLastWriteTime() { return lastWriteTime; } + + public boolean isIdle(IdleStatus status) { + if (status == IdleStatus.BOTH_IDLE) + return idleForBoth; + + if (status == IdleStatus.READER_IDLE) + return idleForRead; + + if (status == IdleStatus.WRITER_IDLE) + return idleForWrite; + + throw new IllegalArgumentException("Unknown idle status: " + status); + } + + void setIdle(IdleStatus status) { + if (status == IdleStatus.BOTH_IDLE) + idleForBoth = true; + else if (status == IdleStatus.READER_IDLE) + idleForRead = true; + else if (status == IdleStatus.WRITER_IDLE) + idleForWrite = true; + else + throw new IllegalArgumentException("Unknown idle status: " + status); + } } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java&r2=109861 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/Session.java Sat Dec 4 22:25:42 2004 @@ -21,6 +21,7 @@ import java.net.SocketAddress; +import org.apache.netty.common.IdleStatus; import org.apache.netty.common.SessionConfig; @@ -43,6 +44,8 @@ boolean write(Object message); + void notifyWhenWritable(); + boolean isConnected(); boolean isClosed(); @@ -58,4 +61,6 @@ long getLastReadTime(); long getLastWriteTime(); + + boolean isIdle(IdleStatus status); } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java?view=diff&rev=109861&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r1=109860&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r2=109861 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java Sat Dec 4 22:25:42 2004 @@ -40,4 +40,6 @@ void messageReceived(Session session, Object message); void messageSent(Session session, Object message); + + void sessionWritable(Session session); }
