Author: trustin
Date: Mon Feb 19 23:18:46 2007
New Revision: 509442
URL: http://svn.apache.org/viewvc?view=rev&rev=509442
Log:
* Optimized ExecutorFilter in trunk a little bit
* Reverted back the ExecutorFilter in branches/1.1 to use synchronized block
** This change fixes DIRMINA-352 (races in ExecutorFilter)
Modified:
mina/branches/1.1/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
Modified:
mina/branches/1.1/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
URL:
http://svn.apache.org/viewvc/mina/branches/1.1/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java?view=diff&rev=509442&r1=509441&r2=509442
==============================================================================
---
mina/branches/1.1/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
(original)
+++
mina/branches/1.1/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
Mon Feb 19 23:18:46 2007
@@ -6,33 +6,30 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
- *
+ * under the License.
+ *
*/
package org.apache.mina.filter.executor;
+import java.util.LinkedList;
import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.mina.common.IdleStatus;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.ThreadModel;
-import org.apache.mina.util.ByteBufferUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,14 +37,11 @@
* A filter that forward events to [EMAIL PROTECTED] Executor} in
* <a
href="http://dcl.mathcs.emory.edu/util/backport-util-concurrent/">backport-util-concurrent</a>.
* You can apply various thread model by inserting this filter to the [EMAIL
PROTECTED] IoFilterChain}.
- * This filter is usually inserted by [EMAIL PROTECTED] ThreadModel}
automatically, so you don't need
- * to add this filter in most cases.
- * <p/>
+ * <p>
* Please note that this filter doesn't manage the life cycle of the underlying
* [EMAIL PROTECTED] Executor}. You have to destroy or stop it by yourself.
- * <p/>
- * <a href="mailto:[email protected]">Apache Directory Project</a>
*
+ * @author The Apache MINA Project ([email protected])
* @version $Rev: 350169 $, $Date: 2005-12-01 00:17:41 -0500 (Thu, 01 Dec
2005) $
*/
public class ExecutorFilter extends IoFilterAdapter
@@ -61,9 +55,9 @@
*/
public ExecutorFilter()
{
- this( new ThreadPoolExecutor( 16, 16, 60, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>() ) );
+ this( new ThreadPoolExecutor(16, 16, 60, TimeUnit.SECONDS, new
LinkedBlockingQueue<Runnable>() ) );
}
-
+
/**
* Creates a new instance with the specified <tt>executor</tt>.
*/
@@ -91,11 +85,24 @@
Event event = new Event( type, nextFilter, data );
SessionBuffer buf = SessionBuffer.getSessionBuffer( session );
- buf.eventQueue.add( event );
-
- if( buf.processingCompleted.compareAndSet( true, false ) )
+ boolean execute;
+ synchronized( buf.eventQueue )
+ {
+ buf.eventQueue.offer( event );
+ if( buf.processingCompleted )
+ {
+ buf.processingCompleted = false;
+ execute = true;
+ }
+ else
+ {
+ execute = false;
+ }
+ }
+
+ if( execute )
{
- if( logger.isDebugEnabled() )
+ if ( logger.isDebugEnabled() )
{
logger.debug( "Launching thread for " +
session.getRemoteAddress() );
}
@@ -123,8 +130,8 @@
}
private final IoSession session;
- private final Queue<Event> eventQueue = new
ConcurrentLinkedQueue<Event>();
- private AtomicBoolean processingCompleted = new AtomicBoolean( true );
+ private final Queue<Event> eventQueue = new LinkedList<Event>();
+ private boolean processingCompleted = true;
private SessionBuffer( IoSession session )
{
@@ -132,16 +139,35 @@
}
}
- protected static enum EventType
+ protected static class EventType
{
- OPENED,
- CLOSED,
- READ,
- WRITTEN,
- RECEIVED,
- SENT,
- IDLE,
- EXCEPTION
+ public static final EventType OPENED = new EventType( "OPENED" );
+
+ public static final EventType CLOSED = new EventType( "CLOSED" );
+
+ public static final EventType READ = new EventType( "READ" );
+
+ public static final EventType WRITTEN = new EventType( "WRITTEN" );
+
+ public static final EventType RECEIVED = new EventType( "RECEIVED" );
+
+ public static final EventType SENT = new EventType( "SENT" );
+
+ public static final EventType IDLE = new EventType( "IDLE" );
+
+ public static final EventType EXCEPTION = new EventType( "EXCEPTION" );
+
+ private final String value;
+
+ private EventType( String value )
+ {
+ this.value = value;
+ }
+
+ public String toString()
+ {
+ return value;
+ }
}
protected static class Event
@@ -173,90 +199,80 @@
}
}
- @Override
public void sessionCreated( NextFilter nextFilter, IoSession session )
{
nextFilter.sessionCreated( session );
}
- @Override
public void sessionOpened( NextFilter nextFilter,
IoSession session )
{
fireEvent( nextFilter, session, EventType.OPENED, null );
}
- @Override
public void sessionClosed( NextFilter nextFilter,
IoSession session )
{
fireEvent( nextFilter, session, EventType.CLOSED, null );
}
- @Override
public void sessionIdle( NextFilter nextFilter,
IoSession session, IdleStatus status )
{
fireEvent( nextFilter, session, EventType.IDLE, status );
}
- @Override
public void exceptionCaught( NextFilter nextFilter,
IoSession session, Throwable cause )
{
fireEvent( nextFilter, session, EventType.EXCEPTION, cause );
}
- @Override
public void messageReceived( NextFilter nextFilter,
IoSession session, Object message )
{
- ByteBufferUtil.acquireIfPossible( message );
fireEvent( nextFilter, session, EventType.RECEIVED, message );
}
- @Override
public void messageSent( NextFilter nextFilter,
IoSession session, Object message )
{
- ByteBufferUtil.acquireIfPossible( message );
fireEvent( nextFilter, session, EventType.SENT, message );
}
protected void processEvent( NextFilter nextFilter, IoSession session,
EventType type, Object data )
{
- switch( type )
+ if( type == EventType.RECEIVED )
+ {
+ nextFilter.messageReceived( session, data );
+ }
+ else if( type == EventType.SENT )
{
- case RECEIVED:
- nextFilter.messageReceived( session, data );
- ByteBufferUtil.releaseIfPossible( data );
- break;
- case SENT:
- nextFilter.messageSent( session, data );
- ByteBufferUtil.releaseIfPossible( data );
- break;
- case EXCEPTION:
- nextFilter.exceptionCaught( session, ( Throwable ) data );
- break;
- case IDLE:
- nextFilter.sessionIdle( session, ( IdleStatus ) data );
- break;
- case OPENED:
- nextFilter.sessionOpened( session );
- break;
- case CLOSED:
- nextFilter.sessionClosed( session );
- break;
+ nextFilter.messageSent( session, data );
+ }
+ else if( type == EventType.EXCEPTION )
+ {
+ nextFilter.exceptionCaught( session, (Throwable)data );
+ }
+ else if( type == EventType.IDLE )
+ {
+ nextFilter.sessionIdle( session, (IdleStatus)data );
+ }
+ else if( type == EventType.OPENED )
+ {
+ nextFilter.sessionOpened( session );
+ }
+ else if( type == EventType.CLOSED )
+ {
+ nextFilter.sessionClosed( session );
}
}
- @Override
public void filterWrite( NextFilter nextFilter, IoSession session,
WriteRequest writeRequest )
{
nextFilter.filterWrite( session, writeRequest );
}
- @Override
public void filterClose( NextFilter nextFilter, IoSession session ) throws
Exception
{
nextFilter.filterClose( session );
@@ -275,19 +291,23 @@
{
while( true )
{
- Event event = buffer.eventQueue.poll();
+ Event event;
- if( null == event )
+ synchronized( buffer.eventQueue )
{
- buffer.processingCompleted.compareAndSet( false, true );
- break;
+ event = buffer.eventQueue.poll();
+
+ if( event == null )
+ {
+ buffer.processingCompleted = true;
+ break;
+ }
}
processEvent( event.getNextFilter(), buffer.session,
event.getType(), event.getData() );
}
- if( logger.isDebugEnabled() )
- {
+ if ( logger.isDebugEnabled() ) {
logger.debug( "Exiting since queue is empty for " +
buffer.session.getRemoteAddress() );
}
}
Modified:
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
URL:
http://svn.apache.org/viewvc/mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java?view=diff&rev=509442&r1=509441&r2=509442
==============================================================================
---
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
(original)
+++
mina/trunk/core/src/main/java/org/apache/mina/filter/executor/ExecutorFilter.java
Mon Feb 19 23:18:46 2007
@@ -85,18 +85,29 @@
Event event = new Event( type, nextFilter, data );
SessionBuffer buf = SessionBuffer.getSessionBuffer( session );
+ boolean execute;
synchronized( buf.eventQueue )
{
buf.eventQueue.offer( event );
if( buf.processingCompleted )
{
buf.processingCompleted = false;
- if ( logger.isDebugEnabled() ) {
- logger.debug( "Launching thread for " +
session.getRemoteAddress() );
- }
-
- executor.execute( new ProcessEventsRunnable( buf ) );
+ execute = true;
+ }
+ else
+ {
+ execute = false;
}
+ }
+
+ if( execute )
+ {
+ if ( logger.isDebugEnabled() )
+ {
+ logger.debug( "Launching thread for " +
session.getRemoteAddress() );
+ }
+
+ executor.execute( new ProcessEventsRunnable( buf ) );
}
}