Author: trustin
Date: Sat Dec  4 04:30:00 2004
New Revision: 109795

URL: http://svn.apache.org/viewcvs?view=rev&rev=109795
Log:
Basic implementation of downstream TCP layer.
It doesn't provide full functionality yet.
Added:
   incubator/directory/seda/branches/trustin/src/examples/
   incubator/directory/seda/branches/trustin/src/examples/org/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/
   
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/
   
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/
   
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/
   
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java
   (contents, props changed)
   
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java
   (contents, props changed)
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java
   (contents, props changed)
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java
   (contents, props changed)
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java
   (contents, props changed)
Modified:
   incubator/directory/seda/branches/trustin/maven.xml
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
   (contents, props changed)
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
   (contents, props changed)
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java
   (contents, props changed)
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
   (contents, props changed)
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/Service.java
   (props changed)
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java
   (contents, props changed)
   
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java

Modified: incubator/directory/seda/branches/trustin/maven.xml
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/maven.xml?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/maven.xml&r1=109794&p2=incubator/directory/seda/branches/trustin/maven.xml&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/maven.xml (original)
+++ incubator/directory/seda/branches/trustin/maven.xml Sat Dec  4 04:30:00 2004
@@ -1,7 +1,6 @@
 <project default="test"
   xmlns:ant="jelly:ant" xmlns:maven="jelly:maven">
 
-  <!--
   <preGoal name="java:compile">
     <ant:path
       id="my.other.src.dir"
@@ -10,6 +9,5 @@
       id="maven.compile.src.set"
       refid="my.other.src.dir"/>
   </preGoal>
-  -->
 
 </project>

Added: 
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ 
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java
  Sat Dec  4 04:30:00 2004
@@ -0,0 +1,48 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.examples.echo.server;
+
+import java.nio.ByteBuffer;
+
+import org.apache.netty.common.IdleStatus;
+import org.apache.netty.downstream.Session;
+import org.apache.netty.downstream.SessionHandler;
+
+/**
+ * TODO Document me.
+ * 
+ * @author Trustin Lee ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$, 
+ */
+public class EchoServerSessionHandler implements SessionHandler {
+
+       public void sessionOpened(Session session) {
+               System.out.println(session.getRemoteAddress() + ": OPEN");
+       }
+
+       public void sessionClosed(Session session) {
+               System.out.println(session.getRemoteAddress() + ": CLOSED");
+       }
+
+       public void sessionIdle(Session session, IdleStatus status) {
+               System.out.println(session.getRemoteAddress() + ": IDLE");
+       }
+
+       public void exceptionCaught(Session session, Throwable cause) {
+               System.out.println(session.getRemoteAddress() + ": EXCEPTION");
+               cause.printStackTrace(System.out);
+       }
+
+       public void dataRead(Session session, ByteBuffer buf) {
+               System.out.println(session.getRemoteAddress() + ": READ (" + 
buf.remaining() + " B)");
+               session.getWriteBuffer().put(buf);
+               session.flush();
+       }
+
+       public void markRemoved(Session session, Object mark) {
+       }
+
+       public void writeBufferAvailable(Session session) {
+       }
+}

Added: 
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ 
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java
      Sat Dec  4 04:30:00 2004
@@ -0,0 +1,22 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.examples.echo.server;
+
+import java.net.InetSocketAddress;
+
+import org.apache.netty.downstream.Acceptor;
+import org.apache.netty.downstream.impl.tcp.TcpAcceptor;
+
+/**
+ * TODO Document me.
+ * 
+ * @author Trustin Lee ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$, 
+ */
+public class Main {
+       public static void main(String[] args) throws Exception {
+               Acceptor acceptor = new TcpAcceptor();
+               acceptor.bind(new InetSocketAddress(8080), new 
EchoServerSessionHandler());
+       }
+}

Added: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java
     Sat Dec  4 04:30:00 2004
