Author: trustin
Date: Mon Feb 19 23:18:46 2007
New Revision: 509442

URL: http://svn.apache.org/viewvc?view=rev&rev=509442
Log:
* Optimized ExecutorFilter in trunk a little bit
* Reverted the ExecutorFilter in branches/1.1 to use a synchronized block
** This change fixes DIRMINA-352 (races in ExecutorFilter)


Modified:
    
mina/branches/1.1/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
    
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java

Modified: 
mina/branches/1.1/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
URL: 
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java?view=diff&rev=509442&r1=509441&r2=509442
==============================================================================
--- 
mina/branches/1.1/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
 (original)
+++ 
mina/branches/1.1/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
 Mon Feb 19 23:18:46 2007
@@ -6,33 +6,30 @@
  *  to you under the Apache License, Version 2.0 (the
  *  "License"); you may not use this file except in compliance
  *  with the License.  You may obtain a copy of the License at
- *
+ *  
  *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ *  
  *  Unless required by applicable law or agreed to in writing,
  *  software distributed under the License is distributed on an
  *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  *  KIND, either express or implied.  See the License for the
  *  specific language governing permissions and limitations
- *  under the License.
- *
+ *  under the License. 
+ *  
  */
 package org.apache.mina.filter.executor;
 
+import java.util.LinkedList;
 import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.Executor;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.mina.common.IdleStatus;
 import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoFilterChain;
 import org.apache.mina.common.IoSession;
-import org.apache.mina.common.ThreadModel;
-import org.apache.mina.util.ByteBufferUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,14 +37,11 @@
  * A filter that forward events to {@link Executor} in
  * <a href="http://dcl.mathcs.emory.edu/util/backport-util-concurrent/">backport-util-concurrent</a>.
  * You can apply various thread model by inserting this filter to the {@link IoFilterChain}.
- * This filter is usually inserted by {@link ThreadModel} automatically, so you don't need
- * to add this filter in most cases.
- * <p/>
+ * <p>
  * Please note that this filter doesn't manage the life cycle of the underlying
  * {@link Executor}.  You have to destroy or stop it by yourself.
- * <p/>
- * <a href="mailto:[email protected]";>Apache Directory Project</a>
  *
+ * @author The Apache MINA Project ([email protected])
  * @version $Rev: 350169 $, $Date: 2005-12-01 00:17:41 -0500 (Thu, 01 Dec 
2005) $
  */
 public class ExecutorFilter extends IoFilterAdapter
@@ -61,9 +55,9 @@
      */
     public ExecutorFilter()
     {
-        this( new ThreadPoolExecutor( 16, 16, 60, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>() ) );
+        this( new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new 
LinkedBlockingQueue<Runnable>() ) );
     }
-
+    
     /**
      * Creates a new instance with the specified <tt>executor</tt>.
      */
@@ -91,11 +85,24 @@
         Event event = new Event( type, nextFilter, data );
         SessionBuffer buf = SessionBuffer.getSessionBuffer( session );
 
-        buf.eventQueue.add( event );
-
-        if( buf.processingCompleted.compareAndSet( true, false ) )
+        boolean execute;
+        synchronized( buf.eventQueue )
+        {
+            buf.eventQueue.offer( event );
+            if( buf.processingCompleted )
+            {
+                buf.processingCompleted = false;
+                execute = true;
+            }
+            else
+            {
+               execute = false;
+            }
+        }
+        
+        if( execute )
         {
-            if( logger.isDebugEnabled() )
+            if ( logger.isDebugEnabled() )
             {
                 logger.debug( "Launching thread for " + 
session.getRemoteAddress() );
             }
@@ -123,8 +130,8 @@
         }
 
         private final IoSession session;
-        private final Queue<Event> eventQueue = new 
ConcurrentLinkedQueue<Event>();
-        private AtomicBoolean processingCompleted = new AtomicBoolean( true );
+        private final Queue<Event> eventQueue = new LinkedList<Event>();
+        private boolean processingCompleted = true;
 
         private SessionBuffer( IoSession session )
         {
@@ -132,16 +139,35 @@
         }
     }
 
-    protected static enum EventType
+    protected static class EventType
     {
-        OPENED,
-        CLOSED,
-        READ,
-        WRITTEN,
-        RECEIVED,
-        SENT,
-        IDLE,
-        EXCEPTION
+        public static final EventType OPENED = new EventType( "OPENED" );
+
+        public static final EventType CLOSED = new EventType( "CLOSED" );
+
+        public static final EventType READ = new EventType( "READ" );
+
+        public static final EventType WRITTEN = new EventType( "WRITTEN" );
+
+        public static final EventType RECEIVED = new EventType( "RECEIVED" );
+
+        public static final EventType SENT = new EventType( "SENT" );
+
+        public static final EventType IDLE = new EventType( "IDLE" );
+
+        public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+
+        private final String value;
+
+        private EventType( String value )
+        {
+            this.value = value;
+        }
+
+        public String toString()
+        {
+            return value;
+        }
     }
 
     protected static class Event
@@ -173,90 +199,80 @@
         }
     }
 
-    @Override
     public void sessionCreated( NextFilter nextFilter, IoSession session )
     {
         nextFilter.sessionCreated( session );
     }
 
