Author: trustin
Date: Sat Sep 29 14:13:48 2007
New Revision: 580644
URL: http://svn.apache.org/viewvc?rev=580644&view=rev
Log:
* Added WriteRequestFilter that helps implementation of data transformation
filters
* Changed CompressionFilter and RequestResponseFilter to extend
WriteRequestFilter (NOTE: SslFilter and ProtocolCodecFilter are somewhat
different from usual data transformation filters in that they can generate more
than two messages.)
Added:
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
(with props)
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/compression/CompressionFilter.java
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java?rev=580644&r1=580643&r2=580644&view=diff
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
Sat Sep 29 14:13:48 2007
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -30,19 +31,18 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.IoSessionLogger;
import org.apache.mina.common.WriteRequest;
-import org.apache.mina.common.WriteRequestWrapper;
+import org.apache.mina.filter.util.WriteRequestFilter;
/**
*
* @author The Apache MINA Project ([EMAIL PROTECTED])
* @version $Rev$, $Date$
*/
-public class RequestResponseFilter extends IoFilterAdapter {
+public class RequestResponseFilter extends WriteRequestFilter {
private static final String RESPONSE_INSPECTOR =
RequestResponseFilter.class
.getName()
@@ -179,20 +179,18 @@
nextFilter.messageReceived(session, response);
}
}
-
+
@Override
- public void filterWrite(NextFilter nextFilter, IoSession session,
- WriteRequest writeRequest) throws Exception {
+ protected Object doFilterWrite(
+ final NextFilter nextFilter, IoSession session, WriteRequest
writeRequest) throws Exception {
Object message = writeRequest.getMessage();
if (!(message instanceof Request)) {
- nextFilter.filterWrite(session, writeRequest);
- return;
+ return null;
}
-
- Request request = (Request) message;
+
+ final Request request = (Request) message;
if (request.getTimeoutFuture() != null) {
- nextFilter.exceptionCaught(session, new IllegalArgumentException(
- "Request can not be reused."));
+ throw new IllegalArgumentException("Request can not be reused.");
}
Map<Object, Request> requestStore = getRequestStore(session);
@@ -205,60 +203,44 @@
}
}
if (oldValue != null) {
- nextFilter.exceptionCaught(session, new IllegalStateException(
- "Duplicate request ID: " + request.getId()));
+ throw new IllegalStateException(
+ "Duplicate request ID: " + request.getId());
+ }
+
+ // Schedule a task to be executed on timeout.
+ // Find the timeout date avoiding overflow.
+ Date timeoutDate = new Date(System.currentTimeMillis());
+ if (Long.MAX_VALUE - request.getTimeoutMillis() < timeoutDate
+ .getTime()) {
+ timeoutDate.setTime(Long.MAX_VALUE);
+ } else {
+ timeoutDate.setTime(timeoutDate.getTime()
+ + request.getTimeoutMillis());
}
- nextFilter.filterWrite(session, new RequestWriteRequest(writeRequest));
- }
-
- @Override
- public void messageSent(final NextFilter nextFilter,
- final IoSession session, WriteRequest writeRequest)
- throws Exception {
- if (writeRequest instanceof RequestWriteRequest) {
- // Schedule a task to be executed on timeout.
- RequestWriteRequest wrappedRequest = (RequestWriteRequest)
writeRequest;
- WriteRequest actualRequest = wrappedRequest.getWriteRequest();
- final Request request = (Request) actualRequest.getMessage();
-
- // Find the timeout date avoiding overflow.
- Date timeoutDate = new Date(System.currentTimeMillis());
- if (Long.MAX_VALUE - request.getTimeoutMillis() < timeoutDate
- .getTime()) {
- timeoutDate.setTime(Long.MAX_VALUE);
- } else {
- timeoutDate.setTime(timeoutDate.getTime()
- + request.getTimeoutMillis());
- }
-
- TimeoutTask timeoutTask = new TimeoutTask(nextFilter, request,
- session);
-
- // Schedule the timeout task.
- ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule(
- timeoutTask, request.getTimeoutMillis(),
- TimeUnit.MILLISECONDS);
- request.setTimeoutTask(timeoutTask);
- request.setTimeoutFuture(timeoutFuture);
+ TimeoutTask timeoutTask = new TimeoutTask(
+ nextFilter, request, session);
- // Add the timtoue task to the unfinished task set.
- Set<Request> unrespondedRequests =
getUnrespondedRequestStore(session);
- synchronized (unrespondedRequests) {
- unrespondedRequests.add(request);
- }
+ // Schedule the timeout task.
+ ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule(
+ timeoutTask, request.getTimeoutMillis(),
+ TimeUnit.MILLISECONDS);
+ request.setTimeoutTask(timeoutTask);
+ request.setTimeoutFuture(timeoutFuture);
- // and forward the original write request.
- nextFilter.messageSent(session, wrappedRequest.getWriteRequest());
- } else {
- nextFilter.messageSent(session, writeRequest);
+ // Add the timeout task to the unfinished task set.
+ Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
+ synchronized (unrespondedRequests) {
+ unrespondedRequests.add(request);
}
+
+ return request.getMessage();
}
@Override
public void sessionClosed(NextFilter nextFilter, IoSession session)
throws Exception {
- // Copy the unifished task set to avoid unnecessary lock acquisition.
+ // Copy the unfinished task set to avoid unnecessary lock acquisition.
// Copying will be cheap because there won't be that many requests
queued.
Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
List<Request> unrespondedRequestsCopy;
@@ -268,7 +250,7 @@
unrespondedRequests.clear();
}
- // Generate timeout artifically.
+ // Generate timeout artificially.
for (Request r : unrespondedRequestsCopy) {
if (r.getTimeoutFuture().cancel(false)) {
r.getTimeoutTask().run();
@@ -387,17 +369,6 @@
request.signal(e);
filter.exceptionCaught(session, e);
}
- }
- }
-
- private static class RequestWriteRequest extends WriteRequestWrapper {
- public RequestWriteRequest(WriteRequest writeRequest) {
- super(writeRequest);
- }
-
- @Override
- public Object getMessage() {
- return ((Request) super.getMessage()).getMessage();
}
}
}
Added:
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java?rev=580644&view=auto
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
(added)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
Sat Sep 29 14:13:48 2007
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.mina.filter.util;
+
+import org.apache.mina.common.IoEventType;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestWrapper;
+
+/**
+ * An abstract {@link IoFilter} that simplifies the implementation of
+ * an {@link IoFilter} that filters an {@link IoEventType#WRITE} event.
+ *
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ *
+ */
+public abstract class WriteRequestFilter extends IoFilterAdapter {
+
+ @Override
+ public void exceptionCaught(NextFilter nextFilter, IoSession session,
+ Throwable cause) throws Exception {
+ nextFilter.exceptionCaught(session, cause);
+ }
+
+ @Override
+ public void filterWrite(NextFilter nextFilter, IoSession session,
+ WriteRequest writeRequest) throws Exception {
+ Object filteredMessage = doFilterWrite(nextFilter, session,
writeRequest);
+ if (filteredMessage != null && filteredMessage !=
writeRequest.getMessage()) {
+ nextFilter.filterWrite(
+ session, new FilteredWriteRequest(
+ filteredMessage, writeRequest));
+ } else {
+ nextFilter.filterWrite(session, writeRequest);
+ }
+ }
+
+ @Override
+ public void messageSent(NextFilter nextFilter, IoSession session,
+ WriteRequest writeRequest) throws Exception {
+ if (writeRequest instanceof FilteredWriteRequest) {
+ FilteredWriteRequest req = (FilteredWriteRequest) writeRequest;
+ if (req.getParent() == this) {
+ nextFilter.messageSent(session, req.getWriteRequest());
+ return;
+ }
+ }
+
+ nextFilter.messageSent(session, writeRequest);
+ }
+
+ protected abstract Object doFilterWrite(
+ NextFilter nextFilter, IoSession session, WriteRequest
writeRequest) throws Exception;
+
+ private class FilteredWriteRequest extends WriteRequestWrapper {
+ private final Object filteredMessage;
+
+ public FilteredWriteRequest(Object filteredMessage, WriteRequest
writeRequest) {
+ super(writeRequest);
+
+ if (filteredMessage == null) {
+ throw new NullPointerException("filteredMessage");
+ }
+ this.filteredMessage = filteredMessage;
+ }
+
+ public WriteRequestFilter getParent() {
+ return WriteRequestFilter.this;
+ }
+
+ @Override
+ public Object getMessage() {
+ return filteredMessage;
+ }
+ }
+}
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified:
mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/compression/CompressionFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/compression/CompressionFilter.java?rev=580644&r1=580643&r2=580644&view=diff
==============================================================================
---
mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/compression/CompressionFilter.java
(original)
+++
mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/compression/CompressionFilter.java
Sat Sep 29 14:13:48 2007
@@ -22,12 +22,11 @@
import java.io.IOException;
import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.DefaultWriteRequest;
import org.apache.mina.common.IoFilter;
-import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
import org.apache.mina.common.WriteRequest;
+import org.apache.mina.filter.util.WriteRequestFilter;
/**
* An {@link IoFilter} which compresses all data using
@@ -56,7 +55,7 @@
* @author The Apache MINA Project ([EMAIL PROTECTED])
* @version $Rev$, $Date$
*/
-public class CompressionFilter extends IoFilterAdapter {
+public class CompressionFilter extends WriteRequestFilter {
/**
* Max compression level. Will give the highest compression ratio, but
* will also take more cpu time and is the slowest.
@@ -166,18 +165,17 @@
* @see
org.apache.mina.common.IoFilter#filterWrite(org.apache.mina.common.IoFilter.NextFilter,
org.apache.mina.common.IoSession, org.apache.mina.common.IoFilter.WriteRequest)
*/
@Override
- public void filterWrite(NextFilter nextFilter, IoSession session,
+ protected Object doFilterWrite(
+ NextFilter nextFilter, IoSession session,
WriteRequest writeRequest) throws IOException {
if (!compressOutbound) {
- nextFilter.filterWrite(session, writeRequest);
- return;
+ return null;
}
if (session.containsAttribute(DISABLE_COMPRESSION_ONCE)) {
// Remove the marker attribute because it is temporary.
session.removeAttribute(DISABLE_COMPRESSION_ONCE);
- nextFilter.filterWrite(session, writeRequest);
- return;
+ return null;
}
Zlib deflater = (Zlib) session.getAttribute(DEFLATER);
@@ -188,11 +186,9 @@
ByteBuffer inBuffer = (ByteBuffer) writeRequest.getMessage();
if (!inBuffer.hasRemaining()) {
// Ignore empty buffers
- nextFilter.filterWrite(session, writeRequest);
+ return null;
} else {
- ByteBuffer outBuf = deflater.deflate(inBuffer);
- nextFilter.filterWrite(session, new DefaultWriteRequest(outBuf,
- writeRequest.getFuture(), writeRequest.getDestination()));
+ return deflater.deflate(inBuffer);
}
}