Author: trustin
Date: Wed Nov 7 17:22:06 2007
New Revision: 592993
URL: http://svn.apache.org/viewvc?rev=592993&view=rev
Log:
Applied Generics to IoProcessor
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprSessionImpl.java
mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoProcessor.java
Wed Nov 7 17:22:06 2007
@@ -37,7 +37,7 @@
* @author The Apache MINA Project ([EMAIL PROTECTED])
* @version $Rev$, $Date$
*/
-public abstract class AbstractIoProcessor implements IoProcessor {
+public abstract class AbstractIoProcessor<T extends AbstractIoSession>
implements IoProcessor<T> {
/**
* The maximum loop count for a write operation until
@@ -51,14 +51,10 @@
private final String threadName;
private final Executor executor;
- private final Queue<AbstractIoSession> newSessions =
- new ConcurrentLinkedQueue<AbstractIoSession>();
- private final Queue<AbstractIoSession> removingSessions =
- new ConcurrentLinkedQueue<AbstractIoSession>();
- private final Queue<AbstractIoSession> flushingSessions =
- new ConcurrentLinkedQueue<AbstractIoSession>();
- private final Queue<AbstractIoSession> trafficControllingSessions =
- new ConcurrentLinkedQueue<AbstractIoSession>();
+ private final Queue<T> newSessions = new ConcurrentLinkedQueue<T>();
+ private final Queue<T> removingSessions = new ConcurrentLinkedQueue<T>();
+ private final Queue<T> flushingSessions = new ConcurrentLinkedQueue<T>();
+ private final Queue<T> trafficControllingSessions = new
ConcurrentLinkedQueue<T>();
private Worker worker;
private long lastIdleCheckTime;
@@ -78,11 +74,11 @@
protected abstract void wakeup();
- protected abstract Iterator<AbstractIoSession> allSessions() throws
Exception;
+ protected abstract Iterator<T> allSessions() throws Exception;
- protected abstract Iterator<AbstractIoSession> selectedSessions() throws
Exception;
+ protected abstract Iterator<T> selectedSessions() throws Exception;
- protected abstract SessionState state(IoSession session);
+ protected abstract SessionState state(T session);
/**
@@ -91,7 +87,7 @@
* @return true is ready, false if not ready
* @throws Exception if some low level IO error occurs
*/
- protected abstract boolean isWritable(IoSession session) throws Exception;
+ protected abstract boolean isWritable(T session) throws Exception;
/**
* Is the session ready for reading
@@ -99,14 +95,14 @@
* @return true is ready, false if not ready
* @throws Exception if some low level IO error occurs
*/
- protected abstract boolean isReadable(IoSession session) throws Exception;
+ protected abstract boolean isReadable(T session) throws Exception;
/**
* register a session for writing
* @param session the session registered
* @param value true for registering, false for removing
* @throws Exception if some low level IO error occurs
*/
- protected abstract void setOpWrite(IoSession session,boolean value) throws
Exception;
+ protected abstract void setOpWrite(T session, boolean value) throws
Exception;
/**
* register a session for reading
@@ -114,7 +110,7 @@
* @param value true for registering, false for removing
* @throws Exception if some low level IO error occurs
*/
- protected abstract void setOpRead(IoSession session,boolean value) throws
Exception;
+ protected abstract void setOpRead(T session, boolean value) throws
Exception;
/**
* is this session registered for reading
@@ -122,7 +118,7 @@
* @return true is registered for reading
* @throws Exception if some low level IO error occurs
*/
- protected abstract boolean isOpRead(IoSession session) throws Exception;
+ protected abstract boolean isOpRead(T session) throws Exception;
/**
* is this session registered for writing
@@ -130,37 +126,37 @@
* @return true is registered for writing
* @throws Exception if some low level IO error occurs
*/
- protected abstract boolean isOpWrite(IoSession session) throws Exception;
+ protected abstract boolean isOpWrite(T session) throws Exception;
- protected abstract void doAdd(IoSession session) throws Exception;
+ protected abstract void doAdd(T session) throws Exception;
- protected abstract void doRemove(IoSession session) throws Exception;
+ protected abstract void doRemove(T session) throws Exception;
- protected abstract int read(IoSession session, IoBuffer buf) throws
Exception;
+ protected abstract int read(T session, IoBuffer buf) throws Exception;
- protected abstract int write(IoSession session, IoBuffer buf) throws
Exception;
+ protected abstract int write(T session, IoBuffer buf) throws Exception;
- protected abstract long transferFile(IoSession session, FileRegion region)
throws Exception;
+ protected abstract long transferFile(T session, FileRegion region) throws
Exception;
- public void add(IoSession session) {
- newSessions.add((AbstractIoSession) session);
+ public void add(T session) {
+ newSessions.add(session);
startupWorker();
}
- public void remove(IoSession session) {
- scheduleRemove((AbstractIoSession) session);
+ public void remove(T session) {
+ scheduleRemove(session);
startupWorker();
}
- public void flush(IoSession session) {
+ public void flush(T session) {
boolean needsWakeup = flushingSessions.isEmpty();
- if (scheduleFlush((AbstractIoSession) session) && needsWakeup) {
+ if (scheduleFlush(session) && needsWakeup) {
wakeup();
}
}
- public void updateTrafficMask(IoSession session) {
- scheduleTrafficControl((AbstractIoSession) session);
+ public void updateTrafficMask(T session) {
+ scheduleTrafficControl(session);
wakeup();
}
@@ -174,11 +170,11 @@
wakeup();
}
- private void scheduleRemove(AbstractIoSession session) {
+ private void scheduleRemove(T session) {
removingSessions.add(session);
}
- private boolean scheduleFlush(AbstractIoSession session) {
+ private boolean scheduleFlush(T session) {
if (session.setScheduledForFlush(true)) {
flushingSessions.add(session);
return true;
@@ -186,14 +182,14 @@
return false;
}
- private void scheduleTrafficControl(AbstractIoSession session) {
+ private void scheduleTrafficControl(T session) {
trafficControllingSessions.add(session);
}
private int add() {
int addedSessions = 0;
for (; ;) {
- AbstractIoSession session = newSessions.poll();
+ T session = newSessions.poll();
if (session == null) {
break;
@@ -236,7 +232,7 @@
private int remove() {
int removedSessions = 0;
for (; ;) {
- AbstractIoSession session = removingSessions.poll();
+ T session = removingSessions.poll();
if (session == null) {
break;
@@ -272,13 +268,13 @@
}
private void process() throws Exception {
- for (Iterator<AbstractIoSession> i = selectedSessions(); i.hasNext();)
{
+ for (Iterator<T> i = selectedSessions(); i.hasNext();) {
process(i.next());
i.remove();
}
}
- private void process(AbstractIoSession session) throws Exception {
+ private void process(T session) throws Exception {
if (isReadable(session) && session.getTrafficMask().isReadable()) {
read(session);
@@ -289,7 +285,7 @@
}
}
- private void read(AbstractIoSession session) {
+ private void read(T session) {
IoSessionConfig config = session.getConfig();
IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
@@ -344,9 +340,9 @@
}
}
- private void flush() {
+ private void write() {
for (; ;) {
- AbstractIoSession session = flushingSessions.poll();
+ T session = flushingSessions.poll();
if (session == null) {
break;
@@ -363,7 +359,7 @@
switch (state) {
case OPEN:
try {
- boolean flushedAll = flush(session);
+ boolean flushedAll = write(session);
if (flushedAll &&
!session.getWriteRequestQueue().isEmpty(session) &&
!session.isScheduledForFlush()) {
scheduleFlush(session);
@@ -387,7 +383,7 @@
}
}
- private void clearWriteRequestQueue(AbstractIoSession session) {
+ private void clearWriteRequestQueue(T session) {
WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
WriteRequest req;
@@ -427,7 +423,7 @@
}
}
- private boolean flush(AbstractIoSession session) throws Exception {
+ private boolean write(T session) throws Exception {
if (!session.isConnected()) {
scheduleRemove(session);
return false;
@@ -502,7 +498,7 @@
private void updateTrafficMask() {
for (; ;) {
- AbstractIoSession session = trafficControllingSessions.poll();
+ T session = trafficControllingSessions.poll();
if (session == null) {
break;
@@ -558,7 +554,7 @@
process();
}
- flush();
+ write();
nSessions -= remove();
notifyIdleSessions();
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/AbstractIoSession.java
Wed Nov 7 17:22:06 2007
@@ -98,6 +98,7 @@
closeFuture.addListener(SCHEDULED_COUNTER_RESETTER);
}
+ @SuppressWarnings("unchecked")
protected abstract IoProcessor getProcessor();
public boolean isConnected() {
@@ -146,6 +147,7 @@
return closeFuture;
}
+ @SuppressWarnings("unchecked")
public CloseFuture closeOnFlush() {
getWriteRequestQueue().offer(this, CLOSE_REQUEST);
getProcessor().flush(this);
Modified:
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/common/DefaultIoFilterChain.java
Wed Nov 7 17:22:06 2007
@@ -607,6 +607,7 @@
nextFilter.messageSent(session, writeRequest);
}
+ @SuppressWarnings("unchecked")
@Override
public void filterWrite(NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws Exception {
@@ -638,19 +639,20 @@
}
}
+ @SuppressWarnings("unchecked")
@Override
public void filterClose(NextFilter nextFilter, IoSession session)
throws Exception {
- AbstractIoSession s = (AbstractIoSession) session;
- s.getProcessor().remove(s);
+ ((AbstractIoSession) session).getProcessor().remove(session);
}
+ @SuppressWarnings("unchecked")
@Override
public void filterSetTrafficMask(NextFilter nextFilter,
IoSession session, TrafficMask trafficMask) throws Exception {
AbstractIoSession s = (AbstractIoSession) session;
s.setTrafficMaskNow(trafficMask);
- s.getProcessor().updateTrafficMask(session);
+ s.getProcessor().updateTrafficMask(s);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/DummySession.java Wed
Nov 7 17:22:06 2007
@@ -47,7 +47,7 @@
private volatile IoService service;
- private volatile IoProcessor processor;
+ private volatile IoProcessor<IoSession> processor;
private volatile IoSessionConfig config = new AbstractIoSessionConfig() {
@Override
@@ -99,7 +99,7 @@
this.service = acceptor;
- this.processor = new IoProcessor() {
+ this.processor = new IoProcessor<IoSession>() {
public void add(IoSession session) {
}
@@ -226,11 +226,11 @@
}
@Override
- public IoProcessor getProcessor() {
+ public IoProcessor<IoSession> getProcessor() {
return processor;
}
- public void setProcessor(IoProcessor processor) {
+ public void setProcessor(IoProcessor<IoSession> processor) {
if (processor == null) {
throw new NullPointerException("processor");
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/common/IoProcessor.java Wed
Nov 7 17:22:06 2007
@@ -27,26 +27,28 @@
*
* @author Apache MINA Project ([EMAIL PROTECTED])
* @version $Rev$, $Date$
+ *
+ * @param <T> the type of the [EMAIL PROTECTED] IoSession} this processor can
handle
*/
-public interface IoProcessor {
+public interface IoProcessor<T extends IoSession> {
/**
* Adds the specified [EMAIL PROTECTED] session} to the I/O processor so
that
* the I/O processor starts to perform any I/O operations related
* with the [EMAIL PROTECTED] session}.
*/
- void add(IoSession session);
+ void add(T session);
/**
* Flushes the internal write request queue of the specified
* [EMAIL PROTECTED] session}.
*/
- void flush(IoSession session);
+ void flush(T session);
/**
* Controls the traffic of the specified [EMAIL PROTECTED] session} as
specified
* in [EMAIL PROTECTED] IoSession#getTrafficMask()}.
*/
- void updateTrafficMask(IoSession session);
+ void updateTrafficMask(T session);
/**
* Removes and closes the specified [EMAIL PROTECTED] session} from the I/O
@@ -54,5 +56,5 @@
* associated with the [EMAIL PROTECTED] session} and releases any other
related
* resources.
*/
- void remove(IoSession session);
+ void remove(T session);
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramAcceptor.java
Wed Nov 7 17:22:06 2007
@@ -65,7 +65,7 @@
private final Executor executor;
private final int id = nextId++;
private final Selector selector;
- private final IoProcessor processor = new DatagramAcceptorProcessor();
+ private final IoProcessor<NioSession> processor = new
DatagramAcceptorProcessor();
private final Queue<ServiceOperationFuture> registerQueue = new
ConcurrentLinkedQueue<ServiceOperationFuture>();
private final Queue<ServiceOperationFuture> cancelQueue = new
ConcurrentLinkedQueue<ServiceOperationFuture>();
private final Queue<NioDatagramSession> flushingSessions = new
ConcurrentLinkedQueue<NioDatagramSession>();
@@ -230,16 +230,16 @@
return super.getListeners();
}
- IoProcessor getProcessor() {
+ IoProcessor<NioSession> getProcessor() {
return processor;
}
- private class DatagramAcceptorProcessor implements IoProcessor {
+ private class DatagramAcceptorProcessor implements IoProcessor<NioSession>
{
- public void add(IoSession session) {
+ public void add(NioSession session) {
}
- public void flush(IoSession session) {
+ public void flush(NioSession session) {
if (scheduleFlush((NioDatagramSession) session)) {
Selector selector = NioDatagramAcceptor.this.selector;
if (selector != null) {
@@ -248,11 +248,11 @@
}
}
- public void remove(IoSession session) {
+ public void remove(NioSession session) {
getListeners().fireSessionDestroyed(session);
}
- public void updateTrafficMask(IoSession session) {
+ public void updateTrafficMask(NioSession session) {
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramConnector.java
Wed Nov 7 17:22:06 2007
@@ -29,7 +29,6 @@
import org.apache.mina.common.DefaultConnectFuture;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IoConnector;
-import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportMetadata;
import org.apache.mina.transport.socket.DatagramConnector;
import org.apache.mina.transport.socket.DatagramSessionConfig;
@@ -123,7 +122,7 @@
ch.connect(remoteAddress);
NioProcessor processor = nextProcessor();
- final IoSession session = new NioDatagramSession(this, ch,
processor);
+ final NioSession session = new NioDatagramSession(this, ch,
processor);
ConnectFuture future = new DefaultConnectFuture();
finishSessionInitialization(session, future);
processor.add(session);
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioDatagramSession.java
Wed Nov 7 17:22:06 2007
@@ -70,7 +70,7 @@
private final InetSocketAddress remoteAddress;
- private final IoProcessor processor;
+ private final IoProcessor<NioSession> processor;
private SelectionKey key;
@@ -78,7 +78,7 @@
* Creates a new acceptor-side session instance.
*/
NioDatagramSession(IoService service,
- DatagramChannel ch, IoProcessor processor,
+ DatagramChannel ch, IoProcessor<NioSession> processor,
SocketAddress remoteAddress) {
this.service = service;
this.ch = ch;
@@ -95,7 +95,7 @@
* Creates a new connector-side session instance.
*/
NioDatagramSession(IoService service,
- DatagramChannel ch, IoProcessor processor) {
+ DatagramChannel ch, IoProcessor<NioSession> processor)
{
this(service, ch, processor, ch.socket().getRemoteSocketAddress());
}
@@ -104,7 +104,7 @@
}
@Override
- protected IoProcessor getProcessor() {
+ protected IoProcessor<NioSession> getProcessor() {
return processor;
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioProcessor.java
Wed Nov 7 17:22:06 2007
@@ -29,11 +29,9 @@
import java.util.concurrent.Executor;
import org.apache.mina.common.AbstractIoProcessor;
-import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.FileRegion;
import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoSession;
import org.apache.mina.common.RuntimeIoException;
/**
@@ -41,7 +39,7 @@
* @author Apache MINA Project ([EMAIL PROTECTED])
* @version $Rev$, $Date$
*/
-class NioProcessor extends AbstractIoProcessor {
+class NioProcessor extends AbstractIoProcessor<NioSession> {
protected final Selector selector;
@@ -76,28 +74,26 @@
}
@Override
- protected Iterator<AbstractIoSession> allSessions() throws Exception {
+ protected Iterator<NioSession> allSessions() throws Exception {
return new IoSessionIterator(selector.keys());
}
@Override
- protected Iterator<AbstractIoSession> selectedSessions() throws Exception {
+ protected Iterator<NioSession> selectedSessions() throws Exception {
return new IoSessionIterator(selector.selectedKeys());
}
@Override
- protected void doAdd(IoSession session) throws Exception {
- SelectableChannel ch = (SelectableChannel) getChannel(session);
+ protected void doAdd(NioSession session) throws Exception {
+ SelectableChannel ch = (SelectableChannel) session.getChannel();
ch.configureBlocking(false);
- setSelectionKey(
- session,
- ch.register(selector, SelectionKey.OP_READ, session));
+ session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ,
session));
}
@Override
- protected void doRemove(IoSession session) throws Exception {
- ByteChannel ch = getChannel(session);
- SelectionKey key = getSelectionKey(session);
+ protected void doRemove(NioSession session) throws Exception {
+ ByteChannel ch = session.getChannel();
+ SelectionKey key = session.getSelectionKey();
if (key != null) {
key.cancel();
}
@@ -105,8 +101,8 @@
}
@Override
- protected SessionState state(IoSession session) {
- SelectionKey key = getSelectionKey(session);
+ protected SessionState state(NioSession session) {
+ SelectionKey key = session.getSelectionKey();
if (key == null) {
return SessionState.PREPARING;
}
@@ -115,30 +111,30 @@
}
@Override
- protected boolean isReadable(IoSession session) throws Exception {
- SelectionKey key = getSelectionKey(session);
+ protected boolean isReadable(NioSession session) throws Exception {
+ SelectionKey key = session.getSelectionKey();
return key.isValid() && key.isReadable();
}
@Override
- protected boolean isWritable(IoSession session) throws Exception {
- SelectionKey key = getSelectionKey(session);
+ protected boolean isWritable(NioSession session) throws Exception {
+ SelectionKey key = session.getSelectionKey();
return key.isValid() && key.isWritable();
}
@Override
- protected boolean isOpRead(IoSession session) throws Exception {
- return (getSelectionKey(session).interestOps() & SelectionKey.OP_READ)
!= 0;
+ protected boolean isOpRead(NioSession session) throws Exception {
+ return (session.getSelectionKey().interestOps() &
SelectionKey.OP_READ) != 0;
}
@Override
- protected boolean isOpWrite(IoSession session) throws Exception {
- return (getSelectionKey(session).interestOps() &
SelectionKey.OP_WRITE) != 0;
+ protected boolean isOpWrite(NioSession session) throws Exception {
+ return (session.getSelectionKey().interestOps() &
SelectionKey.OP_WRITE) != 0;
}
@Override
- protected void setOpRead(IoSession session, boolean value) throws
Exception {
- SelectionKey key = getSelectionKey(session);
+ protected void setOpRead(NioSession session, boolean value) throws
Exception {
+ SelectionKey key = session.getSelectionKey();
if (value) {
key.interestOps(key.interestOps() | SelectionKey.OP_READ);
} else {
@@ -147,9 +143,9 @@
}
@Override
- protected void setOpWrite(IoSession session, boolean value)
+ protected void setOpWrite(NioSession session, boolean value)
throws Exception {
- SelectionKey key = getSelectionKey(session);
+ SelectionKey key = session.getSelectionKey();
if (value) {
key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);
} else {
@@ -158,45 +154,32 @@
}
@Override
- protected int read(IoSession session, IoBuffer buf) throws Exception {
- return getChannel(session).read(buf.buf());
+ protected int read(NioSession session, IoBuffer buf) throws Exception {
+ return session.getChannel().read(buf.buf());
}
@Override
- protected int write(IoSession session, IoBuffer buf) throws Exception {
- return getChannel(session).write(buf.buf());
+ protected int write(NioSession session, IoBuffer buf) throws Exception {
+ return session.getChannel().write(buf.buf());
}
@Override
- protected long transferFile(IoSession session, FileRegion region) throws
Exception {
- return region.getFileChannel().transferTo(region.getPosition(),
region.getCount(), getChannel(session));
+ protected long transferFile(NioSession session, FileRegion region) throws
Exception {
+ return region.getFileChannel().transferTo(region.getPosition(),
region.getCount(), session.getChannel());
}
- private ByteChannel getChannel(IoSession session) {
- return ((NioSession) session).getChannel();
- }
-
- private SelectionKey getSelectionKey(IoSession session) {
- return ((NioSession) session).getSelectionKey();
- }
-
- private void setSelectionKey(IoSession session, SelectionKey key) {
- ((NioSession) session).setSelectionKey(key);
- }
-
-
- protected static class IoSessionIterator implements
Iterator<AbstractIoSession> {
+ protected static class IoSessionIterator implements Iterator<NioSession> {
private final Iterator<SelectionKey> i;
private IoSessionIterator(Set<SelectionKey> keys) {
- i = keys.iterator();
+ i = keys.iterator();
}
public boolean hasNext() {
return i.hasNext();
}
- public AbstractIoSession next() {
+ public NioSession next() {
SelectionKey key = i.next();
- return (AbstractIoSession) key.attachment();
+ return (NioSession) key.attachment();
}
public void remove() {
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/socket/nio/NioSocketSession.java
Wed Nov 7 17:22:06 2007
@@ -59,7 +59,7 @@
private final SocketSessionConfig config = new SessionConfigImpl();
- private final IoProcessor processor;
+ private final IoProcessor<NioSession> processor;
private final IoFilterChain filterChain = new DefaultIoFilterChain(this);
@@ -71,7 +71,7 @@
private int readBufferSize = 1024;
- NioSocketSession(IoService service, IoProcessor processor, SocketChannel
ch) {
+ NioSocketSession(IoService service, IoProcessor<NioSession> processor,
SocketChannel ch) {
this.service = service;
this.processor = processor;
this.ch = ch;
@@ -88,7 +88,7 @@
}
@Override
- protected IoProcessor getProcessor() {
+ protected IoProcessor<NioSession> getProcessor() {
return processor;
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeFilterChain.java
Wed Nov 7 17:22:06 2007
@@ -43,7 +43,7 @@
class VmPipeFilterChain extends DefaultIoFilterChain {
private final Queue<IoEvent> eventQueue = new
ConcurrentLinkedQueue<IoEvent>();
- private final IoProcessor processor = new VmPipeIoProcessor();
+ private final IoProcessor<VmPipeSessionImpl> processor = new
VmPipeIoProcessor();
private volatile boolean flushEnabled;
private volatile boolean sessionOpened;
@@ -52,7 +52,7 @@
super(session);
}
- IoProcessor getProcessor() {
+ IoProcessor<VmPipeSessionImpl> getProcessor() {
return processor;
}
@@ -166,18 +166,17 @@
pushEvent(new IoEvent(IoEventType.MESSAGE_RECEIVED, getSession(),
message));
}
- private class VmPipeIoProcessor implements IoProcessor {
- public void flush(IoSession session) {
- VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- WriteRequestQueue queue = s.getWriteRequestQueue();
- if (queue.isEmpty(s)) {
+ private class VmPipeIoProcessor implements IoProcessor<VmPipeSessionImpl> {
+ public void flush(VmPipeSessionImpl session) {
+ WriteRequestQueue queue = session.getWriteRequestQueue();
+ if (queue.isEmpty(session)) {
return;
}
- if (s.isConnected()) {
- if (s.getLock().tryLock()) {
+ if (session.isConnected()) {
+ if (session.getLock().tryLock()) {
try {
WriteRequest req;
- while ((req = queue.poll(s)) != null) {
+ while ((req = queue.poll(session)) != null) {
Object message = req.getMessage();
Object messageCopy = message;
if (message instanceof IoBuffer) {
@@ -190,20 +189,20 @@
messageCopy = wb;
}
-
s.getRemoteSession().getFilterChain().fireMessageReceived(
+
session.getRemoteSession().getFilterChain().fireMessageReceived(
messageCopy);
- s.getFilterChain().fireMessageSent(req);
+ session.getFilterChain().fireMessageSent(req);
}
} finally {
- s.getLock().unlock();
+ session.getLock().unlock();
}
- flushPendingDataQueues(s);
+ flushPendingDataQueues(session);
}
} else {
List<WriteRequest> failedRequests = new
ArrayList<WriteRequest>();
WriteRequest req;
- while ((req = queue.poll(s)) != null) {
+ while ((req = queue.poll(session)) != null) {
failedRequests.add(req);
}
@@ -212,39 +211,37 @@
for (WriteRequest r: failedRequests) {
r.getFuture().setException(cause);
}
- s.getFilterChain().fireExceptionCaught(cause);
+ session.getFilterChain().fireExceptionCaught(cause);
}
}
}
- public void remove(IoSession session) {
- VmPipeSessionImpl s = (VmPipeSessionImpl) session;
+ public void remove(VmPipeSessionImpl session) {
try {
- s.getLock().lock();
+ session.getLock().lock();
if (!session.getCloseFuture().isClosed()) {
- s.getServiceListeners().fireSessionDestroyed(s);
- s.getRemoteSession().close();
+
session.getServiceListeners().fireSessionDestroyed(session);
+ session.getRemoteSession().close();
}
} finally {
- s.getLock().unlock();
+ session.getLock().unlock();
}
}
- public void add(IoSession session) {
+ public void add(VmPipeSessionImpl session) {
}
- public void updateTrafficMask(IoSession session) {
- VmPipeSessionImpl s = (VmPipeSessionImpl) session;
- if (s.getTrafficMask().isReadable()) {
+ public void updateTrafficMask(VmPipeSessionImpl session) {
+ if (session.getTrafficMask().isReadable()) {
List<Object> data = new ArrayList<Object>();
- s.receivedMessageQueue.drainTo(data);
+ session.receivedMessageQueue.drainTo(data);
for (Object aData : data) {
VmPipeFilterChain.this.fireMessageReceived(aData);
}
}
- if (s.getTrafficMask().isWritable()) {
- flush(s); // The second parameter is unused.
+ if (session.getTrafficMask().isWritable()) {
+ flush(session); // The second parameter is unused.
}
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/vmpipe/VmPipeSessionImpl.java
Wed Nov 7 17:22:06 2007
@@ -110,7 +110,7 @@
}
@Override
- protected IoProcessor getProcessor() {
+ protected IoProcessor<VmPipeSessionImpl> getProcessor() {
return filterChain.getProcessor();
}
Modified:
mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
(original)
+++
mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprIoProcessor.java
Wed Nov 7 17:22:06 2007
@@ -26,10 +26,8 @@
import java.util.concurrent.Executor;
import org.apache.mina.common.AbstractIoProcessor;
-import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.FileRegion;
import org.apache.mina.common.IoBuffer;
-import org.apache.mina.common.IoSession;
import org.apache.tomcat.jni.Error;
import org.apache.tomcat.jni.Poll;
import org.apache.tomcat.jni.Pool;
@@ -45,9 +43,9 @@
* @version $Rev$, $Date$
*/
-class AprIoProcessor extends AbstractIoProcessor {
+class AprIoProcessor extends AbstractIoProcessor<AprSessionImpl> {
- protected static class IoSessionIterator implements
Iterator<AbstractIoSession> {
+ protected static class IoSessionIterator implements
Iterator<AprSessionImpl> {
private final Iterator<AprSessionImpl> i;
private IoSessionIterator(Collection<AprSessionImpl> sessions) {
i = sessions.iterator();
@@ -56,7 +54,7 @@
return i.hasNext();
}
- public AbstractIoSession next() {
+ public AprSessionImpl next() {
AprSessionImpl sess = i.next();
return sess;
}
@@ -66,7 +64,7 @@
}
}
- protected class PollSetIterator implements Iterator<AbstractIoSession> {
+ protected class PollSetIterator implements Iterator<AprSessionImpl> {
private long[] pollResult;
int index=0;
@@ -78,10 +76,10 @@
return index*2< pollResult.length;
}
- public AbstractIoSession next() {
- AbstractIoSession sess=managedSessions.get(pollResult[index*2+1]);
+ public AprSessionImpl next() {
+ AprSessionImpl sess=managedSessions.get(pollResult[index*2+1]);
index++;
- System.err.println("sess :
"+((AprSessionImpl)sess).getAPRSocket());
+ System.err.println("sess : "+ sess.getAPRSocket());
return sess;
}
@@ -122,15 +120,14 @@
}
@Override
- protected Iterator<AbstractIoSession> allSessions() throws Exception {
+ protected Iterator<AprSessionImpl> allSessions() throws Exception {
return new IoSessionIterator(managedSessions.values());
}
@Override
- protected void doAdd(IoSession sess) throws Exception {
+ protected void doAdd(AprSessionImpl session) throws Exception {
logger.debug("doAdd");
- AprSessionImpl session = (AprSessionImpl) sess;
int rv;
rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
if (rv == Status.APR_SUCCESS) {
@@ -142,17 +139,16 @@
}
@Override
- protected void doRemove(IoSession session) throws Exception {
+ protected void doRemove(AprSessionImpl session) throws Exception {
logger.debug("doRemove");
- int ret=Poll.remove(pollset, ((AprSessionImpl)session).getAPRSocket());
+ int ret=Poll.remove(pollset, session.getAPRSocket());
if(ret!=Status.APR_SUCCESS) {
logger.error("removing of pollset error");
}
- ret=Socket.close(((AprSessionImpl)session).getAPRSocket());
+ ret=Socket.close(session.getAPRSocket());
if(ret!=Status.APR_SUCCESS) {
logger.error("closing socket error");
}
-
}
@Override
@@ -163,25 +159,21 @@
}
@Override
- protected boolean isOpRead(IoSession sess) throws Exception {
- logger.debug("isOpRead?");
- AprSessionImpl session=(AprSessionImpl)sess;
+ protected boolean isOpRead(AprSessionImpl session) throws Exception {
logger.debug("isOpRead : "+session.isOpRead());
return session.isOpRead();
}
@Override
- protected boolean isOpWrite(IoSession sess) throws Exception {
- logger.debug("isOpWrite?");
- AprSessionImpl session=(AprSessionImpl)sess;
+ protected boolean isOpWrite(AprSessionImpl session) throws Exception {
logger.debug("isOpWrite : "+session.isOpWrite());
return session.isOpWrite();
}
@Override
- protected boolean isReadable(IoSession session) throws Exception {
+ protected boolean isReadable(AprSessionImpl session) throws Exception {
logger.debug("isReadable?");
- long socket=((AprSessionImpl)session).getAPRSocket();
+ long socket= session.getAPRSocket();
for(int i=0;i<pollResult.length/2;i++) {
if(pollResult[i*2+1]==socket) {
if( (pollResult[i*2]&Poll.APR_POLLIN) >0 ) {
@@ -198,9 +190,8 @@
}
@Override
- protected boolean isWritable(IoSession session) throws Exception {
- logger.debug("isWritable?");
- long socket=((AprSessionImpl)session).getAPRSocket();
+ protected boolean isWritable(AprSessionImpl session) throws Exception {
+ long socket= session.getAPRSocket();
for(int i=0;i<pollResult.length/2;i++) {
if(pollResult[i*2+1]==socket) {
if( (pollResult[i*2]&Poll.APR_POLLOUT) >0 ) {
@@ -217,10 +208,7 @@
}
@Override
- protected int read(IoSession sess, IoBuffer buffer) throws Exception {
- logger.debug("read");
- AprSessionImpl session=(AprSessionImpl)sess;
-
+ protected int read(AprSessionImpl session, IoBuffer buffer) throws
Exception {
byte[] buf = session.getReadBuffer();
// FIXME : hardcoded read value for testing
int bytes = Socket.recv(session.getAPRSocket(), buf, 0, 1024);
@@ -253,14 +241,13 @@
}
@Override
- protected Iterator<AbstractIoSession> selectedSessions() throws Exception {
+ protected Iterator<AprSessionImpl> selectedSessions() throws Exception {
return new PollSetIterator(pollResult);
}
@Override
- protected void setOpRead(IoSession sess, boolean value) throws Exception {
+ protected void setOpRead(AprSessionImpl session, boolean value) throws
Exception {
logger.debug("setOpRead : "+value);
- AprSessionImpl session=(AprSessionImpl)sess;
int rv = Poll.remove(pollset, session.getAPRSocket());
if (rv != Status.APR_SUCCESS) {
System.err.println("poll.remove Error : " + Error.strerror(rv));
@@ -279,10 +266,9 @@
}
@Override
- protected void setOpWrite(IoSession sess, boolean value)
+ protected void setOpWrite(AprSessionImpl session, boolean value)
throws Exception {
logger.debug("setOpWrite : "+value);
- AprSessionImpl session=(AprSessionImpl)sess;
int rv = Poll.remove(pollset, session.getAPRSocket());
if (rv != Status.APR_SUCCESS) {
System.err.println("poll.remove Error : " + Error.strerror(rv));
@@ -301,9 +287,9 @@
}
@Override
- protected SessionState state(IoSession session) {
+ protected SessionState state(AprSessionImpl session) {
logger.debug("state?");
- long socket=((AprSessionImpl)session).getAPRSocket();
+ long socket=session.getAPRSocket();
if(socket>0)
return SessionState.OPEN;
else if(managedSessions.get(socket)!=null)
@@ -313,7 +299,7 @@
}
@Override
- protected long transferFile(IoSession session, FileRegion region)
+ protected long transferFile(AprSessionImpl session, FileRegion region)
throws Exception {
throw new UnsupportedOperationException("Not supposed for APR (TODO)");
}
@@ -326,7 +312,7 @@
}
@Override
- protected int write(IoSession session, IoBuffer buf) throws Exception {
+ protected int write(AprSessionImpl session, IoBuffer buf) throws Exception
{
logger.debug("write");
// be sure APR_SO_NONBLOCK was set, or it will block
int toWrite = buf.remaining();
@@ -334,10 +320,10 @@
int writtenBytes;
// APR accept ByteBuffer, only if they are Direct ones, due to native
code
if (buf.isDirect()) {
- writtenBytes = Socket.sendb(
((AprSessionImpl)session).getAPRSocket(), buf.buf(),
+ writtenBytes = Socket.sendb( session.getAPRSocket(), buf.buf(),
0, toWrite);
} else {
- writtenBytes = Socket.send(
((AprSessionImpl)session).getAPRSocket(), buf.array(),
+ writtenBytes = Socket.send( session.getAPRSocket(), buf.array(),
0, toWrite);
// FIXME : kludgy ?
buf.position(buf.position() + writtenBytes);
Modified:
mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprSessionImpl.java
URL:
http://svn.apache.org/viewvc/mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprSessionImpl.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprSessionImpl.java
(original)
+++
mina/trunk/transport-apr/src/main/java/org/apache/mina/transport/socket/apr/AprSessionImpl.java
Wed Nov 7 17:22:06 2007
@@ -22,9 +22,9 @@
import java.net.InetSocketAddress;
import org.apache.mina.common.AbstractIoSession;
-import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.DefaultTransportMetadata;
+import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoProcessor;
@@ -221,7 +221,7 @@
}
@Override
- protected IoProcessor getProcessor() {
+ protected IoProcessor<AprSessionImpl> getProcessor() {
return ioProcessor;
}
Modified:
mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java
URL:
http://svn.apache.org/viewvc/mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java?rev=592993&r1=592992&r2=592993&view=diff
==============================================================================
---
mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java
(original)
+++
mina/trunk/transport-serial/src/main/java/org/apache/mina/transport/serial/SerialSessionImpl.java
Wed Nov 7 17:22:06 2007
@@ -29,16 +29,15 @@
import java.util.TooManyListenersException;
import org.apache.mina.common.AbstractIoSession;
-import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.DefaultTransportMetadata;
import org.apache.mina.common.ExceptionMonitor;
import org.apache.mina.common.IdleStatusChecker;
+import org.apache.mina.common.IoBuffer;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoProcessor;
import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoSession;
import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.WriteRequest;
import org.slf4j.Logger;
@@ -51,7 +50,7 @@
* @version $Rev$, $Date$
*/
class SerialSessionImpl extends AbstractIoSession implements
- SerialSession, SerialPortEventListener, IoProcessor {
+ SerialSession, SerialPortEventListener, IoProcessor<SerialSessionImpl>
{
private SerialSessionConfig config = new DefaultSerialSessionConfig();
@@ -234,14 +233,14 @@
}
@Override
- protected IoProcessor getProcessor() {
+ protected IoProcessor<SerialSessionImpl> getProcessor() {
return this;
}
- public void add(IoSession session) {
+ public void add(SerialSessionImpl session) {
}
- public void flush(IoSession session) {
+ public void flush(SerialSessionImpl session) {
if (writeWorker == null) {
writeWorker = new WriteWorker();
writeWorker.start();
@@ -252,7 +251,7 @@
}
}
- public void remove(IoSession session) {
+ public void remove(SerialSessionImpl session) {
try {
inputStream.close();
} catch (IOException e) {
@@ -274,7 +273,7 @@
this);
}
- public void updateTrafficMask(IoSession session) {
+ public void updateTrafficMask(SerialSessionImpl session) {
throw new UnsupportedOperationException();
}
}