Author: trustin
Date: Sat Sep 29 14:13:48 2007
New Revision: 580644

URL: http://svn.apache.org/viewvc?rev=580644&view=rev
Log:
* Added WriteRequestFilter, which simplifies the implementation of data 
transformation filters
* Changed CompressionFilter and RequestResponseFilter to extend 
WriteRequestFilter (NOTE: SslFilter and ProtocolCodecFilter are somewhat 
different from usual data transformation filters in that they can generate more 
than two messages.)


Added:
    
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
   (with props)
Modified:
    
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
    
mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/compression/CompressionFilter.java

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java?rev=580644&r1=580643&r2=580644&view=diff
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/reqres/RequestResponseFilter.java
 Sat Sep 29 14:13:48 2007
@@ -22,6 +22,7 @@
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
@@ -30,19 +31,18 @@
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.IoSessionLogger;
 import org.apache.mina.common.WriteRequest;
-import org.apache.mina.common.WriteRequestWrapper;
+import org.apache.mina.filter.util.WriteRequestFilter;
 
 /**
  *
  * @author The Apache MINA Project ([EMAIL PROTECTED])
  * @version $Rev$, $Date$
  */
-public class RequestResponseFilter extends IoFilterAdapter {
+public class RequestResponseFilter extends WriteRequestFilter {
 
     private static final String RESPONSE_INSPECTOR = 
RequestResponseFilter.class
             .getName()
@@ -179,20 +179,18 @@
             nextFilter.messageReceived(session, response);
         }
     }
-
+    
     @Override
-    public void filterWrite(NextFilter nextFilter, IoSession session,
-            WriteRequest writeRequest) throws Exception {
+    protected Object doFilterWrite(
+            final NextFilter nextFilter, IoSession session, WriteRequest 
writeRequest) throws Exception {
         Object message = writeRequest.getMessage();
         if (!(message instanceof Request)) {
-            nextFilter.filterWrite(session, writeRequest);
-            return;
+            return null;
         }
-
-        Request request = (Request) message;
+        
+        final Request request = (Request) message;
         if (request.getTimeoutFuture() != null) {
-            nextFilter.exceptionCaught(session, new IllegalArgumentException(
-                    "Request can not be reused."));
+            throw new IllegalArgumentException("Request can not be reused.");
         }
 
         Map<Object, Request> requestStore = getRequestStore(session);
@@ -205,60 +203,44 @@
             }
         }
         if (oldValue != null) {
-            nextFilter.exceptionCaught(session, new IllegalStateException(
-                    "Duplicate request ID: " + request.getId()));
+            throw new IllegalStateException(
+                    "Duplicate request ID: " + request.getId());
+        }
+        
+        // Schedule a task to be executed on timeout.
+        // Find the timeout date avoiding overflow.
+        Date timeoutDate = new Date(System.currentTimeMillis());
+        if (Long.MAX_VALUE - request.getTimeoutMillis() < timeoutDate
+                .getTime()) {
+            timeoutDate.setTime(Long.MAX_VALUE);
+        } else {
+            timeoutDate.setTime(timeoutDate.getTime()
+                    + request.getTimeoutMillis());
         }
 
