[ 
https://issues.apache.org/jira/browse/DIRMINA-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17416448#comment-17416448
 ] 

Jonathan Valliere commented on DIRMINA-1149:
--------------------------------------------

Here is an example showing that tasks handed to a multi-threaded executor do not necessarily run in submission order:

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class Main
{
        public static void main(String[] args)
        {
                final Queue<Number> result = new ConcurrentLinkedDeque<>();
                final ExecutorService ex = new ThreadPoolExecutor(50, 50, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
                final Queue<Runnable> tasks = new ArrayDeque<>();

                for (int i = 0; i < 50; i++)
                {
                        tasks.add(new Task(result, i));
                }

                while (tasks.isEmpty() == false)
                {
                        ex.execute(tasks.poll());
                }

                while (result.size() != 50)
                {
                        try
                        {
                                Thread.sleep(5);
                        }
                        catch (InterruptedException e)
                        {
                                // Restore the interrupt flag and stop waiting.
                                Thread.currentThread().interrupt();
                                return;
                        }
                }

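                System.out.println(result.toString());
                // Core threads do not time out by default, so shut the pool down to let the JVM exit.
                ex.shutdown();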
        }

        static class Task implements Runnable
        {
                final Queue<Number>     t;
                final Number            x;

                public Task(Queue<Number> t, int num)
                {
                        this.x = num;
                        this.t = t;
                }

                @Override
                public void run()
                {
                        this.t.add(this.x);
                }
        }
}

{code}

The output was (note that 3, 2, 1 and 29, 28 are out of order):

{code:java}
[0, 3, 2, 1, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 
22, 23, 24, 25, 26, 27, 29, 28, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 
42, 43, 44, 45, 46, 47, 48, 49]
{code}
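
For contrast, here is a minimal sketch that is not taken from MINA or from the attachment (the class name OrderedMain and the method runOnSingleThread are made up for illustration). It submits the same kind of tasks to a single-threaded executor. With only one worker draining the queue, the tasks run in submission order, which suggests the reordering above comes from the 50 workers racing each other rather than from the queue itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this illustration.
class OrderedMain
{
        // Runs `count` tasks on one worker thread and returns the order in which they ran.
        static List<Integer> runOnSingleThread(int count) throws InterruptedException
        {
                final Queue<Integer> result = new ConcurrentLinkedQueue<>();
                // A single worker thread executes tasks sequentially, in submission order.
                final ExecutorService ex = Executors.newSingleThreadExecutor();

                for (int i = 0; i < count; i++)
                {
                        final int num = i;
                        ex.execute(() -> result.add(num));
                }

                // Stop accepting new tasks and wait for the queued ones to finish.
                ex.shutdown();
                if (!ex.awaitTermination(10, TimeUnit.SECONDS))
                {
                        throw new IllegalStateException("tasks did not finish in time");
                }

                return new ArrayList<>(result);
        }

        public static void main(String[] args) throws InterruptedException
        {
                // Prints the numbers in ascending order.
                System.out.println(runOnSingleThread(50));
        }
}
```

The trade-off is that a single worker gives up parallelism. Ordering and concurrency have to be balanced by whoever submits the work.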



> IoSession.write under multi-thread environment, lose message order
> ------------------------------------------------------------------
>
>                 Key: DIRMINA-1149
>                 URL: https://issues.apache.org/jira/browse/DIRMINA-1149
>             Project: MINA
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 2.0.21
>         Environment: Java 1.8
> Windows 10
>            Reporter: Zhang Hua
>            Priority: Minor
>         Attachments: minatest.zip
>
>
> I am writing a stress test that checks the thread safety of the
> IoSession.write method, and I find that message order is lost.
> My test method is as follows:
> 1. The client test code starts 50 threads that share the same IoSession object.
> 2. Each test thread simulates a user and sends data in sequence.
> I believe that the IoFilter I use meets the thread-safety conditions.
> The result I expect is that the server receives the data of each user in
> order, but it does not.
> Synchronizing on the session around session.write makes the problem go away.
> Do I really have to synchronize on the session to solve this issue?
>  
> ClientDemo.java
> {code:java}
> public class ClientDemo {
>     public static void main(String[] args) throws Exception {
>         NioSocketConnector connector = new NioSocketConnector();
>         DefaultIoFilterChainBuilder chain = connector.getFilterChain();
>         chain.addLast("mdc", new MdcInjectionFilter());
>         chain.addLast("codec", new ProtocolCodecFilter(new MessagePackCodecFactory()));
>         TcpRPCHandler responseHandler = new TcpRPCHandler();
>         connector.setHandler(responseHandler);
>         connector.setConnectTimeoutCheckInterval(30);
>         ConnectFuture cf = connector.connect(new InetSocketAddress("127.0.0.1", 9999));
>         IoSession session = cf.awaitUninterruptibly().getSession();
>         ExecutorService executor = Executors.newFixedThreadPool(50);
>         for (int i = 0; i < 50; ++i) {
>             executor.execute(new SenderWorker(i, session));
>         }
>         while (true) {
>             Thread.sleep(5000);
>             System.out.println("client alive......");
>             //            responseHandler.printProgress();
>         }
>     }
> }
> class SenderWorker implements Runnable {
>     private int userId;
>     private IoSession session;
>
>     public SenderWorker(int userId, IoSession session) {
>         this.userId = userId;
>         this.session = session;
>     }
>
>     @Override
>     public void run() {
>         for (int i = 0; i < 100; ++i) {
>             MessageData data = new MessageData(userId, i);
>             /*synchronized (session)*/ {
>                 session.write(data);
>             }
>             if (i % 5 == 0) {
>                 try {
>                     Thread.sleep(10);
>                 } catch (Exception e) {
>                 }
>             }
>         }
>     }
> }
> {code}
> See the attachment for the complete code; I use Maven to manage the project.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
