Author: jvermillard
Date: Sat Dec 24 09:09:07 2011
New Revision: 1222940
URL: http://svn.apache.org/viewvc?rev=1222940&view=rev
Log:
Removed the chain controller; the AbstractIoSession now drives the filter chain itself.
Removed:
mina/trunk/core/src/main/java/org/apache/mina/filterchain/DefaultIoFilterController.java
mina/trunk/core/src/main/java/org/apache/mina/filterchain/IoFilterController.java
mina/trunk/core/src/test/java/org/apache/mina/filterchain/
Modified:
mina/trunk/core/src/main/java/org/apache/mina/api/DefaultIoFilter.java
mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filterchain/ReadFilterChainController.java
mina/trunk/core/src/main/java/org/apache/mina/filterchain/WriteFilterChainController.java
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerCodec.java
mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java
mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerEncoder.java
mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapCodec.java
mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapProtocolDecoder.java
mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapProtocolEncoder.java
Modified: mina/trunk/core/src/main/java/org/apache/mina/api/DefaultIoFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/api/DefaultIoFilter.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/api/DefaultIoFilter.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/api/DefaultIoFilter.java Sat
Dec 24 09:09:07 2011
@@ -47,12 +47,12 @@ public abstract class DefaultIoFilter im
@Override
public void messageReceived(IoSession session, Object message,
ReadFilterChainController controller) {
- controller.callReadNextFilter(session, message);
+ controller.callReadNextFilter(message);
}
@Override
public void messageWriting(IoSession session, Object message,
WriteFilterChainController controller) {
- controller.callWriteNextFilter(session, message);
+ controller.callWriteNextFilter(message);
}
}
Modified: mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java (original)
+++ mina/trunk/core/src/main/java/org/apache/mina/api/IoSession.java Sat Dec 24
09:09:07 2011
@@ -27,7 +27,6 @@ import java.util.Set;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;
-import org.apache.mina.filterchain.IoFilterController;
import org.apache.mina.service.SelectorProcessor;
import org.apache.mina.session.AttributeKey;
import org.apache.mina.session.SslHelper;
@@ -357,7 +356,7 @@ public interface IoSession {
public IoFuture<Void> writeWithFuture(Object message);
/**
- * Internal method for enqueue write request after {@link
IoFilterController} processing
+ * Internal method for enqueue write request after filter chain processing
*
* @param message the message to put in the write request
* @return the created write request
@@ -388,10 +387,4 @@ public interface IoSession {
*/
public void releaseWriteQueue();
- /**
- * Get the filter chain in charge of filtering events generated by this
session.
- *
- * @return the filter chain for this session
- */
- public IoFilterController getFilterChain();
}
\ No newline at end of file
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
Sat Dec 24 09:09:07 2011
@@ -202,7 +202,7 @@ public class ProtocolCodecFilter extends
LOGGER.debug("Processing a MESSAGE_RECEIVED for session {}", session);
if (!(message instanceof ByteBuffer)) {
- controller.callReadNextFilter(session, message);
+ controller.callReadNextFilter(message);
return;
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/logging/LoggingFilter.java
Sat Dec 24 09:09:07 2011
@@ -195,7 +195,7 @@ public class LoggingFilter implements Io
log(messageReceivedLevel, "RECEIVED: {}", message);
}
- controller.callReadNextFilter(session, message);
+ controller.callReadNextFilter(message);
}
/**
@@ -208,7 +208,7 @@ public class LoggingFilter implements Io
} else {
log(messageReceivedLevel, "WRITTING: {}", message);
}
- controller.callWriteNextFilter(session, message);
+ controller.callWriteNextFilter(message);
}
//=========================
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filterchain/ReadFilterChainController.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filterchain/ReadFilterChainController.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filterchain/ReadFilterChainController.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filterchain/ReadFilterChainController.java
Sat Dec 24 09:09:07 2011
@@ -19,7 +19,6 @@
*/
package org.apache.mina.filterchain;
-import org.apache.mina.api.IoSession;
/**
* Chain controller used by a filter for calling the next filter in read order.
@@ -29,5 +28,5 @@ import org.apache.mina.api.IoSession;
*/
public interface ReadFilterChainController {
- void callReadNextFilter(IoSession session, Object message);
+ void callReadNextFilter(Object message);
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filterchain/WriteFilterChainController.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filterchain/WriteFilterChainController.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filterchain/WriteFilterChainController.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filterchain/WriteFilterChainController.java
Sat Dec 24 09:09:07 2011
@@ -19,7 +19,6 @@
*/
package org.apache.mina.filterchain;
-import org.apache.mina.api.IoSession;
/**
* Chain controller used by a filter for calling the next filter in write
order.
@@ -28,5 +27,6 @@ import org.apache.mina.api.IoSession;
*
*/
public interface WriteFilterChainController {
- void callWriteNextFilter(IoSession session, Object message);
+
+ void callWriteNextFilter(Object message);
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/service/SelectorProcessor.java
Sat Dec 24 09:09:07 2011
@@ -26,6 +26,7 @@ import java.net.SocketAddress;
import org.apache.mina.api.IoServer;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
+import org.apache.mina.session.AbstractIoSession;
/**
* A processor in charge of a group of client session and server sockets.
@@ -60,6 +61,6 @@ public interface SelectorProcessor {
* Schedule a session for flushing, will be called after a {@link
IoSession#write(Object)}.
* @param session the session to flush
*/
- void flush(IoSession session);
+ void flush(AbstractIoSession session);
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/session/AbstractIoSession.java
Sat Dec 24 09:09:07 2011
@@ -19,6 +19,7 @@
*/
package org.apache.mina.session;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
@@ -35,8 +36,8 @@ import org.apache.mina.api.IoFilter;
import org.apache.mina.api.IoFuture;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSession;
-import org.apache.mina.filterchain.DefaultIoFilterController;
-import org.apache.mina.filterchain.IoFilterController;
+import org.apache.mina.filterchain.ReadFilterChainController;
+import org.apache.mina.filterchain.WriteFilterChainController;
import org.apache.mina.service.SelectorProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,10 +47,13 @@ import org.slf4j.LoggerFactory;
*
* @author <a href="http://mina.apache.org">Apache MINA Project</a>
*/
-public abstract class AbstractIoSession implements IoSession {
+public abstract class AbstractIoSession implements IoSession,
ReadFilterChainController, WriteFilterChainController {
/** The logger for this class */
private static final Logger LOG =
LoggerFactory.getLogger(AbstractIoSession.class);
+ /** unique identifier generator */
+ private static final AtomicLong NEXT_ID = new AtomicLong(0);
+
/** The session's unique identifier */
private final long id;
@@ -59,9 +63,16 @@ public abstract class AbstractIoSession
/** The service this session is associated with */
private final IoService service;
+ /** attributes map */
+ private final AttributeContainer attributes = new
DefaultAttributeContainer();
+
/** The {@link SelectorProcessor} used for handling this session writing */
protected SelectorProcessor writeProcessor;
+ //------------------------------------------------------------------------
+ // Basic statistics
+ //------------------------------------------------------------------------
+
/** The number of bytes read since this session has been created */
private volatile long readBytes;
@@ -74,13 +85,9 @@ public abstract class AbstractIoSession
/** Last time something was written for this session */
private volatile long lastWriteTime;
- /** attributes map */
- private final AttributeContainer attributes = new
DefaultAttributeContainer();
-
- /** unique identifier generator */
- private static final AtomicLong NEXT_ID = new AtomicLong(0);
-
- protected final Object stateMonitor = new Object();
+ //------------------------------------------------------------------------
+ // Session state
+ //------------------------------------------------------------------------
/** The session's state : one of CREATED, CONNECTED, CLOSING, CLOSED,
SECURING, CONNECTED_SECURED */
protected volatile SessionState state;
@@ -100,6 +107,10 @@ public abstract class AbstractIoSession
/** is this session registered for being polled for write ready events */
private AtomicBoolean registeredForWrite = new AtomicBoolean();
+ //------------------------------------------------------------------------
+ // Write queue
+ //------------------------------------------------------------------------
+
/** the queue of pending writes for the session, to be dequeued by the
{@link SelectorProcessor} */
private Queue<WriteRequest> writeQueue = new DefaultWriteQueue();
@@ -112,8 +123,21 @@ public abstract class AbstractIoSession
/** A Write lock on the reentrant writeQueue lock */
private final Lock writeQueueWriteLock = writeQueueLock.writeLock();
- /** The controller for the {@link IoFilter} chain of this session */
- private IoFilterController filterProcessor;
+ //------------------------------------------------------------------------
+ // Filter chain
+ //------------------------------------------------------------------------
+
+ /** The list of {@link IoFilter} implementing this chain. */
+ private final IoFilter[] chain;
+
+ /** the current position in the write chain for this thread */
+ private int writeChainPosition;
+
+ /** the current position in the read chain for this thread */
+ private int readChainPosition;
+
+ /** hold the last WriteRequest created for the high level message
currently written (can be null) */
+ private WriteRequest lastWriteRequest;
/**
* Create an {@link org.apache.mina.api.IoSession} with a unique
identifier (
@@ -128,7 +152,7 @@ public abstract class AbstractIoSession
creationTime = System.currentTimeMillis();
this.service = service;
this.writeProcessor = writeProcessor;
- this.filterProcessor = new
DefaultIoFilterController(service.getFilters());
+ this.chain = service.getFilters();
LOG.debug("Created new session with id : {}", id);
@@ -474,8 +498,7 @@ public abstract class AbstractIoSession
}
// process the queue
- IoFilterController chain = getFilterChain();
- chain.processMessageWriting(this, message, future);
+ processMessageWriting(message, future);
}
/**
@@ -537,11 +560,130 @@ public abstract class AbstractIoSession
writeQueueWriteLock.unlock();
}
+ //------------------------------------------------------------------------
+ // Event processing using the filter chain
+ //------------------------------------------------------------------------
+
+ /**
+ * process session create event using the filter chain. To be called by
the session {@link SelectorProcessor} .
+ */
+ public void processSessionCreated() {
+ LOG.debug("processing session created event for session {}", this);
+
+ for (IoFilter filter : chain) {
+ filter.sessionCreated(this);
+ }
+ }
+
+ /**
+ * process session opened event using the filter chain. To be called by
the session {@link SelectorProcessor} .
+ */
+ public void processSessionOpened() {
+ LOG.debug("processing session open event");
+
+ for (IoFilter filter : chain) {
+ filter.sessionOpened(this);
+ }
+ }
+
+ /**
+ * process session closed event using the filter chain. To be called by
the session {@link SelectorProcessor} .
+ */
+ public void processSessionClosed() {
+ LOG.debug("processing session closed event");
+
+ for (IoFilter filter : chain) {
+ filter.sessionClosed(this);
+ }
+ }
+
+ /**
+ * process session message received event using the filter chain. To be
called by the session {@link SelectorProcessor} .
+ * @param message the received message
+ */
+ public void processMessageReceived(ByteBuffer message) {
+ LOG.debug("processing message '{}' received event for session {}",
message, this);
+
+ if (chain.length < 1) {
+ LOG.debug("Nothing to do, the chain is empty");
+ } else {
+ readChainPosition = 0;
+ // we call the first filter, it's supposed to call the next ones
using the filter chain controller
+ chain[readChainPosition].messageReceived(this, message, this);
+ }
+ }
+
+ /**
+ * process session message writing event using the filter chain. To be
called by the session {@link SelectorProcessor} .
+ * @param message the wrote message, should be transformed into ByteBuffer
at the end of the filter chain
+ */
+ public void processMessageWriting(Object message, IoFuture<Void> future) {
+ LOG.debug("processing message '{}' writing event for session {}",
message, this);
+
+ lastWriteRequest = null;
+
+ if (chain.length < 1) {
+ enqueueFinalWriteMessage(message);
+ } else {
+ writeChainPosition = chain.length - 1;
+ // we call the first filter, it's supposed to call the next ones
using the filter chain controller
+ int position = writeChainPosition;
+ IoFilter nextFilter = chain[position];
+ nextFilter.messageWriting(this, message, this);
+ }
+
+ // put the future in the last write request
+ if (future != null) {
+ WriteRequest request = lastWriteRequest;
+
+ if (request != null) {
+ ((DefaultWriteRequest) request).setFuture(future);
+ }
+ }
+ }
+
+ /**
+ * process session message received event using the filter chain. To be
called by the session {@link SelectorProcessor} .
+ * @param message the received message
+ */
+ public void callWriteNextFilter(Object message) {
+ LOG.debug("calling next filter for writing for message '{}' position :
{}", message, writeChainPosition);
+
+ writeChainPosition--;
+
+ if (writeChainPosition < 0 || chain.length == 0) {
+ // end of chain processing
+ enqueueFinalWriteMessage(message);
+ } else {
+ chain[writeChainPosition].messageWriting(this, message, this);
+ }
+
+ writeChainPosition++;
+ ;
+ }
+
+ /**
+ * At the end of write chain processing, enqueue final encoded {@link
ByteBuffer} message in the session
+ */
+ private void enqueueFinalWriteMessage(Object message) {
+ LOG.debug("end of write chan we enqueue the message in the session :
{}", message);
+ lastWriteRequest = enqueueWriteRequest(message);
+ }
+
/**
* {@inheritDoc}
*/
@Override
- public IoFilterController getFilterChain() {
- return filterProcessor;
+ public void callReadNextFilter(Object message) {
+ readChainPosition++;
+
+ if (readChainPosition >= chain.length) {
+ // end of chain processing
+ } else {
+ chain[readChainPosition].messageReceived(this, message, this);
+ }
+
+ readChainPosition--;
}
+
}
\ No newline at end of file
Modified: mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/session/SslHelper.java Sat
Dec 24 09:09:07 2011
@@ -267,7 +267,7 @@ public class SslHelper {
* @param readBuffer The data we get from the channel
* @throws SSLException If the unwrapping or handshaking failed
*/
- public void processRead(IoSession session, ByteBuffer readBuffer) throws
SSLException {
+ public void processRead(AbstractIoSession session, ByteBuffer readBuffer)
throws SSLException {
if (session.isConnectedSecured()) {
// Unwrap the incoming data
processUnwrap(session, readBuffer);
@@ -281,7 +281,7 @@ public class SslHelper {
* Unwrap a SSL/TLS message. The message might not be encrypted (if we are
processing
* a Handshake message or an Alert message).
*/
- private void processUnwrap(IoSession session, ByteBuffer inBuffer) throws
SSLException {
+ private void processUnwrap(AbstractIoSession session, ByteBuffer inBuffer)
throws SSLException {
// Blind guess : once uncompressed, the resulting buffer will be 3
times bigger
ByteBuffer appBuffer = ByteBuffer.allocate(inBuffer.limit() * 3);
SSLEngineResult result = unwrap(inBuffer, appBuffer);
@@ -290,7 +290,7 @@ public class SslHelper {
case OK:
// Ok, go through the chain now
appBuffer.flip();
- session.getFilterChain().processMessageReceived(session,
appBuffer);
+ session.processMessageReceived(appBuffer);
break;
case CLOSED:
Modified:
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/transport/tcp/NioSelectorProcessor.java
Sat Dec 24 09:09:07 2011
@@ -47,6 +47,7 @@ import org.apache.mina.api.RuntimeIoExce
import org.apache.mina.service.AbstractIoService;
import org.apache.mina.service.SelectorProcessor;
import org.apache.mina.service.SelectorStrategy;
+import org.apache.mina.session.AbstractIoSession;
import org.apache.mina.session.DefaultWriteFuture;
import org.apache.mina.session.SslHelper;
import org.apache.mina.session.WriteRequest;
@@ -240,7 +241,7 @@ public class NioSelectorProcessor implem
}
// event session created
- session.getFilterChain().processSessionCreated(session);
+ session.processSessionCreated();
// add the session to the queue for being added to the selector
sessionsToConnect.add(session);
@@ -251,7 +252,7 @@ public class NioSelectorProcessor implem
* {@inheritDoc}
*/
@Override
- public void flush(IoSession session) {
+ public void flush(AbstractIoSession session) {
LOGGER.debug("scheduling session {} for writing", session);
// add the session to the list of session to be registered for writing
flushingSessions.add((NioTcpSession) session);
@@ -412,7 +413,7 @@ public class NioSelectorProcessor implem
// fire the event
((AbstractIoService)
session.getService()).fireSessionCreated(session);
- session.getFilterChain().processSessionOpened(session);
+ session.processSessionOpened();
}
}
}
@@ -430,7 +431,7 @@ public class NioSelectorProcessor implem
// closing underlying socket
session.getSocketChannel().close();
// fire the event
- session.getFilterChain().processSessionClosed(session);
+ session.processSessionClosed();
((AbstractIoService)
session.getService()).fireSessionDestroyed(session);
}
}
@@ -482,7 +483,7 @@ public class NioSelectorProcessor implem
sslHelper.processRead(session, readBuffer);
} else {
// Plain message, not encrypted : go directly to the chain
- session.getFilterChain().processMessageReceived(session,
readBuffer);
+ session.processMessageReceived(readBuffer);
}
}
}
Modified:
mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
---
mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
(original)
+++
mina/trunk/core/src/test/java/org/apache/mina/session/AbstractIoSessionTest.java
Sat Dec 24 09:09:07 2011
@@ -18,20 +18,23 @@
*/
package org.apache.mina.session;
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotNull;
-import static junit.framework.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static junit.framework.Assert.*;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
import junit.framework.Assert;
+import org.apache.mina.api.DefaultIoFilter;
import org.apache.mina.api.IoFilter;
import org.apache.mina.api.IoFuture;
import org.apache.mina.api.IoService;
import org.apache.mina.api.IoSessionConfig;
+import org.apache.mina.filterchain.ReadFilterChainController;
+import org.apache.mina.filterchain.WriteFilterChainController;
+import org.apache.mina.service.SelectorProcessor;
import org.junit.Before;
import org.junit.Test;
@@ -42,9 +45,11 @@ import org.junit.Test;
*/
public class AbstractIoSessionTest {
- private static final class DummySession extends AbstractIoSession {
+ private SelectorProcessor processor = mock(SelectorProcessor.class);
+
+ private class DummySession extends AbstractIoSession {
private DummySession(IoService service) {
- super(service, null);
+ super(service, processor);
}
@Override
@@ -117,10 +122,17 @@ public class AbstractIoSessionTest {
private IoService service = null;
+ private IoFilter filter1 = spy(new PassthruFilter());
+
+ private IoFilter filter2 = spy(new PassthruFilter());
+
+ private IoFilter filter3 = spy(new PassthruFilter());
+
@Before
public void setup() {
service = mock(IoService.class);
- when(service.getFilters()).thenReturn(new IoFilter[] {});
+
+ when(service.getFilters()).thenReturn(new IoFilter[] { filter1,
filter2, filter3 });
}
@Test
@@ -157,4 +169,56 @@ public class AbstractIoSessionTest {
assertEquals(null, aio.getAttribute(key, null));
assertNotNull(aio.getService());
}
+
+ @Test
+ public void chain_reads() {
+ DummySession session = new DummySession(service);
+ ByteBuffer buffer = mock(ByteBuffer.class);
+ session.processMessageReceived(buffer);
+ verify(filter1).messageReceived(eq(session), eq(buffer),
any(ReadFilterChainController.class));
+ verify(filter2).messageReceived(eq(session), eq(buffer),
any(ReadFilterChainController.class));
+ verify(filter3).messageReceived(eq(session), eq(buffer),
any(ReadFilterChainController.class));
+ }
+
+ @Test
+ public void chain_writes() {
+ DummySession session = new DummySession(service);
+ ByteBuffer buffer = mock(ByteBuffer.class);
+ session.processMessageWriting(buffer, null);
+ verify(filter1).messageWriting(eq(session), eq(buffer),
any(WriteFilterChainController.class));
+ verify(filter2).messageWriting(eq(session), eq(buffer),
any(WriteFilterChainController.class));
+ verify(filter3).messageWriting(eq(session), eq(buffer),
any(WriteFilterChainController.class));
+ verify(processor).flush(eq(session));
+ }
+
+ @Test
+ public void chain_created() {
+ DummySession session = new DummySession(service);
+ session.processSessionCreated();
+ verify(filter1).sessionCreated(eq(session));
+ verify(filter2).sessionCreated(eq(session));
+ verify(filter3).sessionCreated(eq(session));
+ }
+
+ @Test
+ public void chain_open() {
+ DummySession session = new DummySession(service);
+ session.processSessionOpened();
+ verify(filter1).sessionOpened(eq(session));
+ verify(filter2).sessionOpened(eq(session));
+ verify(filter3).sessionOpened(eq(session));
+ }
+
+ @Test
+ public void chain_close() {
+ DummySession session = new DummySession(service);
+ session.processSessionClosed();
+ verify(filter1).sessionClosed(eq(session));
+ verify(filter2).sessionClosed(eq(session));
+ verify(filter3).sessionClosed(eq(session));
+ }
+
+ private class PassthruFilter extends DefaultIoFilter {
+
+ }
}
Modified:
mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
URL:
http://svn.apache.org/viewvc/mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
---
mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
(original)
+++
mina/trunk/examples/src/main/java/org/apache/mina/examples/echoserver/NioEchoServer.java
Sat Dec 24 09:09:07 2011
@@ -83,7 +83,7 @@ public class NioEchoServer {
@Override
public void messageWriting(IoSession session, Object message,
WriteFilterChainController controller) {
// we just push the message in the chain
- controller.callWriteNextFilter(session, message);
+ controller.callWriteNextFilter(message);
}
@Override
Modified:
mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerCodec.java
URL:
http://svn.apache.org/viewvc/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerCodec.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
--- mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerCodec.java
(original)
+++ mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerCodec.java Sat
Dec 24 09:09:07 2011
@@ -34,7 +34,8 @@ public class HttpServerCodec extends Pro
private static final Logger LOG =
LoggerFactory.getLogger(HttpServerCodec.class);
/** Key for decoder current state */
- private static final AttributeKey<String> DECODER_STATE_ATT =
createKey(String.class, "internal_http.ds");
+ private static final AttributeKey<DecoderState> DECODER_STATE_ATT =
createKey(DecoderState.class,
+ "internal_http.ds");
/** Key for the partial HTTP requests head */
private static final AttributeKey<String> PARTIAL_HEAD_ATT =
createKey(String.class, "internal_http.ph");
Modified:
mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java
URL:
http://svn.apache.org/viewvc/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
--- mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java
(original)
+++ mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerDecoder.java
Sat Dec 24 09:09:07 2011
@@ -101,7 +101,7 @@ public class HttpServerDecoder implement
session.setAttribute(PARTIAL_HEAD_ATT, partial);
session.setAttribute(DECODER_STATE_ATT, DecoderState.HEAD);
} else {
- controller.callReadNextFilter(session, rq);
+ controller.callReadNextFilter(rq);
// is it a request with some body content ?
if (rq.getMethod() == HttpMethod.POST || rq.getMethod() ==
HttpMethod.PUT) {
@@ -129,14 +129,14 @@ public class HttpServerDecoder implement
LOG.debug("decoding BODY");
int chunkSize = msg.remaining();
// send the chunk of body
- controller.callReadNextFilter(session, msg);
+ controller.callReadNextFilter(msg);
// do we have reach end of body ?
int remaining = session.getAttribute(BODY_REMAINING_BYTES, null);
remaining -= chunkSize;
if (remaining <= 0) {
LOG.debug("end of HTTP body");
- controller.callReadNextFilter(session, new HttpEndOfContent());
+ controller.callReadNextFilter(new HttpEndOfContent());
session.setAttribute(DECODER_STATE_ATT, DecoderState.NEW);
session.removeAttribute(BODY_REMAINING_BYTES);
} else {
Modified:
mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerEncoder.java
URL:
http://svn.apache.org/viewvc/mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerEncoder.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
--- mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerEncoder.java
(original)
+++ mina/trunk/http/src/main/java/org/apache/mina/http/HttpServerEncoder.java
Sat Dec 24 09:09:07 2011
@@ -44,9 +44,9 @@ public class HttpServerEncoder implement
}
sb.append("\r\n");
byte[] bytes = sb.toString().getBytes(Charset.forName("UTF-8"));
- controller.callWriteNextFilter(session, ByteBuffer.wrap(bytes));
+ controller.callWriteNextFilter(ByteBuffer.wrap(bytes));
} else if (message instanceof ByteBuffer) {
- controller.callWriteNextFilter(session, message);
+ controller.callWriteNextFilter(message);
} else if (message instanceof HttpEndOfContent) {
// end of HTTP content
// keep alive ?
Modified: mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapCodec.java
URL:
http://svn.apache.org/viewvc/mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapCodec.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
--- mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapCodec.java (original)
+++ mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapCodec.java Sat Dec
24 09:09:07 2011
@@ -19,7 +19,6 @@
*/
package org.apache.mina.ldap;
-
import static org.apache.mina.session.AttributeKey.createKey;
import java.nio.ByteBuffer;
@@ -50,15 +49,13 @@ import org.apache.mina.session.Attribute
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* A LDAP message codec.
*
* @author <a href="mailto:[email protected]">Apache Directory
Project</a>
*/
-public class LdapCodec extends ProtocolCodecFilter
-{
- private static final Logger LOG = LoggerFactory.getLogger( LdapCodec.class );
+public class LdapCodec extends ProtocolCodecFilter {
+ private static final Logger LOG = LoggerFactory.getLogger(LdapCodec.class);
/** The LDAP decoder instance */
private static ProtocolDecoder ldapDecoder = new LdapProtocolDecoder();
@@ -70,95 +67,60 @@ public class LdapCodec extends ProtocolC
private static LdapApiService codec = LdapApiServiceFactory.getSingleton();
/** The Message Container attribute */
- public static final AttributeKey<LdapMessageContainer>
MESSAGE_CONTAINER_AT = createKey(
- LdapMessageContainer.class, "internal_messageContainer" );
-
+ public static final AttributeKey<LdapMessageContainer>
MESSAGE_CONTAINER_AT = createKey(LdapMessageContainer.class,
+ "internal_messageContainer");
- public LdapCodec()
- {
- super( ldapEncoder, ldapDecoder );
+ public LdapCodec() {
+ super(ldapEncoder, ldapDecoder);
}
-
@Override
- public void sessionCreated( IoSession session )
- {
+ public void sessionCreated(IoSession session) {
LdapMessageContainer<MessageDecorator<? extends Message>> container =
new LdapMessageContainer<MessageDecorator<? extends Message>>(
- codec );
- session.setAttribute( MESSAGE_CONTAINER_AT, container );
+ codec);
+ session.setAttribute(MESSAGE_CONTAINER_AT, container);
}
-
@Override
- public void sessionOpened( IoSession session )
- {
+ public void sessionOpened(IoSession session) {
}
-
@Override
- public void sessionClosed( IoSession session )
- {
- session.removeAttribute( MESSAGE_CONTAINER_AT );
+ public void sessionClosed(IoSession session) {
+ session.removeAttribute(MESSAGE_CONTAINER_AT);
}
-
@Override
- public void sessionIdle( IoSession session, IdleStatus status )
- {
+ public void sessionIdle(IoSession session, IdleStatus status) {
// TODO Auto-generated method stub
}
-
@Override
- public void messageWriting( IoSession session, Object message,
WriteFilterChainController controller )
- {
- if ( message instanceof AddResponse )
- {
- ldapEncoder.encode( session, ( AddResponse ) message, controller );
- }
- else if ( message instanceof BindResponse )
- {
- ldapEncoder.encode( session, ( BindResponse ) message, controller
);
- }
- else if ( message instanceof DeleteResponse )
- {
- ldapEncoder.encode( session, ( DeleteResponse ) message,
controller );
- }
- else if ( message instanceof CompareResponse )
- {
- ldapEncoder.encode( session, ( CompareResponse ) message,
controller );
- }
- else if ( message instanceof ExtendedResponse )
- {
- ldapEncoder.encode( session, ( ExtendedResponse ) message,
controller );
- }
- else if ( message instanceof IntermediateResponse )
- {
- ldapEncoder.encode( session, ( IntermediateResponse ) message,
controller );
- }
- else if ( message instanceof ModifyResponse )
- {
- ldapEncoder.encode( session, ( ModifyResponse ) message,
controller );
- }
- else if ( message instanceof ModifyDnResponse )
- {
- ldapEncoder.encode( session, ( ModifyDnResponse ) message,
controller );
- }
- else if ( message instanceof SearchResultDone )
- {
- ldapEncoder.encode( session, ( SearchResultDone ) message,
controller );
- }
- else if ( message instanceof SearchResultEntry )
- {
- ldapEncoder.encode( session, ( SearchResultEntry ) message,
controller );
- }
- else if ( message instanceof SearchResultReference )
- {
- ldapEncoder.encode( session, ( SearchResultReference ) message,
controller );
- }
- else if ( message instanceof ByteBuffer )
- {
- controller.callWriteNextFilter( session, message );
+ public void messageWriting(IoSession session, Object message,
WriteFilterChainController controller) {
+ if (message instanceof AddResponse) {
+ ldapEncoder.encode(session, (AddResponse) message, controller);
+ } else if (message instanceof BindResponse) {
+ ldapEncoder.encode(session, (BindResponse) message, controller);
+ } else if (message instanceof DeleteResponse) {
+ ldapEncoder.encode(session, (DeleteResponse) message, controller);
+ } else if (message instanceof CompareResponse) {
+ ldapEncoder.encode(session, (CompareResponse) message, controller);
+ } else if (message instanceof ExtendedResponse) {
+ ldapEncoder.encode(session, (ExtendedResponse) message,
controller);
+ } else if (message instanceof IntermediateResponse) {
+ ldapEncoder.encode(session, (IntermediateResponse) message,
controller);
+ } else if (message instanceof ModifyResponse) {
+ ldapEncoder.encode(session, (ModifyResponse) message, controller);
+ } else if (message instanceof ModifyDnResponse) {
+ ldapEncoder.encode(session, (ModifyDnResponse) message,
controller);
+ } else if (message instanceof SearchResultDone) {
+ ldapEncoder.encode(session, (SearchResultDone) message,
controller);
+ } else if (message instanceof SearchResultEntry) {
+ ldapEncoder.encode(session, (SearchResultEntry) message,
controller);
+ } else if (message instanceof SearchResultReference) {
+ ldapEncoder.encode(session, (SearchResultReference) message,
controller);
+ } else if (message instanceof ByteBuffer) {
+ controller.callWriteNextFilter(message);
}
}
}
\ No newline at end of file
Modified: mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapProtocolDecoder.java
URL:
http://svn.apache.org/viewvc/mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapProtocolDecoder.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
--- mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapProtocolDecoder.java (original)
+++ mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapProtocolDecoder.java Sat Dec 24 09:09:07 2011
@@ -19,7 +19,6 @@
*/
package org.apache.mina.ldap;
-
import static org.apache.mina.session.AttributeKey.createKey;
import java.nio.ByteBuffer;
@@ -40,16 +39,14 @@ import org.apache.mina.session.Attribute
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* A LDAP message decoder. It is based on shared-ldap decoder.
*
* @author <a href="mailto:[email protected]">Apache Directory
Project</a>
*/
-public class LdapProtocolDecoder implements ProtocolDecoder
-{
+public class LdapProtocolDecoder implements ProtocolDecoder {
/** The logger */
- private static Logger LOG = LoggerFactory.getLogger( LdapProtocolDecoder.class );
+ private static Logger LOG = LoggerFactory.getLogger(LdapProtocolDecoder.class);
/** A speedup for logger */
private static final boolean IS_DEBUG = LOG.isDebugEnabled();
@@ -58,54 +55,44 @@ public class LdapProtocolDecoder impleme
private Asn1Decoder asn1Decoder;
/** Key for the partial HTTP requests head */
- private static final AttributeKey<Integer> MAX_PDU_SIZE_AT = createKey(
Integer.class, "internal_max_pdu_size" );
-
+ private static final AttributeKey<Integer> MAX_PDU_SIZE_AT =
createKey(Integer.class, "internal_max_pdu_size");
/**
* Creates a new instance of LdapProtocolEncoder.
*
* @param codec The LDAP codec service associated with this encoder.
*/
- public LdapProtocolDecoder()
- {
+ public LdapProtocolDecoder() {
asn1Decoder = new Asn1Decoder();
}
-
/**
* {@inheritDoc}
*/
- public Object decode( IoSession session, ByteBuffer in,
ReadFilterChainController controller )
- {
+ public Object decode(IoSession session, ByteBuffer in,
ReadFilterChainController controller) {
@SuppressWarnings("unchecked")
- LdapMessageContainer<MessageDecorator<? extends Message>>
messageContainer = ( LdapMessageContainer<MessageDecorator<? extends Message>>
) session
- .getAttribute( LdapCodec.MESSAGE_CONTAINER_AT, null );
+ LdapMessageContainer<MessageDecorator<? extends Message>>
messageContainer = (LdapMessageContainer<MessageDecorator<? extends Message>>)
session
+ .getAttribute(LdapCodec.MESSAGE_CONTAINER_AT, null);
- int maxPDUSize = session.getAttribute( MAX_PDU_SIZE_AT, 0 );
+ int maxPDUSize = session.getAttribute(MAX_PDU_SIZE_AT, 0);
- messageContainer.setMaxPDUSize( maxPDUSize );
+ messageContainer.setMaxPDUSize(maxPDUSize);
- try
- {
+ try {
Object message = null;
- do
- {
- message = decode( in, messageContainer );
+ do {
+ message = decode(in, messageContainer);
- controller.callReadNextFilter( session, message );
- }
- while ( message != null );
- }
- catch ( DecoderException lde )
- {
+ controller.callReadNextFilter(message);
+ } while (message != null);
+ } catch (DecoderException lde) {
// Do something
}
return null;
}
-
/**
* Decode an incoming buffer into LDAP messages. The result can be 0, 1 or
many LDAP messages, which will be stored
* into the array the caller has created.
@@ -116,16 +103,12 @@ public class LdapProtocolDecoder impleme
* @param decodedMessages The list of decoded messages
* @throws Exception If the decoding failed
*/
- private Object decode( ByteBuffer buffer,
LdapMessageContainer<MessageDecorator<? extends Message>> messageContainer )
- throws DecoderException
- {
- while ( buffer.hasRemaining() )
- {
- try
- {
- if ( IS_DEBUG )
- {
- LOG.debug( "Decoding the PDU : " );
+ private Object decode(ByteBuffer buffer,
LdapMessageContainer<MessageDecorator<? extends Message>> messageContainer)
+ throws DecoderException {
+ while (buffer.hasRemaining()) {
+ try {
+ if (IS_DEBUG) {
+ LOG.debug("Decoding the PDU : ");
int size = buffer.limit();
int position = buffer.position();
@@ -133,25 +116,20 @@ public class LdapProtocolDecoder impleme
byte[] array = new byte[pduLength];
- System.arraycopy( buffer.array(), position, array, 0,
pduLength );
+ System.arraycopy(buffer.array(), position, array, 0,
pduLength);
- if ( array.length == 0 )
- {
- LOG.debug( "NULL buffer, what the HELL ???" );
- }
- else
- {
- LOG.debug( Strings.dumpBytes( array ) );
+ if (array.length == 0) {
+ LOG.debug("NULL buffer, what the HELL ???");
+ } else {
+ LOG.debug(Strings.dumpBytes(array));
}
}
- asn1Decoder.decode( buffer, messageContainer );
+ asn1Decoder.decode(buffer, messageContainer);
- if ( messageContainer.getState() == TLVStateEnum.PDU_DECODED )
- {
- if ( IS_DEBUG )
- {
- LOG.debug( "Decoded LdapMessage : " +
messageContainer.getMessage() );
+ if (messageContainer.getState() == TLVStateEnum.PDU_DECODED) {
+ if (IS_DEBUG) {
+ LOG.debug("Decoded LdapMessage : " +
messageContainer.getMessage());
}
Message message = messageContainer.getMessage();
@@ -159,24 +137,19 @@ public class LdapProtocolDecoder impleme
return message;
}
- }
- catch ( DecoderException de )
- {
+ } catch (DecoderException de) {
buffer.clear();
messageContainer.clean();
- if ( de instanceof ResponseCarryingException )
- {
+ if (de instanceof ResponseCarryingException) {
// Transform the DecoderException message to a
MessageException
- ResponseCarryingMessageException rcme = new
ResponseCarryingMessageException( de.getMessage() );
- rcme.setResponse( ( ( ResponseCarryingException ) de
).getResponse() );
+ ResponseCarryingMessageException rcme = new
ResponseCarryingMessageException(de.getMessage());
+ rcme.setResponse(((ResponseCarryingException)
de).getResponse());
throw rcme;
- }
- else
- {
+ } else {
// TODO : This is certainly not the way we should handle
such an exception !
- throw new ResponseCarryingException( de.getMessage() );
+ throw new ResponseCarryingException(de.getMessage());
}
}
}
@@ -184,21 +157,17 @@ public class LdapProtocolDecoder impleme
return null;
}
-
/**
* {@inheritDoc}
*/
- public Object finishDecode( IoSession session ) throws Exception
- {
+ public Object finishDecode(IoSession session) throws Exception {
return null;
}
-
/**
* {@inheritDoc}
*/
- public void dispose( IoSession session ) throws Exception
- {
+ public void dispose(IoSession session) throws Exception {
// Nothing to do
}
}
Modified: mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapProtocolEncoder.java
URL:
http://svn.apache.org/viewvc/mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapProtocolEncoder.java?rev=1222940&r1=1222939&r2=1222940&view=diff
==============================================================================
--- mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapProtocolEncoder.java (original)
+++ mina/trunk/ldap/src/main/java/org/apache/mina/ldap/LdapProtocolEncoder.java Sat Dec 24 09:09:07 2011
@@ -53,7 +53,7 @@ public class LdapProtocolEncoder impleme
try {
ByteBuffer buffer = encoder.encodeMessage((Message) message);
- controller.callWriteNextFilter(session, buffer);
+ controller.callWriteNextFilter(buffer);
} catch (Exception e) {
return null;
}