-        nextFilter.filterWrite(session, new RequestWriteRequest(writeRequest));
-    }
-
-    @Override
-    public void messageSent(final NextFilter nextFilter,
-            final IoSession session, WriteRequest writeRequest)
-            throws Exception {
-        if (writeRequest instanceof RequestWriteRequest) {
-            // Schedule a task to be executed on timeout.
-            RequestWriteRequest wrappedRequest = (RequestWriteRequest) 
writeRequest;
-            WriteRequest actualRequest = wrappedRequest.getWriteRequest();
-            final Request request = (Request) actualRequest.getMessage();
-
-            // Find the timeout date avoiding overflow.
-            Date timeoutDate = new Date(System.currentTimeMillis());
-            if (Long.MAX_VALUE - request.getTimeoutMillis() < timeoutDate
-                    .getTime()) {
-                timeoutDate.setTime(Long.MAX_VALUE);
-            } else {
-                timeoutDate.setTime(timeoutDate.getTime()
-                        + request.getTimeoutMillis());
-            }
-
-            TimeoutTask timeoutTask = new TimeoutTask(nextFilter, request,
-                    session);
-
-            // Schedule the timeout task.
-            ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule(
-                    timeoutTask, request.getTimeoutMillis(),
-                    TimeUnit.MILLISECONDS);
-            request.setTimeoutTask(timeoutTask);
-            request.setTimeoutFuture(timeoutFuture);
+        TimeoutTask timeoutTask = new TimeoutTask(
+                nextFilter, request, session);
 
-            // Add the timtoue task to the unfinished task set.
-            Set<Request> unrespondedRequests = 
getUnrespondedRequestStore(session);
-            synchronized (unrespondedRequests) {
-                unrespondedRequests.add(request);
-            }
+        // Schedule the timeout task.
+        ScheduledFuture<?> timeoutFuture = timeoutScheduler.schedule(
+                timeoutTask, request.getTimeoutMillis(),
+                TimeUnit.MILLISECONDS);
+        request.setTimeoutTask(timeoutTask);
+        request.setTimeoutFuture(timeoutFuture);
 
-            // and forward the original write request.
-            nextFilter.messageSent(session, wrappedRequest.getWriteRequest());
-        } else {
-            nextFilter.messageSent(session, writeRequest);
+        // Add the timeout task to the unfinished task set.
+        Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
+        synchronized (unrespondedRequests) {
+            unrespondedRequests.add(request);
         }
+        
+        return request.getMessage();
     }
 
     @Override
     public void sessionClosed(NextFilter nextFilter, IoSession session)
             throws Exception {
-        // Copy the unifished task set to avoid unnecessary lock acquisition.
+        // Copy the unfinished task set to avoid unnecessary lock acquisition.
         // Copying will be cheap because there won't be that many requests 
queued.
         Set<Request> unrespondedRequests = getUnrespondedRequestStore(session);
         List<Request> unrespondedRequestsCopy;
@@ -268,7 +250,7 @@
             unrespondedRequests.clear();
         }
 
-        // Generate timeout artifically.
+        // Generate timeout artificially.
         for (Request r : unrespondedRequestsCopy) {
             if (r.getTimeoutFuture().cancel(false)) {
                 r.getTimeoutTask().run();
@@ -387,17 +369,6 @@
                 request.signal(e);
                 filter.exceptionCaught(session, e);
             }
-        }
-    }
-
-    private static class RequestWriteRequest extends WriteRequestWrapper {
-        public RequestWriteRequest(WriteRequest writeRequest) {
-            super(writeRequest);
-        }
-
-        @Override
-        public Object getMessage() {
-            return ((Request) super.getMessage()).getMessage();
         }
     }
 }

Added: 
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java?rev=580644&view=auto
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
 (added)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
 Sat Sep 29 14:13:48 2007
