Author: trustin
Date: Sat Dec 4 04:30:00 2004
New Revision: 109795

URL: http://svn.apache.org/viewcvs?view=rev&rev=109795
Log:
Basic implementation of downstream TCP layer. It doesn't provide full functionality yet.

Added:
   incubator/directory/seda/branches/trustin/src/examples/
   incubator/directory/seda/branches/trustin/src/examples/org/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java (contents, props changed)
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java (contents, props changed)
Modified:
   incubator/directory/seda/branches/trustin/maven.xml
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/Service.java (props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java

Modified: incubator/directory/seda/branches/trustin/maven.xml Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/maven.xml?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/maven.xml&r1=109794&p2=incubator/directory/seda/branches/trustin/maven.xml&r2=109795 ============================================================================== --- incubator/directory/seda/branches/trustin/maven.xml (original) +++ incubator/directory/seda/branches/trustin/maven.xml Sat Dec 4 04:30:00 2004 @@ -1,7 +1,6 @@ <project default="test" xmlns:ant="jelly:ant" xmlns:maven="jelly:maven"> - <!-- <preGoal name="java:compile"> <ant:path id="my.other.src.dir" @@ -10,6 +9,5 @@ id="maven.compile.src.set" refid="my.other.src.dir"/> </preGoal> - --> </project> Added: incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java?view=auto&rev=109795 ============================================================================== --- (empty file) +++ incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java Sat Dec 4 04:30:00 2004 @@ -0,0 +1,48 @@ +/* + * @(#) $Id$ + */ +package org.apache.netty.examples.echo.server; + +import java.nio.ByteBuffer; + +import org.apache.netty.common.IdleStatus; +import org.apache.netty.downstream.Session; +import org.apache.netty.downstream.SessionHandler; + +/** + * TODO Document me. 
+ * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$, + */ +public class EchoServerSessionHandler implements SessionHandler { + + public void sessionOpened(Session session) { + System.out.println(session.getRemoteAddress() + ": OPEN"); + } + + public void sessionClosed(Session session) { + System.out.println(session.getRemoteAddress() + ": CLOSED"); + } + + public void sessionIdle(Session session, IdleStatus status) { + System.out.println(session.getRemoteAddress() + ": IDLE"); + } + + public void exceptionCaught(Session session, Throwable cause) { + System.out.println(session.getRemoteAddress() + ": EXCEPTION"); + cause.printStackTrace(System.out); + } + + public void dataRead(Session session, ByteBuffer buf) { + System.out.println(session.getRemoteAddress() + ": READ (" + buf.remaining() + " B)"); + session.getWriteBuffer().put(buf); + session.flush(); + } + + public void markRemoved(Session session, Object mark) { + } + + public void writeBufferAvailable(Session session) { + } +} Added: incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java?view=auto&rev=109795 ============================================================================== --- (empty file) +++ incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java Sat Dec 4 04:30:00 2004 @@ -0,0 +1,22 @@ +/* + * @(#) $Id$ + */ +package org.apache.netty.examples.echo.server; + +import java.net.InetSocketAddress; + +import org.apache.netty.downstream.Acceptor; +import org.apache.netty.downstream.impl.tcp.TcpAcceptor; + +/** + * TODO Document me. 
+ * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$, + */ +public class Main { + public static void main(String[] args) throws Exception { + Acceptor acceptor = new TcpAcceptor(); + acceptor.bind(new InetSocketAddress(8080), new EchoServerSessionHandler()); + } +} Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java?view=auto&rev=109795 ============================================================================== --- (empty file) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java Sat Dec 4 04:30:00 2004 @@ -0,0 +1,72 @@ +/* + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/* + * @(#) $Id$ + */ +package org.apache.netty.common.util; + +import org.apache.netty.common.IdleStatus; +import org.apache.netty.common.SessionConfig; + + +/** + * TODO Document me. 
+ * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$, + */ +public abstract class BasicSessionConfig implements SessionConfig { + private int idleTimeForRead; + private int idleTimeForWrite; + private int idleTimeForBoth; + + protected BasicSessionConfig() { + } + + public int getIdleTime(IdleStatus status) { + if (status == IdleStatus.BOTH_IDLE) + return idleTimeForBoth; + + if (status == IdleStatus.READER_IDLE) + return idleTimeForRead; + + if (status == IdleStatus.WRITER_IDLE) + return idleTimeForWrite; + + throw new IllegalArgumentException("Unknown idle status: " + status); + } + + public long getIdleTimeInMillis(IdleStatus status) { + return getIdleTime(status) * 1000L; + } + + public void setIdleTime(IdleStatus status, int idleTime) { + if (idleTime < 0) + throw new IllegalArgumentException("Illegal idle time: " + + idleTime); + + if (status == IdleStatus.BOTH_IDLE) + idleTimeForBoth = idleTime; + else if (status == IdleStatus.READER_IDLE) + idleTimeForRead = idleTime; + else if (status == IdleStatus.WRITER_IDLE) + idleTimeForWrite = idleTime; + else + throw new IllegalArgumentException("Unknown idle status: " + + status); + } +} Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r2=109795 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java Sat Dec 4 04:30:00 2004 @@ -22,7 
+22,7 @@ /** * TODO Insert type comment. * - * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $ + * @version $Rev$, $Date$ * @author Trustin Lee (http://gleamynode.net/dev/) */ public class ByteBufferPool { Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r2=109795 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java Sat Dec 4 04:30:00 2004 @@ -15,7 +15,7 @@ * */ /* - * @(#) $Id: Queue.java 47 2004-11-24 05:58:31Z trustin $ + * @(#) $Id$ */ package org.apache.netty.common.util; @@ -33,7 +33,7 @@ * href="http://projects.gleamynode.net/">http://projects.gleamynode.net/ * </a>) * - * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $ + * @version $Rev$, $Date$ */ public class Queue implements Serializable { private Object[] items; Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r2=109795 ============================================================================== --- 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java Sat Dec 4 04:30:00 2004 @@ -38,12 +38,14 @@ void setHandler(SessionHandler handler); void close(); + + ByteBuffer getReadBuffer(); ByteBuffer getWriteBuffer(); - void setMark(Object mark); - void flush(); + + void flush(Object mark); boolean isConnected(); Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r2=109795 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java Sat Dec 4 04:30:00 2004 @@ -31,7 +31,7 @@ * @version $Rev$, $Date$ */ public interface SessionHandler { - void sessionEstablished(Session session); + void sessionOpened(Session session); void sessionClosed(Session session); @@ -42,4 +42,6 @@ void dataRead(Session session, ByteBuffer buf); void markRemoved(Session session, Object mark); + + void writeBufferAvailable(Session session); } Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java&r2=109795 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java Sat Dec 4 04:30:00 2004 @@ -54,8 +54,6 @@ /** * Creates a new instance. * @throws IOException - * - * */ public TcpAcceptor() throws IOException { selector = Selector.open(); @@ -134,7 +132,7 @@ continue; TcpSession session = new TcpSession(ch, (SessionHandler) key.attachment()); - session.start(); + TcpIoProcessor.getInstance().addSession(session); } } catch (IOException e) { log.error("Unexpected exception.", e); Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java?view=auto&rev=109795 ============================================================================== --- (empty file) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java Sat Dec 4 04:30:00 2004 @@ -0,0 +1,289 @@ +/* + * @(#) $Id$ + */ +package org.apache.netty.downstream.impl.tcp; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.SocketChannel; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; 
+ +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * TODO Document me. + * TODO Implement idleTime/bufferWritable/ + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$, + */ +class TcpIoProcessor { + private static final Log log = LogFactory.getLog(TcpIoProcessor.class); + private static final TcpIoProcessor instance; + + static { + TcpIoProcessor tmp; + try { + tmp = new TcpIoProcessor(); + } catch (IOException e) { + tmp = null; + } + + instance = tmp; + } + + public static TcpIoProcessor getInstance() throws IOException { + if (instance == null) + throw new IOException("Failed to open selector."); + return instance; + } + + private final Selector selector; + private final List newSessions = new ArrayList(); + private final List removingSessions = new ArrayList(); + private final List flushingSessions = new ArrayList(); + private Worker worker; + + private TcpIoProcessor() throws IOException { + selector = Selector.open(); + } + + public void addSession(TcpSession session) { + if (worker == null) { + synchronized (this) { + if (worker == null) { + worker = new Worker(); + worker.start(); + } + } + } + + synchronized (newSessions) { + newSessions.add(session); + } + selector.wakeup(); + } + + public void removeSession(TcpSession session) { + synchronized (removingSessions) { + removingSessions.add(session); + } + selector.wakeup(); + } + + public void flushSession(TcpSession session) { + synchronized (flushingSessions) { + flushingSessions.add(session); + } + selector.wakeup(); + } + + private class Worker extends Thread { + public Worker() { + super("TcpIoProcessor"); + setDaemon(true); + } + + public void run() { + for (;;) { + try { + int nKeys = selector.select(); + addSessions(); + if (nKeys > 0) { + processSessions(selector.selectedKeys()); + } + flushSessions(); + removeSessions(); + } catch (IOException e) { + log.error("Unexpected exception.", e); + try { + Thread.sleep(1000); + } catch 
(InterruptedException e1) { + } + } + } + } + } + + private void addSessions() { + if (newSessions.size() == 0) + return; + + synchronized (newSessions) { + Iterator it = newSessions.iterator(); + while (it.hasNext()) { + TcpSession session = (TcpSession) it.next(); + SocketChannel ch = session.getChannel(); + boolean registered; + try { + ch.configureBlocking(false); + session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session)); + registered = true; + } catch (IOException e) { + registered = false; + fireExceptionCaught(session, e); + } + + if (registered) { + fireSessionOpened(session); + } + } + + newSessions.clear(); + } + } + + private void removeSessions() { + if (removingSessions.size() == 0) + return; + + synchronized (removingSessions) { + Iterator it = removingSessions.iterator(); + while (it.hasNext()) { + TcpSession session = (TcpSession) it.next(); + SocketChannel ch = session.getChannel(); + session.getSelectionKey().cancel(); + session.dispose(); + try { + ch.close(); + } catch (IOException e) { + fireExceptionCaught(session, e); + } finally { + fireSessionClosed(session); + } + } + + removingSessions.clear(); + } + } + + private void processSessions(Set selectedKeys) { + Iterator it = selectedKeys.iterator(); + while (it.hasNext()) { + SelectionKey key = (SelectionKey) it.next(); + TcpSession session = (TcpSession) key.attachment(); + if (key.isReadable()) { + read(session); + } else if (key.isWritable()) { + scheduleFlush(session); + } + } + } + + private void read(TcpSession session) { + ByteBuffer readBuf = session.getReadBuffer(); + SocketChannel ch = session.getChannel(); + try { + int readBytes = 0; + int ret; + + synchronized (readBuf) { + while ((ret = ch.read(readBuf)) > 0) { + readBytes += ret; + } + + if (readBytes > 0) { + session.increaseReadBytes(readBytes); + readBuf.flip(); + fireDataRead(session, readBuf); + readBuf.compact(); + } + } + + if (ret < 0) { + synchronized (removingSessions) { + 
removingSessions.add(session); + } + } + } catch (IOException e) { + fireExceptionCaught(session, e); + } + } + + private void scheduleFlush(TcpSession session) { + session.getSelectionKey().interestOps(SelectionKey.OP_READ); + synchronized (flushingSessions) { + flushingSessions.add(session); + } + } + + private void flushSessions() { + if (flushingSessions.size() == 0) + return; + + synchronized (flushingSessions) { + Iterator it = flushingSessions.iterator(); + while (it.hasNext()) { + TcpSession session = (TcpSession) it.next(); + if (session.isClosed()) + continue; + + flush(session); + } + + flushingSessions.clear(); + } + } + + private void flush(TcpSession session) { + ByteBuffer writeBuf = session.getWriteBuffer(); + SocketChannel ch = session.getChannel(); + + try { + synchronized (writeBuf) { + writeBuf.flip(); + int nBytes = ch.write(writeBuf); + writeBuf.compact(); + + if (nBytes > 0) + session.increaseWrittenBytes(nBytes); + + int remaining = writeBuf.remaining(); + if (remaining > 0){ + // Kernel buffer is full + session.getSelectionKey().interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); + } + } + } catch (IOException e) { + fireExceptionCaught(session, e); + } + } + + private void fireSessionOpened(TcpSession session) { + try { + session.getHandler().sessionOpened(session); + } catch (Throwable e) { + fireExceptionCaught(session, e); + } + } + + private void fireSessionClosed(TcpSession session) { + try { + session.getHandler().sessionClosed(session); + } catch (Throwable e) { + fireExceptionCaught(session, e); + } + } + + private void fireDataRead(TcpSession session, ByteBuffer readBuf) { + try { + session.getHandler().dataRead(session, readBuf); + } catch (Throwable e) { + fireExceptionCaught(session, e); + } + } + + private void fireExceptionCaught(TcpSession session, Throwable cause) { + try { + session.getHandler().exceptionCaught(session, cause); + } catch (Throwable t) { + log.error("Exception from excaptionCaught.", t); + } + } + +} 
Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r2=109795 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java Sat Dec 4 04:30:00 2004 @@ -3,12 +3,15 @@ */ package org.apache.netty.downstream.impl.tcp; +import java.io.IOException; import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import org.apache.commons.lang.Validate; import org.apache.netty.common.SessionConfig; +import org.apache.netty.common.util.ByteBufferPool; import org.apache.netty.downstream.Session; import org.apache.netty.downstream.SessionHandler; @@ -21,18 +24,44 @@ class TcpSession implements Session { private final SocketChannel ch; - private final SessionConfig config = new SimpleSessionConfig(); + private final TcpSessionConfig config; + private final ByteBuffer readBuf; + private final ByteBuffer writeBuf; + + private SelectionKey key; private SessionHandler handler; + private long readBytes; + private long writtenBytes; + private long lastReadTime; + private long lastWriteTime; /** * Creates a new instance. 
- * - * */ TcpSession(SocketChannel ch, SessionHandler defaultHandler) { this.ch = ch; + this.config = new TcpSessionConfig(ch); + this.readBuf = ByteBufferPool.open(); + this.writeBuf = ByteBufferPool.open(); this.handler = defaultHandler; } + + SocketChannel getChannel() { + return ch; + } + + SelectionKey getSelectionKey() { + return key; + } + + void setSelectionKey(SelectionKey key) { + this.key = key; + } + + void dispose() { + ByteBufferPool.close(readBuf); + ByteBufferPool.close(writeBuf); + } public SessionHandler getHandler() { return handler; @@ -44,16 +73,35 @@ } public void close() { + try { + TcpIoProcessor.getInstance().removeSession(this); + } catch (IOException e) { + // This cannot happen + } } - + + public ByteBuffer getReadBuffer() { + return readBuf; + } + public ByteBuffer getWriteBuffer() { - return null; + return writeBuf; } - - public void setMark(Object mark) { + + public void flush() { + try { + TcpIoProcessor.getInstance().flushSession(this); + } catch (IOException e) { + // This cannot happen + } } - public void flush() { + public void flush(Object mark) { + try { + TcpIoProcessor.getInstance().flushSession(this); + } catch (IOException e) { + // This cannot happen + } } public boolean isConnected() { @@ -61,7 +109,7 @@ } public boolean isClosed() { - return !ch.isConnected(); + return !isConnected(); } public SessionConfig getConfig() { @@ -83,15 +131,25 @@ public long getWrittenBytes() { return writtenBytes; } + + void increaseReadBytes(int increment) { + readBytes += increment; + lastReadTime = System.currentTimeMillis(); + } + + void increaseWrittenBytes(int increment) { + writtenBytes += increment; + lastWriteTime = System.currentTimeMillis(); + } public long getLastIoTime() { - return Math.max(lastReadtime, lastWriteTime); + return Math.max(lastReadTime, lastWriteTime); } public long getLastReadTime() { return lastReadTime; } - + public long getLastWriteTime() { return lastWriteTime; } Added: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java?view=auto&rev=109795 ============================================================================== --- (empty file) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java Sat Dec 4 04:30:00 2004 @@ -0,0 +1,90 @@ +/* + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/* + * @(#) $Id$ + */ +package org.apache.netty.downstream.impl.tcp; + +import java.net.SocketException; + +import java.nio.channels.SocketChannel; + +import org.apache.netty.common.util.BasicSessionConfig; + + +/** + * TODO Document me. 
+ * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$, + */ +public class TcpSessionConfig extends BasicSessionConfig { + private final SocketChannel ch; + + TcpSessionConfig(SocketChannel ch) { + this.ch = ch; + } + + public boolean getKeepAlive() throws SocketException { + return ch.socket().getKeepAlive(); + } + + public void setKeepAlive(boolean on) throws SocketException { + ch.socket().setKeepAlive(on); + } + + public boolean getOOBInline() throws SocketException { + return ch.socket().getOOBInline(); + } + + public void setOOBInline(boolean on) throws SocketException { + ch.socket().setOOBInline(on); + } + + public boolean getReuseAddress() throws SocketException { + return ch.socket().getReuseAddress(); + } + + public void setReuseAddress(boolean on) throws SocketException { + ch.socket().setReuseAddress(on); + } + + public int getSoLinger() throws SocketException { + return ch.socket().getSoLinger(); + } + + public void setSoLinger(boolean on, int linger) + throws SocketException { + ch.socket().setSoLinger(on, linger); + } + + public boolean getTcpNoDelay() throws SocketException { + return ch.socket().getTcpNoDelay(); + } + + public void setTcpNoDelay(boolean on) throws SocketException { + ch.socket().setTcpNoDelay(on); + } + + public int getTrafficClass() throws SocketException { + return ch.socket().getTrafficClass(); + } + + public void setTrafficClass(int tc) throws SocketException { + ch.socket().setTrafficClass(tc); + } +} Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java&r2=109795 
============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java Sat Dec 4 04:30:00 2004 @@ -30,7 +30,7 @@ * * @author [EMAIL PROTECTED] * @author [EMAIL PROTECTED] - * @version $Rev: 56478 $, $Date$ + * @version $Rev$, $Date$ */ public interface ServiceRegistry { void bind(Service service, Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r2=109795 ============================================================================== --- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java (original) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java Sat Dec 4 04:30:00 2004 @@ -29,7 +29,7 @@ * @version $Rev$, $Date$ */ public interface SessionHandler { - void sessionEstablished(Session session); + void sessionOpened(Session session); void sessionClosed(Session session);
