Author: trustin Date: Sat Dec 4 01:56:28 2004 New Revision: 109794 URL: http://svn.apache.org/viewcvs?view=rev&rev=109794 Log: Implementing downstream... Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java?view=auto&rev=109794 ============================================================================== --- (empty file) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java Sat Dec 4 01:56:28 2004 @@ -0,0 +1,51 @@ +/* + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +package org.apache.netty.common.util; + +import java.nio.ByteBuffer; + + +/** + * TODO Insert type comment. 
+ * + * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $ + * @author Trustin Lee (http://gleamynode.net/dev/) + */ +public class ByteBufferPool { + static final int DEFAULT_BUF_SIZE = 8192; + private static Queue buffers = new Queue(16); + + static { + buffers.open(); + } + + public static synchronized ByteBuffer open() { + ByteBuffer buf = (ByteBuffer) buffers.pop(); + + if (buf == null) { + buf = ByteBuffer.allocateDirect(DEFAULT_BUF_SIZE); + } else { + buf.clear(); + } + + return buf; + } + + public static synchronized void close(ByteBuffer buf) { + buffers.push(buf); + } +} Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java?view=auto&rev=109794 ============================================================================== --- (empty file) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java Sat Dec 4 01:56:28 2004 @@ -0,0 +1,155 @@ +/* + * Copyright 2004 The Apache Software Foundation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +/* + * @(#) $Id: Queue.java 47 2004-11-24 05:58:31Z trustin $ + */ +package org.apache.netty.common.util; + +import java.io.Serializable; + +import java.util.Arrays; + + +/** + * <p> + * A simple queue class. This class is <b>NOT </b> thread-safe. 
/**
 * A growable FIFO queue of objects backed by a circular array.
 * <p>
 * This class is <b>NOT</b> thread-safe. A queue starts out closed:
 * {@link #push(Object)} is rejected until {@link #open()} has been called.
 * </p>
 *
 * @author Trustin Lee (http://gleamynode.net/dev/)
 *
 * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $
 */
public class Queue implements Serializable {
    /** Circular storage; the live elements start at <code>first</code>. */
    private Object[] items;
    /** Index of the oldest element. */
    private int first = 0;
    /** Index of the slot the next pushed element goes into. */
    private int last = 0;
    /** Number of elements currently stored. */
    private int size = 0;
    /** Whether the queue currently accepts elements. */
    private boolean open = false;

    /**
     * Construct a new, empty <code>Queue</code> with the specified initial
     * capacity.
     *
     * @param initialCapacity number of elements the queue can hold before it
     *                        has to grow; <code>0</code> is allowed
     * @throws NegativeArraySizeException if <code>initialCapacity</code> is
     *                                    negative
     */
    public Queue(int initialCapacity) {
        items = new Object[initialCapacity];
    }

    /**
     * Discards any content and starts accepting elements.
     */
    public void open() {
        clear();
        open = true;
    }

    /**
     * Stops accepting elements and discards the current content.
     */
    public void close() {
        open = false;
        clear();
    }

    /**
     * Clears this queue.
     */
    private void clear() {
        Arrays.fill(items, null);
        first = 0;
        last = 0;
        size = 0;
    }

    /**
     * Dequeues from this queue.
     *
     * @return <code>null</code>, if this queue is empty or the element is
     *         really <code>null</code>.
     */
    public Object pop() {
        if (size == 0) {
            return null;
        }

        Object ret = items[first];
        items[first] = null; // drop the reference so it can be collected
        first = (first + 1) % items.length;
        size--;

        return ret;
    }

    /**
     * Enqueue into this queue, growing the storage if it is full.
     *
     * @param obj the element to append; may be <code>null</code>
     * @return <code>false</code> if the queue is not open and the element was
     *         therefore rejected, <code>true</code> otherwise
     */
    public boolean push(Object obj) {
        if (!open) {
            return false;
        }

        if (size == items.length) {
            grow();
        }

        items[last] = obj;
        last = (last + 1) % items.length;
        size++;
        return true;
    }

    /**
     * Replaces the full backing array with a larger one and moves the
     * elements to its start, oldest first.
     */
    private void grow() {
        final int oldLen = items.length;
        // Doubling a zero-length array would stay at zero and make the
        // following store fail, so always grow to at least one slot.
        Object[] tmp = new Object[(oldLen == 0) ? 1 : oldLen * 2];

        // A full circular buffer always has first == last: the content is the
        // tail [first, oldLen) followed by the wrapped head [0, first).
        System.arraycopy(items, first, tmp, 0, oldLen - first);
        System.arraycopy(items, 0, tmp, oldLen - first, first);

        first = 0;
        last = oldLen;
        items = tmp;
    }

    /**
     * Returns the first element of the queue without removing it.
     *
     * @return <code>null</code>, if the queue is closed or empty, or the
     *         element is really <code>null</code>.
     */
    public Object first() {
        if (!open || size == 0) {
            return null;
        }

        return items[first];
    }

    /**
     * Returns <code>true</code> if the queue is empty.
     */
    public boolean isEmpty() {
        return (size == 0);
    }

    /**
     * Returns the number of elements in the queue.
     */
    public int size() {
        return size;
    }
}
+ * + */ +/* + * @(#) $Id$ + */ +package org.apache.netty.downstream.impl.tcp; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.nio.channels.SelectionKey; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +import org.apache.commons.lang.Validate; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.netty.downstream.Acceptor; +import org.apache.netty.downstream.SessionHandler; + + +/** + * TODO Insert type comment. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$ + */ +public class TcpAcceptor implements Acceptor { + private static volatile int nextId = 0; + private static final Log log = LogFactory.getLog(TcpAcceptor.class); + private final int id = nextId++; + private final Selector selector; + private final Map channels = new HashMap(); + private Worker worker; + + /** + * Creates a new instance. 
+ * @throws IOException + * + * + */ + public TcpAcceptor() throws IOException { + selector = Selector.open(); + } + + public void bind(SocketAddress address, SessionHandler defaultHandler) + throws IOException { + this.bind(address, 50, defaultHandler); + } + + public synchronized void bind(SocketAddress address, int backlog, + SessionHandler defaultHandler) + throws IOException { + Validate.notNull(address); + Validate.notNull(defaultHandler); + if (!(address instanceof InetSocketAddress)) + throw new IllegalArgumentException("Unexpected address type: " + + address.getClass()); + + ServerSocketChannel ssc = ServerSocketChannel.open(); + ssc.configureBlocking(false); + ssc.socket().bind(address, backlog); + ssc.register(selector, SelectionKey.OP_ACCEPT, defaultHandler); + + channels.put(address, ssc); + + if (worker == null) { + worker = new Worker(); + worker.start(); + } + } + + public synchronized void unbind(SocketAddress address) { + Validate.notNull(address); + + ServerSocketChannel ssc = (ServerSocketChannel) channels.get(address); + + if (ssc == null) + throw new IllegalArgumentException("Unknown address: " + address); + + SelectionKey key = ssc.keyFor(selector); + key.cancel(); + channels.remove(address); + + try { + ssc.close(); + } catch (IOException e) { + log.error("Unexpected exception", e); + } + } + + private class Worker extends Thread { + public Worker() { + super("TcpAcceptor-" + id); + } + + public void run() { + for (;;) { + try { + int nKeys = selector.select(); + + if (nKeys == 0) + continue; + + Iterator it = selector.selectedKeys().iterator(); + while (it.hasNext()) { + SelectionKey key = (SelectionKey) it.next(); + it.remove(); + + if (!key.isAcceptable()) + continue; + + ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); + SocketChannel ch = ssc.accept(); + if (ch == null) + continue; + + TcpSession session = new TcpSession(ch, (SessionHandler) key.attachment()); + session.start(); + } + } catch (IOException e) { + 
log.error("Unexpected exception.", e); + } + } + } + } +} Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java?view=auto&rev=109794 ============================================================================== --- (empty file) +++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java Sat Dec 4 01:56:28 2004 @@ -0,0 +1,98 @@ +/* + * @(#) $Id$ + */ +package org.apache.netty.downstream.impl.tcp; + +import java.net.SocketAddress; +import java.nio.ByteBuffer; +import java.nio.channels.SocketChannel; + +import org.apache.commons.lang.Validate; +import org.apache.netty.common.SessionConfig; +import org.apache.netty.downstream.Session; +import org.apache.netty.downstream.SessionHandler; + +/** + * TODO Insert type comment. + * + * @author Trustin Lee ([EMAIL PROTECTED]) + * @version $Rev$, $Date$ + */ +class TcpSession implements Session { + + private final SocketChannel ch; + private final SessionConfig config = new SimpleSessionConfig(); + private SessionHandler handler; + + /** + * Creates a new instance. 
+ * + * + */ + TcpSession(SocketChannel ch, SessionHandler defaultHandler) { + this.ch = ch; + this.handler = defaultHandler; + } + + public SessionHandler getHandler() { + return handler; + } + + public void setHandler(SessionHandler handler) { + Validate.notNull(handler); + this.handler = handler; + } + + public void close() { + } + + public ByteBuffer getWriteBuffer() { + return null; + } + + public void setMark(Object mark) { + } + + public void flush() { + } + + public boolean isConnected() { + return ch.isConnected(); + } + + public boolean isClosed() { + return !ch.isConnected(); + } + + public SessionConfig getConfig() { + return config; + } + + public SocketAddress getRemoteAddress() { + return ch.socket().getRemoteSocketAddress(); + } + + public SocketAddress getLocalAddress() { + return ch.socket().getLocalSocketAddress(); + } + + public long getReadBytes() { + return readBytes; + } + + public long getWrittenBytes() { + return writtenBytes; + } + + public long getLastIoTime() { + return Math.max(lastReadtime, lastWriteTime); + } + + public long getLastReadTime() { + return lastReadTime; + } + + public long getLastWriteTime() { + return lastWriteTime; + } +}
