Hi,

While writing a test for traffic control on VmPipe transports I may have
stumbled across a bug in VmPipeFilter. Basically I have an echo server
and a client which repeatedly sends a single byte to the server. When I
run this test I get an exception:

Exception in thread "IoThreadPool-1" java.lang.IllegalStateException:
Already released buffer.  You released the buffer too many times.
    at
org.apache.mina.common.ByteBuffer$DefaultByteBuffer.release(ByteBuffer.java:593)
    at
org.apache.mina.util.ByteBufferUtil.releaseIfPossible(ByteBufferUtil.java:43)
    at
org.apache.mina.filter.ThreadPoolFilter.processEvent(ThreadPoolFilter.java:616)
    at
org.apache.mina.filter.ThreadPoolFilter$Worker.processEvents(ThreadPoolFilter.java:372)
    at
org.apache.mina.filter.ThreadPoolFilter$Worker.run(ThreadPoolFilter.java:326)

I think this is because the ThreadPoolFilter releases a buffer right
after it has called messageReceived/messageSent. VmPipeFilter passes the
same message object (i.e. the buffer) to both of these methods, so the
ThreadPoolFilter releases that one buffer once for each event, and the
second release fails with the exception above.

I have attached a simple application which demonstrates the bug, together
with a fix to VmPipeFilter. The fix also makes the byte counts correct
when the messages are buffers. Let me know if the fix looks OK and I will
check it in.

/Niklas

package org.apache.mina.transport.vmpipe;

import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.ThreadPoolFilter;

/**
 * Stand-alone program that exercises a VmPipe acceptor and connector which
 * both have the same {@link ThreadPoolFilter} instance installed at the head
 * of their filter chains.
 * <p>
 * One handler instance serves both ends of the pipe: every received message
 * is answered with a fresh one-byte buffer, so after the initial write the
 * two sessions keep exchanging single bytes until the session is closed.
 */
public class VmPipeFilterTest
{
    /**
     * Binds the acceptor, connects to it, kicks off the byte exchange, lets
     * it run for five seconds and then tears everything down again.
     *
     * @param args ignored
     * @throws Exception if binding, connecting or shutting down fails
     */
    public static void main(String[] args) throws Exception
    {
        // The same filter instance is deliberately shared by both services.
        ThreadPoolFilter threadPoolFilter = new ThreadPoolFilter();
        VmPipeAcceptor acceptor = new VmPipeAcceptor();
        acceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
        VmPipeConnector connector = new VmPipeConnector();
        connector.getFilterChain().addFirst( "threadPool", threadPoolFilter );
        VmPipeAddress address = new VmPipeAddress( 1234 );

        // Answers every incoming message with a new one-byte buffer.
        IoHandler handler = new IoHandlerAdapter()
        {
            public void messageReceived( IoSession session, Object message )
                    throws Exception
            {
                session.write( ByteBuffer.allocate( 1 ) );
            }
        };

        try
        {
            acceptor.bind( address, handler );
            try
            {
                ConnectFuture future = connector.connect( address, handler );
                // Wait for the connect attempt to finish before asking for
                // the session instead of relying on it being ready already.
                future.join();
                IoSession session = future.getSession();
                try
                {
                    // The first write starts the back-and-forth exchange.
                    session.write( ByteBuffer.allocate( 1 ) );
                    Thread.sleep( 5 * 1000 );
                }
                finally
                {
                    // Close the session even if the write or sleep failed.
                    session.close().join();
                }
            }
            finally
            {
                acceptor.unbind( address );
            }
        }
        finally
        {
            // NOTE(review): presumably removing the filter from the chains
            // lets its worker threads stop so the JVM can exit -- confirm.
            acceptor.getFilterChain().clear();
            connector.getFilterChain().clear();
        }
    }
}
Index: src/java/org/apache/mina/transport/vmpipe/support/VmPipeFilter.java
===================================================================
--- src/java/org/apache/mina/transport/vmpipe/support/VmPipeFilter.java	(revision 348511)
+++ src/java/org/apache/mina/transport/vmpipe/support/VmPipeFilter.java	(working copy)
@@ -3,6 +3,7 @@
  */
 package org.apache.mina.transport.vmpipe.support;
 
+import org.apache.mina.common.ByteBuffer;
 import org.apache.mina.common.IoFilterAdapter;
 import org.apache.mina.common.IoSession;
 
@@ -19,20 +20,41 @@
     {
         VmPipeSessionImpl vps = ( VmPipeSessionImpl ) session;
 
-        vps.increaseReadBytes( 1 );
-
+        // Make a copy of message if it's a ByteBuffer. This prevents the buffer 
+        // from being released twice. We also update the readBytes with the number 
+        // of bytes remaining.
+        int byteCount = 1;
+        Object messageCopy = message;
+        if( message instanceof ByteBuffer )
+        {
+            ByteBuffer rb = ( ByteBuffer ) message;
+            byteCount = rb.remaining();
+            ByteBuffer wb = ByteBuffer.allocate( rb.remaining() );
+            wb.put( rb );
+            wb.flip();
+            messageCopy = wb;
+        }
+        vps.increaseReadBytes( byteCount );
+        
         // fire messageSent event first
         vps.remoteSession.getManagerFilterChain().messageSent( vps.remoteSession, message );
 
         // and then messageReceived
-        nextFilter.messageReceived( session, message );
+        nextFilter.messageReceived( session, messageCopy );
     }
 
     public void messageSent( NextFilter nextFilter,
                              IoSession session, Object message )
     {
         VmPipeSessionImpl vps = ( VmPipeSessionImpl ) session;
-        vps.increaseWrittenBytes( 1 );
+        
+        int byteCount = 1;
+        if( message instanceof ByteBuffer )
+        {
+            byteCount = ( ( ByteBuffer) message ).remaining();
+        }
+        vps.increaseWrittenBytes( byteCount );
+        
         vps.increaseWrittenWriteRequests();
 
         nextFilter.messageSent( session, message );

Reply via email to