Author: elecharny
Date: Sat Feb 20 17:04:32 2010
New Revision: 912149
URL: http://svn.apache.org/viewvc?rev=912149&view=rev
Log:
o added a isEncoded() method in the WriteRequest interface, to avoid calling
the messageSent method if we are dealing with an encoded message
o The created ProtocolCodecFactory when using the classes to initialize the
filter now use local references, instead of creating new instances
o The initCodec() method has been removed
o The getDecoder0 and getEncoder0 methods have been removed
o The ProtocolEncoderOutputImpl.flushWithoutFuture method has been removed
o Loop on the queue instead of using a for(;;)
o Minor refactoring
Modified:
mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java
mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/core/filterchain/DefaultIoFilterChain.java
Sat Feb 20 17:04:32 2010
@@ -448,7 +448,10 @@
}
Entry head = this.head;
- callNextMessageSent(head, session, request);
+
+ if (!request.isEncoded()) {
+ callNextMessageSent(head, session, request);
+ }
}
private void callNextMessageSent(Entry entry, IoSession session,
Modified:
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/core/polling/AbstractPollingIoProcessor.java
Sat Feb 20 17:04:32 2010
@@ -679,7 +679,8 @@
private void read(T session) {
IoSessionConfig config = session.getConfig();
- IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());
+ int bufferSize = config.getReadBufferSize();
+ IoBuffer buf = IoBuffer.allocate(bufferSize);
final boolean hasFragmentation = session.getTransportMetadata()
.hasFragmentation();
@@ -690,6 +691,7 @@
try {
if (hasFragmentation) {
+
while ((ret = read(session, buf)) > 0) {
readBytes += ret;
Modified:
mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/core/write/DefaultWriteRequest.java
Sat Feb 20 17:04:32 2010
@@ -182,4 +182,9 @@
return sb.toString();
}
+
+ public boolean isEncoded()
+ {
+ return false;
+ }
}
\ No newline at end of file
Modified:
mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
--- mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java
(original)
+++ mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequest.java
Sat Feb 20 17:04:32 2010
@@ -53,4 +53,11 @@
* @return <tt>null</tt> for the default destination
*/
SocketAddress getDestination();
+
+ /**
+ * Tells if the current message has been encoded
+ *
+ * @return true if the message has already been encoded
+ */
+ boolean isEncoded();
}
\ No newline at end of file
Modified:
mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/core/write/WriteRequestWrapper.java
Sat Feb 20 17:04:32 2010
@@ -69,4 +69,9 @@
public String toString() {
return "WR Wrapper" + parentRequest.toString();
}
+
+ public boolean isEncoded()
+ {
+ return false;
+ }
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java?rev=912149&r1=912148&r2=912149&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/codec/ProtocolCodecFilter.java
Sat Feb 20 17:04:32 2010
@@ -72,6 +72,7 @@
if (factory == null) {
throw new NullPointerException("factory");
}
+
this.factory = factory;
}
@@ -144,15 +145,32 @@
"decoderClass doesn't have a public default constructor.");
}
- // Create the inner factory based on the two parameters. We instantiate
- // the encoder and decoder locally.
+ final ProtocolEncoder encoder;
+
+ try {
+ encoder = encoderClass.newInstance();
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "encoderClass cannot be initialized");
+ }
+
+ final ProtocolDecoder decoder;
+
+ try {
+ decoder = decoderClass.newInstance();
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "decoderClass cannot be initialized");
+ }
+
+ // Create the inner factory based on the two parameters.
this.factory = new ProtocolCodecFactory() {
public ProtocolEncoder getEncoder(IoSession session) throws
Exception {
- return encoderClass.newInstance();
+ return encoder;
}
public ProtocolDecoder getDecoder(IoSession session) throws
Exception {
- return decoderClass.newInstance();
+ return decoder;
}
};
}
@@ -175,9 +193,6 @@
throw new IllegalArgumentException(
"You can't add the same filter instance more than once.
Create another instance and add it.");
}
-
- // Initialize the encoder and decoder
- initCodec(parent.getSession());
}
@Override
@@ -210,7 +225,7 @@
}
IoBuffer in = (IoBuffer) message;
- ProtocolDecoder decoder = getDecoder0(session);
+ ProtocolDecoder decoder = factory.getDecoder(session);
ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
// Loop until we don't have anymore byte in the buffer,
@@ -219,6 +234,7 @@
// data in the buffer
while (in.hasRemaining()) {
int oldPos = in.position();
+
try {
synchronized (decoderOut) {
// Call the decoder with the read bytes
@@ -282,13 +298,13 @@
// Bypass the encoding if the message is contained in a IoBuffer,
// as it has already been encoded before
- if (message instanceof IoBuffer || message instanceof FileRegion) {
+ if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {
nextFilter.filterWrite(session, writeRequest);
return;
}
// Get the encoder in the session
- ProtocolEncoder encoder = getEncoder0(session);
+ ProtocolEncoder encoder = factory.getEncoder(session);
ProtocolEncoderOutput encoderOut = getEncoderOut(session,
nextFilter, writeRequest);
@@ -306,7 +322,21 @@
encoder.encode(session, message, encoderOut);
// Send it directly
- ((ProtocolEncoderOutputImpl)encoderOut).flushWithoutFuture();
+ Queue<Object> bufferQueue =
((AbstractProtocolEncoderOutput)encoderOut).getMessageQueue();
+
+ // Write all the encoded messages now
+ while (!bufferQueue.isEmpty()) {
+ Object encodedMessage = bufferQueue.poll();
+
+ // Flush only when the buffer has remaining.
+ if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer)
encodedMessage).hasRemaining()) {
+ SocketAddress destination = writeRequest.getDestination();
+ WriteRequest encodedWriteRequest = new
EncodedWriteRequest(encodedMessage, null, destination);
+
+ nextFilter.filterWrite(session, encodedWriteRequest);
+ }
+ }
+
// Call the next filter
nextFilter.filterWrite(session, new MessageWriteRequest(
@@ -330,7 +360,7 @@
public void sessionClosed(NextFilter nextFilter, IoSession session)
throws Exception {
// Call finishDecode() first when a connection is closed.
- ProtocolDecoder decoder = getDecoder0(session);
+ ProtocolDecoder decoder = factory.getDecoder(session);
ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
try {
@@ -358,6 +388,10 @@
WriteFuture future, SocketAddress destination) {
super(encodedMessage, future, destination);
}
+
+ public boolean isEncoded() {
+ return true;
+ }
}
private static class MessageWriteRequest extends WriteRequestWrapper {
@@ -384,6 +418,7 @@
public void flush(NextFilter nextFilter, IoSession session) {
Queue<Object> messageQueue = getMessageQueue();
+
while (!messageQueue.isEmpty()) {
nextFilter.messageReceived(session, messageQueue.poll());
}
@@ -408,15 +443,12 @@
public WriteFuture flush() {
Queue<Object> bufferQueue = getMessageQueue();
WriteFuture future = null;
- for (;;) {
+
+ while (!bufferQueue.isEmpty()) {
Object encodedMessage = bufferQueue.poll();
- if (encodedMessage == null) {
- break;
- }
// Flush only when the buffer has remaining.
- if (!(encodedMessage instanceof IoBuffer) ||
- ((IoBuffer) encodedMessage).hasRemaining()) {
+ if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer)
encodedMessage).hasRemaining()) {
future = new DefaultWriteFuture(session);
nextFilter.filterWrite(session, new
EncodedWriteRequest(encodedMessage,
future, writeRequest.getDestination()));
@@ -430,43 +462,10 @@
return future;
}
-
- public void flushWithoutFuture() {
- Queue<Object> bufferQueue = getMessageQueue();
- for (;;) {
- Object encodedMessage = bufferQueue.poll();
- if (encodedMessage == null) {
- break;
- }
-
- // Flush only when the buffer has remaining.
- if (!(encodedMessage instanceof IoBuffer) ||
- ((IoBuffer) encodedMessage).hasRemaining()) {
- SocketAddress destination = writeRequest.getDestination();
- WriteRequest writeRequest = new EncodedWriteRequest(
- encodedMessage, null, destination);
- nextFilter.filterWrite(session, writeRequest);
- }
- }
- }
}
//----------- Helper methods ---------------------------------------------
/**
- * Initialize the encoder and the decoder, storing them in the
- * session attributes.
- */
- private void initCodec(IoSession session) throws Exception {
- // Creates the decoder and stores it into the newly created session
- ProtocolDecoder decoder = factory.getDecoder(session);
- session.setAttribute(DECODER, decoder);
-
- // Creates the encoder and stores it into the newly created session
- ProtocolEncoder encoder = factory.getEncoder(session);
- session.setAttribute(ENCODER, encoder);
- }
-
- /**
* Dispose the encoder, decoder, and the callback for the decoded
* messages.
*/
@@ -537,28 +536,6 @@
return out;
}
- private ProtocolEncoder getEncoder0(IoSession session) throws Exception {
- ProtocolEncoder encoder = (ProtocolEncoder) session
- .getAttribute(ENCODER);
- if (encoder == null) {
- encoder = factory.getEncoder(session);
- session.setAttribute(ENCODER, encoder);
- }
- return encoder;
- }
-
- private ProtocolDecoder getDecoder0(IoSession session) throws Exception {
- ProtocolDecoder decoder = (ProtocolDecoder) session
- .getAttribute(DECODER);
-
- if (decoder == null) {
- decoder = factory.getDecoder(session);
- session.setAttribute(DECODER, decoder);
- }
-
- return decoder;
- }
-
private ProtocolEncoderOutput getEncoderOut(IoSession session,
NextFilter nextFilter, WriteRequest writeRequest) {
ProtocolEncoderOutput out = (ProtocolEncoderOutput)
session.getAttribute(ENCODER_OUT);