Hi,
While writing a test for traffic control on VmPipe transports I may have
stumbled across a bug in VmPipeFilter. Basically I have an echo server
and a client which repeatedly sends a single byte to the server. When I
run this test I get an exception:
Exception in thread "IoThreadPool-1" java.lang.IllegalStateException:
Already released buffer. You released the buffer too many times.
at
org.apache.mina.common.ByteBuffer$DefaultByteBuffer.release(ByteBuffer.java:593)
at
org.apache.mina.util.ByteBufferUtil.releaseIfPossible(ByteBufferUtil.java:43)
at
org.apache.mina.filter.ThreadPoolFilter.processEvent(ThreadPoolFilter.java:616)
at
org.apache.mina.filter.ThreadPoolFilter$Worker.processEvents(ThreadPoolFilter.java:372)
at
org.apache.mina.filter.ThreadPoolFilter$Worker.run(ThreadPoolFilter.java:326)
I think this is because the ThreadPoolFilter releases buffers right
after it has called messageReceived/messageSent. Since VmPipeFilter
sends the same message object (i.e. the buffer) to these two methods the
ThreadPoolFilter will already have released the buffer when VmPipeFilter
calls nextFilter.messageReceived.
I have attached a simple application which demonstrates the bug. I've
also attached a fix to VmPipeFilter which also adds correct byte counts
if messages are buffers. Let me know if the fix looks ok and I will
check it in.
/Niklas
package org.apache.mina.transport.vmpipe;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoHandler;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.ThreadPoolFilter;
public class VmPipeFilterTest
{
public static void main(String[] args) throws Exception
{
ThreadPoolFilter threadPoolFilter = new ThreadPoolFilter();
VmPipeAcceptor acceptor = new VmPipeAcceptor();
acceptor.getFilterChain().addFirst( "threadPool", threadPoolFilter );
VmPipeConnector connector = new VmPipeConnector();
connector.getFilterChain().addFirst( "threadPool", threadPoolFilter );
VmPipeAddress address = new VmPipeAddress( 1234 );
IoHandler handler = new IoHandlerAdapter()
{
public void messageReceived( IoSession session, Object message )
throws Exception
{
session.write( ByteBuffer.allocate( 1 ) );
}
};
acceptor.bind( address, handler );
ConnectFuture future = connector.connect( address, handler );
IoSession session = future.getSession();
session.write( ByteBuffer.allocate( 1 ) );
Thread.sleep( 5 * 1000 );
session.close().join();
acceptor.unbind( address );
acceptor.getFilterChain().clear();
connector.getFilterChain().clear();
}
}
Index: src/java/org/apache/mina/transport/vmpipe/support/VmPipeFilter.java
===================================================================
--- src/java/org/apache/mina/transport/vmpipe/support/VmPipeFilter.java (revision 348511)
+++ src/java/org/apache/mina/transport/vmpipe/support/VmPipeFilter.java (working copy)
@@ -3,6 +3,7 @@
*/
package org.apache.mina.transport.vmpipe.support;
+import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoFilterAdapter;
import org.apache.mina.common.IoSession;
@@ -19,20 +20,41 @@
{
VmPipeSessionImpl vps = ( VmPipeSessionImpl ) session;
- vps.increaseReadBytes( 1 );
-
+ // Make a copy of message if it's a ByteBuffer. This prevents the buffer
+ // from being released twice. We also update the readBytes with the number
+ // of bytes remaining.
+ int byteCount = 1;
+ Object messageCopy = message;
+ if( message instanceof ByteBuffer )
+ {
+ ByteBuffer rb = ( ByteBuffer ) message;
+ byteCount = rb.remaining();
+ ByteBuffer wb = ByteBuffer.allocate( rb.remaining() );
+ wb.put( rb );
+ wb.flip();
+ messageCopy = wb;
+ }
+ vps.increaseReadBytes( byteCount );
+
// fire messageSent event first
vps.remoteSession.getManagerFilterChain().messageSent( vps.remoteSession, message );
// and then messageReceived
- nextFilter.messageReceived( session, message );
+ nextFilter.messageReceived( session, messageCopy );
}
public void messageSent( NextFilter nextFilter,
IoSession session, Object message )
{
VmPipeSessionImpl vps = ( VmPipeSessionImpl ) session;
- vps.increaseWrittenBytes( 1 );
+
+ int byteCount = 1;
+ if( message instanceof ByteBuffer )
+ {
+ byteCount = ( ( ByteBuffer) message ).remaining();
+ }
+ vps.increaseWrittenBytes( byteCount );
+
vps.increaseWrittenWriteRequests();
nextFilter.messageSent( session, message );