Hi Luis,

I guess it works OK now.  I found your producer code doesn't generate
enough traffic, so I removed sleep(500); from the source code, and
replaced suspendRead and resumeRead calls with
ReadThrottleFilterBuilder.

What's somewhat weird is that the producer doesn't start increasing
its scheduledWriteMessages as soon as the socket receive buffer
becomes full.  I think it is related to the internal implementation
of the TCP/IP stack.

Let me paste my source code here:

package net.gleamynode.tmp;

import java.net.InetSocketAddress;

import org.apache.mina.filter.codec.ProtocolCodecFilter;
import 
org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.filter.executor.ExecutorFilter;
import org.apache.mina.filter.traffic.ReadThrottleFilterBuilder;
import org.apache.mina.transport.socket.nio.SocketAcceptor;

/**
 * Consumer side of the read-throttling experiment.
 *
 * <p>Builds a {@link SocketAcceptor} with an executor filter, an object
 * serialization codec and a {@link ReadThrottleFilterBuilder}, then binds it
 * to port 7777 with a {@link ConsumerHandler} installed as the handler.
 */
public class Consumer {

    /** Port number used for the acceptor's local address. */
    private static final int LISTEN_PORT = 7777;

    /** Value handed to {@code setMaximumConnectionBufferSize} on the throttle builder. */
    private static final int MAX_CONNECTION_BUFFER_SIZE = 64;

    /** Value handed to {@code setReceiveBufferSize} on the acceptor's session config. */
    private static final int RECEIVE_BUFFER_SIZE = 128;

    /**
     * Configures the acceptor and starts listening.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if any of the setup or bind calls fails
     */
    public static void main(String[] args) throws Exception {
        final SocketAcceptor acceptor = new SocketAcceptor();
        installFilters(acceptor);

        final InetSocketAddress listenAddress = new InetSocketAddress(LISTEN_PORT);
        acceptor.setLocalAddress(listenAddress);
        acceptor.getSessionConfig().setReceiveBufferSize(RECEIVE_BUFFER_SIZE);
        acceptor.setHandler(new ConsumerHandler());
        acceptor.bind();
    }

    /**
     * Adds the "executor" and "codec" filters to the acceptor's filter chain
     * (in that order) and then attaches the read throttle to the same chain.
     */
    private static void installFilters(SocketAcceptor acceptor) throws Exception {
        final ExecutorFilter executorFilter = new ExecutorFilter();
        acceptor.getFilterChain().addLast("executor", executorFilter);

        final ProtocolCodecFilter codecFilter =
                new ProtocolCodecFilter(new ObjectSerializationCodecFactory());
        acceptor.getFilterChain().addLast("codec", codecFilter);

        // The throttle is attached only after both filters above are in the chain,
        // matching the original setup order.
        final ReadThrottleFilterBuilder throttle = new ReadThrottleFilterBuilder();
        throttle.setMaximumConnectionBufferSize(MAX_CONNECTION_BUFFER_SIZE);
        throttle.attach(acceptor.getFilterChain());
    }
}

----

package net.gleamynode.tmp;

import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;

/**
 * Handler that prints each received message and then sleeps, so that
 * messages are consumed slowly.
 */
public class ConsumerHandler extends IoHandlerAdapter {

    /** How long to sleep after printing each message, in milliseconds. */
    private static final int PROCESSING_DELAY_MILLIS = 3000;

    /**
     * Prints the session's read-message counter and the message text, then
     * pauses for {@link #PROCESSING_DELAY_MILLIS} milliseconds.
     *
     * @param session the session the message arrived on
     * @param message the received message; cast to {@code String}
     * @throws Exception if the message is not a {@code String} or the sleep is interrupted
     */
    public void messageReceived(IoSession session, Object message)
            throws Exception {
        final String counterLine =
                "Consumer.getReadMessages(): " + session.getReadMessages();
        System.out.println(counterLine);

        // Cast only after the counter line has been printed, so a non-String
        // message still produces the first line of output before failing.
        final String text = (String) message;
        System.out.println(text);

        Thread.sleep(PROCESSING_DELAY_MILLIS);
    }
}

----

package net.gleamynode.tmp;

import java.net.InetSocketAddress;

import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import 
org.apache.mina.filter.codec.serialization.ObjectSerializationCodecFactory;
import org.apache.mina.transport.socket.nio.SocketConnector;

/**
 * Producer side of the read-throttling experiment.
 *
 * <p>Connects to 127.0.0.1:7777 and writes "content-N" strings in a loop,
 * printing the session's scheduled-write-message counter before each write.
 */
public class Producer {

    /** Host the connector connects to. */
    private static final String CONSUMER_HOST = "127.0.0.1";

    /** Port the connector connects to. */
    private static final int CONSUMER_PORT = 7777;

    /** Argument passed to {@code await} on each write future. */
    private static final int WRITE_AWAIT = 500;

    /**
     * Opens the session and writes messages until the counter reaches
     * {@code Integer.MAX_VALUE} or the session reports that it is closing.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if connecting or writing fails
     */
    public static void main(String[] args) throws Exception {
        final SocketConnector connector = new SocketConnector();
        final IoSession session = openSession(connector);

        int sequence = 0;
        while (sequence < Integer.MAX_VALUE) {
            System.out.println("Producer.getScheduledWriteMessages(): "
                    + session.getScheduledWriteMessages());

            final String payload = "content-" + sequence;
            System.out.println(payload);
            session.write(payload).await(WRITE_AWAIT);

            // The closing check runs after the write, as in the original loop.
            if (session.isClosing()) {
                break;
            }
            sequence++;
        }
    }

    /**
     * Installs the "codec" filter and a no-op handler on the connector, then
     * connects and waits for the session.
     */
    private static IoSession openSession(SocketConnector connector) throws Exception {
        final ProtocolCodecFilter codecFilter =
                new ProtocolCodecFilter(new ObjectSerializationCodecFactory());
        connector.getFilterChain().addLast("codec", codecFilter);
        connector.setHandler(new IoHandlerAdapter());

        final InetSocketAddress consumerAddress =
                new InetSocketAddress(CONSUMER_HOST, CONSUMER_PORT);
        return connector.connect(consumerAddress).await().getSession();
    }
}

----

HTH,
Trustin
-- 
what we call human nature is actually human habit
--
http://gleamynode.net/
--
PGP Key ID: 0x0255ECA6

Reply via email to