-    @Override
     public void sessionOpened( NextFilter nextFilter,
                                IoSession session )
     {
         fireEvent( nextFilter, session, EventType.OPENED, null );
     }
 
-    @Override
     public void sessionClosed( NextFilter nextFilter,
                                IoSession session )
     {
         fireEvent( nextFilter, session, EventType.CLOSED, null );
     }
 
-    @Override
     public void sessionIdle( NextFilter nextFilter,
                              IoSession session, IdleStatus status )
     {
         fireEvent( nextFilter, session, EventType.IDLE, status );
     }
 
-    @Override
     public void exceptionCaught( NextFilter nextFilter,
                                  IoSession session, Throwable cause )
     {
         fireEvent( nextFilter, session, EventType.EXCEPTION, cause );
     }
 
-    @Override
     public void messageReceived( NextFilter nextFilter,
                                  IoSession session, Object message )
     {
-        ByteBufferUtil.acquireIfPossible( message );
         fireEvent( nextFilter, session, EventType.RECEIVED, message );
     }
 
-    @Override
     public void messageSent( NextFilter nextFilter,
                              IoSession session, Object message )
     {
-        ByteBufferUtil.acquireIfPossible( message );
         fireEvent( nextFilter, session, EventType.SENT, message );
     }
 
     protected void processEvent( NextFilter nextFilter, IoSession session, 
EventType type, Object data )
     {
-        switch( type )
+        if( type == EventType.RECEIVED )
+        {
+            nextFilter.messageReceived( session, data );
+        }
+        else if( type == EventType.SENT )
         {
-            case RECEIVED:
-                nextFilter.messageReceived( session, data );
-                ByteBufferUtil.releaseIfPossible( data );
-                break;
-            case SENT:
-                nextFilter.messageSent( session, data );
-                ByteBufferUtil.releaseIfPossible( data );
-                break;
-            case EXCEPTION:
-                nextFilter.exceptionCaught( session, ( Throwable ) data );
-                break;
-            case IDLE:
-                nextFilter.sessionIdle( session, ( IdleStatus ) data );
-                break;
-            case OPENED:
-                nextFilter.sessionOpened( session );
-                break;
-            case CLOSED:
-                nextFilter.sessionClosed( session );
-                break;
+            nextFilter.messageSent( session, data );
+        }
+        else if( type == EventType.EXCEPTION )
+        {
+            nextFilter.exceptionCaught( session, (Throwable)data );
+        }
+        else if( type == EventType.IDLE )
+        {
+            nextFilter.sessionIdle( session, (IdleStatus)data );
+        }
+        else if( type == EventType.OPENED )
+        {
+            nextFilter.sessionOpened( session );
+        }
+        else if( type == EventType.CLOSED )
+        {
+            nextFilter.sessionClosed( session );
         }
     }
 
-    @Override
     public void filterWrite( NextFilter nextFilter, IoSession session, 
WriteRequest writeRequest )
     {
         nextFilter.filterWrite( session, writeRequest );
     }
 
-    @Override
     public void filterClose( NextFilter nextFilter, IoSession session ) throws 
Exception
     {
         nextFilter.filterClose( session );
@@ -275,19 +291,23 @@
         {
             while( true )
             {
-                Event event = buffer.eventQueue.poll();
+                Event event;
 
-                if( null == event )
+                synchronized( buffer.eventQueue )
                 {
-                    buffer.processingCompleted.compareAndSet( false, true );
-                    break;
+                    event = buffer.eventQueue.poll();
+                    
+                    if( event == null )
+                    {
+                        buffer.processingCompleted = true;
+                        break;
+                    }
                 }
 
                 processEvent( event.getNextFilter(), buffer.session, 
event.getType(), event.getData() );
             }
 
-            if( logger.isDebugEnabled() )
-            {
+            if ( logger.isDebugEnabled() ) {
                 logger.debug( "Exiting since queue is empty for " + 
buffer.session.getRemoteAddress() );
             }
         }

Modified: 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
URL: 
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java?view=diff&rev=509442&r1=509441&r2=509442
==============================================================================
--- 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
 (original)
+++ 
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
 Mon Feb 19 23:18:46 2007
@@ -85,18 +85,29 @@
         Event event = new Event( type, nextFilter, data );
         SessionBuffer buf = SessionBuffer.getSessionBuffer( session );
 
+        boolean execute;
         synchronized( buf.eventQueue )
         {
             buf.eventQueue.offer( event );
             if( buf.processingCompleted )
             {
                 buf.processingCompleted = false;
-                if ( logger.isDebugEnabled() ) {
-                    logger.debug( "Launching thread for " + 
session.getRemoteAddress() );
-                }
-
-                executor.execute( new ProcessEventsRunnable( buf ) );
+                execute = true;
+            }
+            else
+            {
+               execute = false;
             }
+        }
+        
+        if( execute )
+        {
+            if ( logger.isDebugEnabled() )
+            {
+                logger.debug( "Launching thread for " + 
session.getRemoteAddress() );
+            }
+
+            executor.execute( new ProcessEventsRunnable( buf ) );
         }
     }
 


Reply via email to