@@ -0,0 +1,95 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ */
+package org.apache.mina.filter.util;
+
+import org.apache.mina.common.IoEventType;
+import org.apache.mina.common.IoFilterAdapter;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.WriteRequest;
+import org.apache.mina.common.WriteRequestWrapper;
+
+/**
+ * An abstract {@link IoFilter} that simplifies the implementation of
+ * an {@link IoFilter} that filters an {@link IoEventType#WRITE} event.
+ * 
+ * @author The Apache MINA Project ([EMAIL PROTECTED])
+ * @version $Rev$, $Date$
+ *
+ */
+public abstract class WriteRequestFilter extends IoFilterAdapter {
+
+    @Override
+    public void exceptionCaught(NextFilter nextFilter, IoSession session,
+            Throwable cause) throws Exception {
+        nextFilter.exceptionCaught(session, cause);
+    }
+
+    @Override
+    public void filterWrite(NextFilter nextFilter, IoSession session,
+            WriteRequest writeRequest) throws Exception {
+        Object filteredMessage = doFilterWrite(nextFilter, session, 
writeRequest);
+        if (filteredMessage != null && filteredMessage != 
writeRequest.getMessage()) {
+            nextFilter.filterWrite(
+                    session, new FilteredWriteRequest(
+                            filteredMessage, writeRequest));
+        } else {
+            nextFilter.filterWrite(session, writeRequest);
+        }
+    }
+
+    @Override
+    public void messageSent(NextFilter nextFilter, IoSession session,
+            WriteRequest writeRequest) throws Exception {
+        if (writeRequest instanceof FilteredWriteRequest) {
+            FilteredWriteRequest req = (FilteredWriteRequest) writeRequest;
+            if (req.getParent() == this) {
+                nextFilter.messageSent(session, req.getWriteRequest());
+                return;
+            }
+        }
+        
+        nextFilter.messageSent(session, writeRequest);
+    }
+    
+    protected abstract Object doFilterWrite(
+            NextFilter nextFilter, IoSession session, WriteRequest 
writeRequest) throws Exception;
+
+    private class FilteredWriteRequest extends WriteRequestWrapper {
+        private final Object filteredMessage;
+        
+        public FilteredWriteRequest(Object filteredMessage, WriteRequest 
writeRequest) {
+            super(writeRequest);
+            
+            if (filteredMessage == null) {
+                throw new NullPointerException("filteredMessage");
+            }
+            this.filteredMessage = filteredMessage;
+        }
+        
+        public WriteRequestFilter getParent() {
+            return WriteRequestFilter.this;
+        }
+
+        @Override
+        public Object getMessage() {
+            return filteredMessage;
+        }
+    }
+}

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
mina/trunk/core/src/main/java/org/apache/mina/filter/util/WriteRequestFilter.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: 
mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/compression/CompressionFilter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/compression/CompressionFilter.java?rev=580644&r1=580643&r2=580644&view=diff
==============================================================================
--- 
mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/compression/CompressionFilter.java
 (original)
+++ 
mina/trunk/filter-compression/src/main/java/org/apache/mina/filter/compression/CompressionFilter.java
 Sat Sep 29 14:13:48 2007
@@ -22,12 +22,11 @@
 import java.io.IOException;
 
 import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.DefaultWriteRequest;
 import org.apache.mina.common.IoFilter;
-import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoSession;
 import org.apache.mina.common.WriteRequest;
+import org.apache.mina.filter.util.WriteRequestFilter;
 
 /**
  * An {@link IoFilter} which compresses all data using
@@ -56,7 +55,7 @@
  * @author The Apache MINA Project ([EMAIL PROTECTED])
  * @version $Rev$, $Date$
  */
-public class CompressionFilter extends IoFilterAdapter {
+public class CompressionFilter extends WriteRequestFilter {
     /**
      * Max compression level.  Will give the highest compression ratio, but
      * will also take more cpu time and is the slowest.
@@ -166,18 +165,17 @@
      * @see 
org.apache.mina.common.IoFilter#filterWrite(org.apache.mina.common.IoFilter.NextFilter,
 org.apache.mina.common.IoSession, org.apache.mina.common.IoFilter.WriteRequest)
      */
     @Override
-    public void filterWrite(NextFilter nextFilter, IoSession session,
+    protected Object doFilterWrite(
+            NextFilter nextFilter, IoSession session,
             WriteRequest writeRequest) throws IOException {
         if (!compressOutbound) {
-            nextFilter.filterWrite(session, writeRequest);
-            return;
+            return null;
         }
 
         if (session.containsAttribute(DISABLE_COMPRESSION_ONCE)) {
             // Remove the marker attribute because it is temporary.
             session.removeAttribute(DISABLE_COMPRESSION_ONCE);
-            nextFilter.filterWrite(session, writeRequest);
-            return;
+            return null;
         }
 
         Zlib deflater = (Zlib) session.getAttribute(DEFLATER);
@@ -188,11 +186,9 @@
         ByteBuffer inBuffer = (ByteBuffer) writeRequest.getMessage();
         if (!inBuffer.hasRemaining()) {
             // Ignore empty buffers
-            nextFilter.filterWrite(session, writeRequest);
+            return null;
         } else {
-            ByteBuffer outBuf = deflater.deflate(inBuffer);
-            nextFilter.filterWrite(session, new DefaultWriteRequest(outBuf,
-                    writeRequest.getFuture(), writeRequest.getDestination()));
+            return deflater.deflate(inBuffer);
         }
     }
 


Reply via email to