@@ -0,0 +1,72 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.common.util;
+
+import org.apache.netty.common.IdleStatus;
+import org.apache.netty.common.SessionConfig;
+
+
+/**
+ * TODO Document me.
+ *
+ * @author Trustin Lee ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$,
+ */
+public abstract class BasicSessionConfig implements SessionConfig {
+    private int idleTimeForRead;
+    private int idleTimeForWrite;
+    private int idleTimeForBoth;
+
+    protected BasicSessionConfig() {
+    }
+
+    public int getIdleTime(IdleStatus status) {
+        if (status == IdleStatus.BOTH_IDLE)
+            return idleTimeForBoth;
+
+        if (status == IdleStatus.READER_IDLE)
+            return idleTimeForRead;
+
+        if (status == IdleStatus.WRITER_IDLE)
+            return idleTimeForWrite;
+
+        throw new IllegalArgumentException("Unknown idle status: " + status);
+    }
+
+    public long getIdleTimeInMillis(IdleStatus status) {
+        return getIdleTime(status) * 1000L;
+    }
+
+    public void setIdleTime(IdleStatus status, int idleTime) {
+        if (idleTime < 0)
+            throw new IllegalArgumentException("Illegal idle time: " +
+                                               idleTime);
+
+        if (status == IdleStatus.BOTH_IDLE)
+            idleTimeForBoth = idleTime;
+        else if (status == IdleStatus.READER_IDLE)
+            idleTimeForRead = idleTime;
+        else if (status == IdleStatus.WRITER_IDLE)
+            idleTimeForWrite = idleTime;
+        else
+            throw new IllegalArgumentException("Unknown idle status: " +
+                                               status);
+    }
+}

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r2=109795
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
 (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
 Sat Dec  4 04:30:00 2004
@@ -22,7 +22,7 @@
 /**
  * TODO Insert type comment.
  *
- * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $
+ * @version $Rev$, $Date$
  * @author Trustin Lee (http://gleamynode.net/dev/)
  */
 public class ByteBufferPool {

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r2=109795
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
  (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
  Sat Dec  4 04:30:00 2004
@@ -15,7 +15,7 @@
  *
  */
 /*
- * @(#) $Id: Queue.java 47 2004-11-24 05:58:31Z trustin $
+ * @(#) $Id$
  */
 package org.apache.netty.common.util;
 
@@ -33,7 +33,7 @@
  *         
href="http://projects.gleamynode.net/";>http://projects.gleamynode.net/
  *         </a>)
  *
- * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $
+ * @version $Rev$, $Date$
  */
 public class Queue implements Serializable {
     private Object[] items;

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r2=109795
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
 (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
 Sat Dec  4 04:30:00 2004
@@ -38,12 +38,14 @@
     void setHandler(SessionHandler handler);
 
     void close();
+    
+    ByteBuffer getReadBuffer();
 
     ByteBuffer getWriteBuffer();
 
-    void setMark(Object mark);
-
     void flush();
+    
+    void flush(Object mark);
 
     boolean isConnected();
 

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r2=109795
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
  (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
  Sat Dec  4 04:30:00 2004
@@ -31,7 +31,7 @@
  * @version $Rev$, $Date$
  */
 public interface SessionHandler {
-    void sessionEstablished(Session session);
+    void sessionOpened(Session session);
 
     void sessionClosed(Session session);
 
@@ -42,4 +42,6 @@
     void dataRead(Session session, ByteBuffer buf);
 
     void markRemoved(Session session, Object mark);
+    
+    void writeBufferAvailable(Session session);
 }

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java&r2=109795
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java
    (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java
    Sat Dec  4 04:30:00 2004
@@ -54,8 +54,6 @@
     /**
      * Creates a new instance.
      * @throws IOException
-     *
-     *
      */
     public TcpAcceptor() throws IOException {
         selector = Selector.open();
@@ -134,7 +132,7 @@
                             continue;
                         
                         TcpSession session = new TcpSession(ch, 
(SessionHandler) key.attachment());
-                        session.start();
+                        TcpIoProcessor.getInstance().addSession(session);
                     }
                 } catch (IOException e) {
                     log.error("Unexpected exception.", e);

Added: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java
 Sat Dec  4 04:30:00 2004
@@ -0,0 +1,289 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.downstream.impl.tcp;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * TODO Document me.
+ * TODO Implement idleTime/bufferWritable/
+ * @author Trustin Lee ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$, 
+ */
+class TcpIoProcessor {
+       private static final Log log = LogFactory.getLog(TcpIoProcessor.class);
+       private static final TcpIoProcessor instance;
+       
+       static {
+               TcpIoProcessor tmp;
+               try {
+                       tmp = new TcpIoProcessor();
+               } catch (IOException e) {
+                       tmp = null;
+               }
+               
+               instance = tmp;
+       }
+       
+       public static TcpIoProcessor getInstance() throws IOException {
+               if (instance == null)
+                       throw new IOException("Failed to open selector.");
+               return instance;
+       }
+       
+       private final Selector selector;
+       private final List newSessions = new ArrayList();
+       private final List removingSessions = new ArrayList();
+       private final List flushingSessions = new ArrayList();
+       private Worker worker;
+       
+       private TcpIoProcessor() throws IOException {
+               selector = Selector.open();
+       }
+       
+       public void addSession(TcpSession session) {
+               if (worker == null) {
+                       synchronized (this) {
+                               if (worker == null) {
+                                       worker = new Worker();
+                                       worker.start();
+                               }
+                       }
+               }
+               
+               synchronized (newSessions) {
+                       newSessions.add(session);
+               }
+               selector.wakeup();
+       }
+       
+       public void removeSession(TcpSession session) {
+               synchronized (removingSessions) {
+                       removingSessions.add(session);
+               }
+               selector.wakeup();
+       }
+       
+       public void flushSession(TcpSession session) {
+               synchronized (flushingSessions) {
+                       flushingSessions.add(session);
+               }
+               selector.wakeup();
+       }
+       
+       private class Worker extends Thread {
+               public Worker() {
+                       super("TcpIoProcessor");
+                       setDaemon(true);
+               }
+               
+               public void run() {
+                       for (;;) {
+                               try {
+                                       int nKeys = selector.select();
+                                       addSessions();
+                                       if (nKeys > 0) {
+                                               
processSessions(selector.selectedKeys());
+                                       }
+                                       flushSessions();
+                                       removeSessions();
+                               } catch (IOException e) {
+                                       log.error("Unexpected exception.", e);
+                                       try {
+                                               Thread.sleep(1000);
+                                       } catch (InterruptedException e1) {
+                                       }
+                               }
+                       }
+               }
+       }
+       
+       private void addSessions() {
+               if (newSessions.size() == 0)
+                       return;
+               
+               synchronized (newSessions) {
+                       Iterator it = newSessions.iterator();
+                       while (it.hasNext()) {
+                               TcpSession session = (TcpSession) it.next();
+                               SocketChannel ch = session.getChannel();
+                               boolean registered;
+                               try {
+                                       ch.configureBlocking(false);
+                                       
session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
+                                       registered = true;
+                               } catch (IOException e) {
+                                       registered = false;
+                                       fireExceptionCaught(session, e);
+                               }
+                               
+                               if (registered) {
+                                       fireSessionOpened(session);
+                               }
+                       }
+                       
+                       newSessions.clear();
+               }
+       }
+
+       private void removeSessions() {
+               if (removingSessions.size() == 0)
+                       return;
+
+               synchronized (removingSessions) {
+                       Iterator it = removingSessions.iterator();
+                       while (it.hasNext()) {
+                               TcpSession session = (TcpSession) it.next();
+                               SocketChannel ch = session.getChannel();
+                               session.getSelectionKey().cancel();
+                               session.dispose();
+                               try {
+                                       ch.close();
+                               } catch (IOException e) {
+                                       fireExceptionCaught(session, e);
+                               } finally {
+                                       fireSessionClosed(session);
+                               }
+                       }
+                       
+                       removingSessions.clear();
+               }
+       }
+       
+       private void processSessions(Set selectedKeys) {
+               Iterator it = selectedKeys.iterator();
+               while (it.hasNext()) {
+                       SelectionKey key = (SelectionKey) it.next();
+                       TcpSession session = (TcpSession) key.attachment();
+                       if (key.isReadable()) {
+                               read(session);
+                       } else if (key.isWritable()) {
+                               scheduleFlush(session);
+                       }
+               }
+       }
+       
+       private void read(TcpSession session) {
+               ByteBuffer readBuf = session.getReadBuffer();
+               SocketChannel ch = session.getChannel();
+               try {
+                       int readBytes = 0;
+                       int ret;
+                       
+                       synchronized (readBuf) {
+                               while ((ret = ch.read(readBuf)) > 0) {
+                                       readBytes += ret;
+                               }
+                               
+                               if (readBytes > 0) {
+                                       session.increaseReadBytes(readBytes);
+                                       readBuf.flip();
+                                       fireDataRead(session, readBuf);
+                                       readBuf.compact();
+                               }
+                       }
+                       
+                       if (ret < 0) {
+                               synchronized (removingSessions) {
+                                       removingSessions.add(session);
+                               }
+                       }
+               } catch (IOException e) {
+                       fireExceptionCaught(session, e);
+               }
+       }
+
+       private void scheduleFlush(TcpSession session) {
+               session.getSelectionKey().interestOps(SelectionKey.OP_READ);
+               synchronized (flushingSessions) {
+                       flushingSessions.add(session);
+               }
+       }
+
+       private void flushSessions() {
+               if (flushingSessions.size() == 0)
+                       return;
+
+               synchronized (flushingSessions) {
+                       Iterator it = flushingSessions.iterator();
+                       while (it.hasNext()) {
+                               TcpSession session = (TcpSession) it.next();
+                               if (session.isClosed())
+                                       continue;
+
+                               flush(session);
+                       }
+                       
+                       flushingSessions.clear();
+               }
+       }
+       
+       private void flush(TcpSession session) {
+               ByteBuffer writeBuf = session.getWriteBuffer();
+               SocketChannel ch = session.getChannel();
+               
+               try {
+                       synchronized (writeBuf) {
+                               writeBuf.flip();
+                               int nBytes = ch.write(writeBuf);
+                               writeBuf.compact();
+                               
+                               if (nBytes > 0)
+                                       session.increaseWrittenBytes(nBytes);
+
+                               int remaining = writeBuf.remaining();
+                               if (remaining > 0){
+                                       // Kernel buffer is full
+                                       
session.getSelectionKey().interestOps(SelectionKey.OP_READ | 
SelectionKey.OP_WRITE);
+                               }
+                       }
+               } catch (IOException e) {
+                       fireExceptionCaught(session, e);
+               }
+       }
+
+       private void fireSessionOpened(TcpSession session) {
+               try {
+                       session.getHandler().sessionOpened(session);
+               } catch (Throwable e) {
+                       fireExceptionCaught(session, e);
+               }
+       }
+       
+       private void fireSessionClosed(TcpSession session) {
+               try {
+                       session.getHandler().sessionClosed(session);
+               } catch (Throwable e) {
+                       fireExceptionCaught(session, e);
+               }
+       }
+       
+       private void fireDataRead(TcpSession session, ByteBuffer readBuf) {
+               try {
+                       session.getHandler().dataRead(session, readBuf);
+               } catch (Throwable e) {
+                       fireExceptionCaught(session, e);
+               }
+       }
+       
+       private void fireExceptionCaught(TcpSession session, Throwable cause) {
+               try {
+                       session.getHandler().exceptionCaught(session, cause);
+               } catch (Throwable t) {
+                       log.error("Exception from excaptionCaught.", t);
+               }
+       }
+
+}

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r2=109795
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
     (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
     Sat Dec  4 04:30:00 2004
@@ -3,12 +3,15 @@
  */
 package org.apache.netty.downstream.impl.tcp;
 
+import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
 import org.apache.commons.lang.Validate;
 import org.apache.netty.common.SessionConfig;
+import org.apache.netty.common.util.ByteBufferPool;
 import org.apache.netty.downstream.Session;
 import org.apache.netty.downstream.SessionHandler;
 
@@ -21,18 +24,44 @@
 class TcpSession implements Session {
 
     private final SocketChannel ch;
-    private final SessionConfig config = new SimpleSessionConfig();
+    private final TcpSessionConfig config;
+    private final ByteBuffer readBuf;
+    private final ByteBuffer writeBuf;
+
+    private SelectionKey key;
     private SessionHandler handler;
+    private long readBytes;
+       private long writtenBytes;
+       private long lastReadTime;
+       private long lastWriteTime;
     
     /**
      * Creates a new instance.
-     * 
-     * 
      */
     TcpSession(SocketChannel ch, SessionHandler defaultHandler) {
         this.ch = ch;
+        this.config = new TcpSessionConfig(ch);
+        this.readBuf = ByteBufferPool.open();
+        this.writeBuf = ByteBufferPool.open();
         this.handler = defaultHandler;
     }
+    
+    SocketChannel getChannel() {
+       return ch;
+    }
+    
+    SelectionKey getSelectionKey() {
+       return key;
+    }
+    
+    void setSelectionKey(SelectionKey key) {
+       this.key = key;
+    }
+    
+    void dispose() {
+       ByteBufferPool.close(readBuf);
+       ByteBufferPool.close(writeBuf);
+    }
 
     public SessionHandler getHandler() {
         return handler;
@@ -44,16 +73,35 @@
     }
 
     public void close() {
+       try {
+                       TcpIoProcessor.getInstance().removeSession(this);
+               } catch (IOException e) {
+                       // This cannot happen
+               }
     }
-
+    
+    public ByteBuffer getReadBuffer() {
+       return readBuf;
+    }
+    
     public ByteBuffer getWriteBuffer() {
-        return null;
+        return writeBuf;
     }
-
-    public void setMark(Object mark) {
+    
+    public void flush() {
+       try {
+                       TcpIoProcessor.getInstance().flushSession(this);
+               } catch (IOException e) {
+                       // This cannot happen
+               }
     }
 
-    public void flush() {
+    public void flush(Object mark) {
+       try {
+                       TcpIoProcessor.getInstance().flushSession(this);
+               } catch (IOException e) {
+                       // This cannot happen
+               }
     }
 
     public boolean isConnected() {
@@ -61,7 +109,7 @@
     }
 
     public boolean isClosed() {
-        return !ch.isConnected();
+        return !isConnected();
     }
 
     public SessionConfig getConfig() {
@@ -83,15 +131,25 @@
     public long getWrittenBytes() {
         return writtenBytes;
     }
+    
+    void increaseReadBytes(int increment) {
+               readBytes += increment;
+       lastReadTime = System.currentTimeMillis();
+    }
+    
+    void increaseWrittenBytes(int increment) {
+       writtenBytes += increment;
+       lastWriteTime = System.currentTimeMillis();
+    }
 
     public long getLastIoTime() {
-        return Math.max(lastReadtime, lastWriteTime);
+        return Math.max(lastReadTime, lastWriteTime);
     }
 
     public long getLastReadTime() {
         return lastReadTime;
     }
-
+    
     public long getLastWriteTime() {
         return lastWriteTime;
     }

Added: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java
       Sat Dec  4 04:30:00 2004
@@ -0,0 +1,90 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.downstream.impl.tcp;
+
+import java.net.SocketException;
+
+import java.nio.channels.SocketChannel;
+
+import org.apache.netty.common.util.BasicSessionConfig;
+
+
+/**
+ * TODO Document me.
+ *
+ * @author Trustin Lee ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$,
+ */
+public class TcpSessionConfig extends BasicSessionConfig {
+    private final SocketChannel ch;
+
+    TcpSessionConfig(SocketChannel ch) {
+        this.ch = ch;
+    }
+
+    public boolean getKeepAlive() throws SocketException {
+        return ch.socket().getKeepAlive();
+    }
+
+    public void setKeepAlive(boolean on) throws SocketException {
+        ch.socket().setKeepAlive(on);
+    }
+
+    public boolean getOOBInline() throws SocketException {
+        return ch.socket().getOOBInline();
+    }
+
+    public void setOOBInline(boolean on) throws SocketException {
+        ch.socket().setOOBInline(on);
+    }
+
+    public boolean getReuseAddress() throws SocketException {
+        return ch.socket().getReuseAddress();
+    }
+
+    public void setReuseAddress(boolean on) throws SocketException {
+        ch.socket().setReuseAddress(on);
+    }
+
+    public int getSoLinger() throws SocketException {
+        return ch.socket().getSoLinger();
+    }
+
+    public void setSoLinger(boolean on, int linger)
+                     throws SocketException {
+        ch.socket().setSoLinger(on, linger);
+    }
+
+    public boolean getTcpNoDelay() throws SocketException {
+        return ch.socket().getTcpNoDelay();
+    }
+
+    public void setTcpNoDelay(boolean on) throws SocketException {
+        ch.socket().setTcpNoDelay(on);
+    }
+
+    public int getTrafficClass() throws SocketException {
+        return ch.socket().getTrafficClass();
+    }
+
+    public void setTrafficClass(int tc) throws SocketException {
+        ch.socket().setTrafficClass(tc);
+    }
+}

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java&r2=109795
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java
   (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java
   Sat Dec  4 04:30:00 2004
@@ -30,7 +30,7 @@
  *
  * @author [EMAIL PROTECTED]
  * @author [EMAIL PROTECTED]
- * @version $Rev: 56478 $, $Date$
+ * @version $Rev$, $Date$
  */
 public interface ServiceRegistry {
     void bind(Service service,

Modified: 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java
Url: 
http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r2=109795
==============================================================================
--- 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java
    (original)
+++ 
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java
    Sat Dec  4 04:30:00 2004
@@ -29,7 +29,7 @@
  * @version $Rev$, $Date$
  */
 public interface SessionHandler {
-    void sessionEstablished(Session session);
+    void sessionOpened(Session session);
 
     void sessionClosed(Session session);
 

Reply via email to