Author: jvermillard
Date: Sun Jul 10 20:10:38 2011
New Revision: 1144953
URL: http://svn.apache.org/viewvc?rev=1144953&view=rev
Log:
created, open and read events pushed using filter chain
Modified:
mina/branches/3.0/core/src/main/java/org/apache/mina/api/IoFilterChain.java
mina/branches/3.0/core/src/main/java/org/apache/mina/api/IoSession.java
mina/branches/3.0/core/src/main/java/org/apache/mina/filterchain/DefaultIoFilterChain.java
mina/branches/3.0/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Modified:
mina/branches/3.0/core/src/main/java/org/apache/mina/api/IoFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/api/IoFilterChain.java?rev=1144953&r1=1144952&r2=1144953&view=diff
==============================================================================
--- mina/branches/3.0/core/src/main/java/org/apache/mina/api/IoFilterChain.java
(original)
+++ mina/branches/3.0/core/src/main/java/org/apache/mina/api/IoFilterChain.java
Sun Jul 10 20:10:38 2011
@@ -121,6 +121,12 @@ public interface IoFilterChain {
void processSessionCreated(IoSession session);
/**
+ * Call this method for processing a session open event using this chain.
+ * @param session {@link IoSession} the opened session
+ */
+ void processSessionOpen(IoSession session);
+
+ /**
* Call this method for processing a received message using this chain.
* This processing is done in reverse order.
* @param session {@link IoSession} associated with this message
Modified:
mina/branches/3.0/core/src/main/java/org/apache/mina/api/IoSession.java
URL:
http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/api/IoSession.java?rev=1144953&r1=1144952&r2=1144953&view=diff
==============================================================================
--- mina/branches/3.0/core/src/main/java/org/apache/mina/api/IoSession.java
(original)
+++ mina/branches/3.0/core/src/main/java/org/apache/mina/api/IoSession.java Sun
Jul 10 20:10:38 2011
@@ -53,239 +53,245 @@ import org.apache.mina.session.WriteRequ
*/
public interface IoSession {
- /**
- * The unique identifier of this session.
- *
- * @return the session's unique identifier
- */
- long getId();
-
- /* ADDRESSES */
-
- /**
- * Returns the socket address of remote peer.
- *
- * @return the remote socket address
- */
- SocketAddress getRemoteAddress();
-
- /**
- * Gets the local address of the local peer.
- *
- * @return the socket address of local machine which is associated with
this
- * session.
- */
- SocketAddress getLocalAddress();
-
- /**
- * Gets the service this session is attached to.
- *
- * @return the {@link IoService} which provides {@link IoSession} to
this
- * session.
- */
- IoService getService();
-
- /* READ / WRITE / CLOSE */
- /**
- * Tells if the session is currently connected and able to process
incoming
- * requests and to send outgoing responses.
- *
- * @return <code>true</code> if this session is connected with remote
peer.
- */
- boolean isConnected();
-
- /**
- * Tells if the session is being closed, but is not yet in Closed state.
- *
- * @return <code>true</tt> if and only if this session is being closed
- * (but not disconnected yet) or is closed.
- */
- boolean isClosing();
-
- /**
- * Closes this session immediately or after all queued write requests
are
- * flushed. This operation is asynchronous. Wait for the returned
- * {@link IoFuture} if you want to wait for the session actually closed.
- * Once this method has been called, no incoming request will be
accepted.
- *
- * @param immediately
- * {@code true} to close this session immediately. {@code
false}
- * to close this session after all queued write requests are
- * flushed.
- * @return A {@link IoFuture} that will contains the session's state
- */
- IoFuture<Void> close(boolean immediately);
-
- /* READ/WRITE PAUSE MANAGEMENT */
- /**
- * Suspends read operations for this session.
- */
- void suspendRead();
-
- /**
- * Suspends write operations for this session.
- */
- void suspendWrite();
-
- /**
- * Resumes read operations for this session.
- */
- void resumeRead();
-
- /**
- * Resumes write operations for this session.
- */
- void resumeWrite();
-
- /**
- * Is read operation is suspended for this session.
- *
- * @return <code>true</code> if suspended
- */
- boolean isReadSuspended();
-
- /**
- * Is write operation is suspended for this session.
- *
- * @return <code>true</code> if suspended
- */
- boolean isWriteSuspended();
-
- /* BASIC STATS */
- /**
- * Gets the total number of bytes read for this session since it was
- * created.
- *
- * Returns the total number of bytes which were read from this session.
- */
- long getReadBytes();
-
- /**
- * Gets the total number of bytes written for this session since it was
- * created.
- *
- * @return the total number of bytes which were written to this session.
- */
- long getWrittenBytes();
-
- /* IDLE management */
- /**
- * Gets the session configuration, it where the idle timeout are set and
- * other transport specific configuration.
- *
- * @return the session's configuration
- */
- IoSessionConfig getConfig();
-
- /**
- * The session's creation time.
- *
- * @return the session's creation time in milliseconds
- */
- long getCreationTime();
-
- /**
- * Returns the time in millisecond when I/O occurred lastly (either
read or
- * write).
- *
- * @return the time of the last read or write done for this session
- */
- long getLastIoTime();
-
- /**
- * Returns the time in millisecond when the last I/O read occurred.
- *
- * Returns the time in millisecond when read operation occurred lastly.
- */
- long getLastReadTime();
-
- /**
- * Returns the time in millisecond when the last I/O write occurred.
- *
- * Returns the time in millisecond when write operation occurred lastly.
- */
- long getLastWriteTime();
-
- /* Session context management */
- /**
- * Returns the value of the user-defined attribute for this session.
- *
- * @param name
- * the attribute's name
- * @return <tt>null</tt> if there is no attribute with the specified
name
- */
- Object getAttribute(Object name);
-
- /**
- * Sets a user-defined attribute.
- *
- * @param name
- * the attribute's name
- * @param value
- * the attribute's value
- * @return The old attribute's value. <tt>null</tt> if there is no
previous
- * value or if the value is null
- */
- Object setAttribute(Object name, Object value);
-
- /**
- * Removes a user-defined attribute with the specified name.
- *
- * @param name
- * the attribute's name
- * @return The old attribute's value. <tt>null</tt> if not found or if
the
- * attribute had no value
- */
- Object removeAttribute(Object name);
-
- /**
- * Tells if the session has an attached attribute.
- *
- * @return <tt>true</tt> if this session contains the attribute with the
- * specified <tt>name</tt>.
- */
- boolean containsAttribute(Object name);
-
- /**
- * Gets the set of attributes stored within the session.
- *
- * @return the set of names of all user-defined attributes.
- */
- Set<Object> getAttributeNames();
-
- SessionState getState();
-
- /**
- * State of a {@link IoSession}
- *
- * @author <a href="http://mina.apache.org">Apache MINA Project</a>
- *
- */
- public enum SessionState {
- CREATED, CONNECTED, CLOSING, CLOSED
- }
-
- /* SESSION WRITING */
- /**
- * Enqueue a message for writing. This method wont block ! The message
will
- * by asynchronously processed by the filter chain and wrote to socket
by
- * the {@link SelectorProcessor}.
- *
- */
- public void write(Object message);
-
- /**
- * Same as {@link IoSession#write(Object)}, but provide a
- * {@link IoFuture} for tracking the completion of this write.
- *
- * @param message the message to be processed and written
- * @return the {@link IoFuture} for tracking this asynchronous operation
- */
- public IoFuture<Void> writeWithFuture(Object message);
-
- /**
- * Get the {@link Queue} of this session. The write queue contains the
pending writes.
- * @return the write queue of this session
- */
- public Queue<WriteRequest> getWriteQueue();
+ /**
+ * The unique identifier of this session.
+ *
+ * @return the session's unique identifier
+ */
+ long getId();
+
+ /* ADDRESSES */
+
+ /**
+ * Returns the socket address of remote peer.
+ *
+ * @return the remote socket address
+ */
+ SocketAddress getRemoteAddress();
+
+ /**
+ * Gets the local address of the local peer.
+ *
+ * @return the socket address of local machine which is associated with
this
+ * session.
+ */
+ SocketAddress getLocalAddress();
+
+ /**
+ * Gets the service this session is attached to.
+ *
+ * @return the {@link IoService} which provides {@link IoSession} to this
+ * session.
+ */
+ IoService getService();
+
+ /* READ / WRITE / CLOSE */
+ /**
+ * Tells if the session is currently connected and able to process incoming
+ * requests and to send outgoing responses.
+ *
+ * @return <code>true</code> if this session is connected with remote peer.
+ */
+ boolean isConnected();
+
+ /**
+ * Tells if the session is being closed, but is not yet in Closed state.
+ *
+ * @return <code>true</code> if and only if this session is being closed
+ * (but not disconnected yet) or is closed.
+ */
+ boolean isClosing();
+
+ /**
+ * Closes this session immediately or after all queued write requests are
+ * flushed. This operation is asynchronous. Wait for the returned
+ * {@link IoFuture} if you want to wait for the session actually closed.
+ * Once this method has been called, no incoming request will be accepted.
+ *
+ * @param immediately
+ * {@code true} to close this session immediately. {@code false}
+ * to close this session after all queued write requests are
+ * flushed.
+ * @return A {@link IoFuture} that will contains the session's state
+ */
+ IoFuture<Void> close(boolean immediately);
+
+ /* READ/WRITE PAUSE MANAGEMENT */
+ /**
+ * Suspends read operations for this session.
+ */
+ void suspendRead();
+
+ /**
+ * Suspends write operations for this session.
+ */
+ void suspendWrite();
+
+ /**
+ * Resumes read operations for this session.
+ */
+ void resumeRead();
+
+ /**
+ * Resumes write operations for this session.
+ */
+ void resumeWrite();
+
+ /**
+ * Is read operation is suspended for this session.
+ *
+ * @return <code>true</code> if suspended
+ */
+ boolean isReadSuspended();
+
+ /**
+ * Is write operation is suspended for this session.
+ *
+ * @return <code>true</code> if suspended
+ */
+ boolean isWriteSuspended();
+
+ /* BASIC STATS */
+ /**
+ * Gets the total number of bytes read for this session since it was
+ * created.
+ *
+ * Returns the total number of bytes which were read from this session.
+ */
+ long getReadBytes();
+
+ /**
+ * Gets the total number of bytes written for this session since it was
+ * created.
+ *
+ * @return the total number of bytes which were written to this session.
+ */
+ long getWrittenBytes();
+
+ /* IDLE management */
+ /**
+ * Gets the session configuration, it where the idle timeout are set and
+ * other transport specific configuration.
+ *
+ * @return the session's configuration
+ */
+ IoSessionConfig getConfig();
+
+ /**
+ * The session's creation time.
+ *
+ * @return the session's creation time in milliseconds
+ */
+ long getCreationTime();
+
+ /**
+ * Returns the time in millisecond when I/O occurred lastly (either read or
+ * write).
+ *
+ * @return the time of the last read or write done for this session
+ */
+ long getLastIoTime();
+
+ /**
+ * Returns the time in millisecond when the last I/O read occurred.
+ *
+ * Returns the time in millisecond when read operation occurred lastly.
+ */
+ long getLastReadTime();
+
+ /**
+ * Returns the time in millisecond when the last I/O write occurred.
+ *
+ * Returns the time in millisecond when write operation occurred lastly.
+ */
+ long getLastWriteTime();
+
+ /* Session context management */
+ /**
+ * Returns the value of the user-defined attribute for this session.
+ *
+ * @param name
+ * the attribute's name
+ * @return <tt>null</tt> if there is no attribute with the specified name
+ */
+ Object getAttribute(Object name);
+
+ /**
+ * Sets a user-defined attribute.
+ *
+ * @param name
+ * the attribute's name
+ * @param value
+ * the attribute's value
+ * @return The old attribute's value. <tt>null</tt> if there is no previous
+ * value or if the value is null
+ */
+ Object setAttribute(Object name, Object value);
+
+ /**
+ * Removes a user-defined attribute with the specified name.
+ *
+ * @param name
+ * the attribute's name
+ * @return The old attribute's value. <tt>null</tt> if not found or if the
+ * attribute had no value
+ */
+ Object removeAttribute(Object name);
+
+ /**
+ * Tells if the session has an attached attribute.
+ *
+ * @return <tt>true</tt> if this session contains the attribute with the
+ * specified <tt>name</tt>.
+ */
+ boolean containsAttribute(Object name);
+
+ /**
+ * Gets the set of attributes stored within the session.
+ *
+ * @return the set of names of all user-defined attributes.
+ */
+ Set<Object> getAttributeNames();
+
+ SessionState getState();
+
+ /**
+ * State of a {@link IoSession}
+ *
+ * @author <a href="http://mina.apache.org">Apache MINA Project</a>
+ *
+ */
+ public enum SessionState {
+ CREATED, CONNECTED, CLOSING, CLOSED
+ }
+
+ /* SESSION WRITING */
+ /**
+ * Enqueue a message for writing. This method won't block! The message will
+ * be asynchronously processed by the filter chain and written to the socket by
+ * the {@link SelectorProcessor}.
+ *
+ */
+ public void write(Object message);
+
+ /**
+ * Same as {@link IoSession#write(Object)}, but provide a
+ * {@link IoFuture} for tracking the completion of this write.
+ *
+ * @param message the message to be processed and written
+ * @return the {@link IoFuture} for tracking this asynchronous operation
+ */
+ public IoFuture<Void> writeWithFuture(Object message);
+
+ /**
+ * Get the {@link Queue} of this session. The write queue contains the
pending writes.
+ * @return the write queue of this session
+ */
+ public Queue<WriteRequest> getWriteQueue();
+
+ /**
+ * Get the filter chain in charge of filtering events generated by this
session.
+ * @return the filter chain for this session
+ */
+ public IoFilterChain getFilterChain();
}
\ No newline at end of file
Modified:
mina/branches/3.0/core/src/main/java/org/apache/mina/filterchain/DefaultIoFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/filterchain/DefaultIoFilterChain.java?rev=1144953&r1=1144952&r2=1144953&view=diff
==============================================================================
---
mina/branches/3.0/core/src/main/java/org/apache/mina/filterchain/DefaultIoFilterChain.java
(original)
+++
mina/branches/3.0/core/src/main/java/org/apache/mina/filterchain/DefaultIoFilterChain.java
Sun Jul 10 20:10:38 2011
@@ -26,7 +26,6 @@ import java.util.concurrent.CopyOnWriteA
import org.apache.mina.api.IoFilter;
import org.apache.mina.api.IoFilterChain;
import org.apache.mina.api.IoSession;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -112,6 +111,19 @@ public class DefaultIoFilterChain implem
}
@Override
+ public void processSessionOpen(IoSession session) {
+ for (IoFilter filter : chain) {
+ try {
+ filter.sessionOpened(session);
+ } catch (Exception e) {
+ LOG.error("Exception caught during processing session open
event", e);
+ // we re-forward the caught Exception
+ processExceptionCaught(session, e);
+ }
+ }
+ }
+
+ @Override
public Object processMessageReceived(IoSession session, Object message) {
for (IoFilter filter : chain) {
try {
Modified:
mina/branches/3.0/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL:
http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1144953&r1=1144952&r2=1144953&view=diff
==============================================================================
---
mina/branches/3.0/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
(original)
+++
mina/branches/3.0/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
Sun Jul 10 20:10:38 2011
@@ -26,9 +26,11 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.mina.api.IoFilterChain;
import org.apache.mina.api.IoFuture;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
+import org.apache.mina.filterchain.DefaultIoFilterChain;
import org.apache.mina.service.SelectorProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,6 +85,9 @@ public abstract class AbstractIoSession
/** the queue of pending writes for the session, to be dequeued by the
{@link SelectorProcessor} */
private Queue<WriteRequest> writeQueue = new DefaultWriteQueue();
+ /** the chain for filtering the events of this session */
+ private IoFilterChain filterChain = new DefaultIoFilterChain();
+
/**
* Create an {@link org.apache.mina.api.IoSession} with a unique
identifier (
* {@link org.apache.mina.api.IoSession#getId()}) and an associated {@link
IoService}
@@ -219,7 +224,7 @@ public abstract class AbstractIoSession
LOG.debug("writing message {} to session {}", message, this);
if (state == SessionState.CLOSED || state == SessionState.CLOSING) {
// TODO actually we just just shallow the message if the session
is closed/closing
- LOG.error("writing to closed or cloing session");
+ LOG.error("writing to closed or closing session");
return;
}
writeQueue.add(new DefaultWriteRequest(message));
@@ -242,4 +247,9 @@ public abstract class AbstractIoSession
public Queue<WriteRequest> getWriteQueue() {
return writeQueue;
}
+
+ @Override
+ public IoFilterChain getFilterChain() {
+ return filterChain;
+ }
}
\ No newline at end of file
Modified:
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL:
http://svn.apache.org/viewvc/mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1144953&r1=1144952&r2=1144953&view=diff
==============================================================================
---
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
(original)
+++
mina/branches/3.0/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Sun Jul 10 20:10:38 2011
@@ -166,7 +166,8 @@ public class NioSelectorProcessor implem
LOGGER.error("Unexpected exception, while configuring socket as
non blocking", e);
}
- // TODO : event session created
+ // event session created
+ session.getFilterChain().processSessionCreated(session);
// add the session to the queue for being added to the selector
sessionsToConnect.add(session);
@@ -232,6 +233,7 @@ public class NioSelectorProcessor implem
session.setConnected();
// fire the event
((AbstractIoService)
session.getService()).fireSessionCreated(session);
+
session.getFilterChain().processSessionOpen(session);
}
}
@@ -280,9 +282,9 @@ public class NioSelectorProcessor implem
sessionsToClose.add(session);
} else {
// we have read some data
- // TODO : push to the chain
-
- readBuffer.rewind();
+ // limit at the current position & rewind
buffer back to start & push to the chain
+ readBuffer.flip();
+
session.getFilterChain().processMessageReceived(session, readBuffer);
}
}