The 2.2.X branch is thread-safe.

CONFIDENTIALITY NOTICE: The contents of this email message and any
attachments are intended solely for the addressee(s) and may contain
confidential and/or privileged information and may be legally protected
from disclosure.


On Sat, Sep 18, 2021 at 12:15 AM Zhang Hua (Jira) <[email protected]> wrote:

>
>     [
> https://issues.apache.org/jira/browse/DIRMINA-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417012#comment-17417012
> ]
>
> Zhang Hua commented on DIRMINA-1149:
> ------------------------------------
>
> I tracked and debugged the source code, and found that the cause of the
> problem may be the ProtoColCoderfilter.filterWriter method
> {code:java}
>         try {
>             // Now we can try to encode the response
>             encoder.encode(session, message, encoderOut);            //
> Send it directly
>             Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput)
> encoderOut).getMessageQueue();            // Write all the encoded messages
> now
>             while (!bufferQueue.isEmpty()) {
>                 Object encodedMessage = bufferQueue.poll();
> if (encodedMessage == null) {
>                     break;
>                 }                // Flush only when the buffer has
> remaining.
>                 if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer)
> encodedMessage).hasRemaining()) {
>                     SocketAddress destination =
> writeRequest.getDestination();
>                     WriteRequest encodedWriteRequest = new
> EncodedWriteRequest(encodedMessage, null, destination);
> nextFilter.filterWrite(session, encodedWriteRequest);
>                 }
>             }            // Call the next filter
>             nextFilter.filterWrite(session, new
> MessageWriteRequest(writeRequest));
>         } catch (Exception e) {
>
> {code}
> In a multi-threaded environment,
>
> step1: Thread A user=1, index=1, call encoder.encode(session, message,
> encoderOut);
>
> step2: Thread B executes Object encodedMessage = bufferQueue.poll(); to
> get (user=1, index=1)
>
> step3: Thread A bufferQueue.isEmpty()=true, next index
>
> step4: Thread A user=1, index=2, call encoder.encode(session, message,
> encoderOut);
>
> step5: Thread A executes Object encodedMessage = bufferQueue.poll(); to
> get (user=1, index=2)
>
> step6: Thread A executes nextFilter.filterWrite(session,
> encodedWriteRequest);// fire data (user=1,index=2)
>
> step7: Thread B executes nextFilter.filterWrite(session,
> encodedWriteRequest);// fire data (user=1,index=1)
>
>
>
> So the root cause is that ProtolColCoderFilter is not thread-safe
> I wrote a simple SyncProtolcolCoderFilter to avoid this problem
>
>
> {code:java}
> public class SyncProtocolCodecFilter extends ProtocolCodecFilter {
>     public SyncProtocolCodecFilter(ProtocolCodecFactory factory) {
>         super(factory);
>     }    @Override
>     public void filterWrite(NextFilter nextFilter, IoSession session,
> WriteRequest writeRequest) throws Exception {
>         synchronized (session) {
>             super.filterWrite(nextFilter, session, writeRequest);
>         }
>     }
> }
>
> {code}
>
> > IoSession.write under multi-thread enviroment, lose message order
> > -----------------------------------------------------------------
> >
> >                 Key: DIRMINA-1149
> >                 URL: https://issues.apache.org/jira/browse/DIRMINA-1149
> >             Project: MINA
> >          Issue Type: Bug
> >          Components: Core
> >    Affects Versions: 2.0.21
> >         Environment: Java 1.8
> > Windows 10
> >            Reporter: Zhang Hua
> >            Priority: Minor
> >         Attachments: minatest.zip
> >
> >
> > I am writing a stress-test that tests multi-thread safetyness of
> IoSession.write method, and find lose message order.
> > My test method is as follows
> > 1. The client test code starts 50 threads, sharing the same IoSession
> object
> > 2. Each test thread simulates a user and sends data in sequence
> > I believe that the IoFilter I use meets the thread safety conditions
> > The result I expect is that the server receives the data of each user in
> an orderly manner, but not
> > Synchronizing on the session.write makes the problem go away;
> > Do I really have to synchronize on the session to solve this issue?
> >
> > ClientDemo.java
> > {code:java}
> > public class ClientDemo {
> > public static void main(String[] args) throws Exception {
> >         NioSocketConnector connector = new NioSocketConnector();
> >         DefaultIoFilterChainBuilder chain = connector.getFilterChain();
> >         chain.addLast("mdc", new MdcInjectionFilter());
> >         chain.addLast("codec", new ProtocolCodecFilter(new
> MessagePackCodecFactory()));
> >         TcpRPCHandler responseHandler = new TcpRPCHandler();
> >         connector.setHandler(responseHandler);
> >         connector.setConnectTimeoutCheckInterval(30);
> >         ConnectFuture cf = connector.connect(new
> InetSocketAddress("127.0.0.1", 9999));
> >         IoSession session = cf.awaitUninterruptibly().getSession();
>   ExecutorService executor = Executors.newFixedThreadPool(50);
> >         for (int i = 0; i < 50; ++i) {
> >             executor.execute(new SenderWorker(i, session));
> >         }
> >         while (true) {
> >             Thread.sleep(5000);
> >             System.out.println("client alive......");
> >             //            responseHandler.printProgress();
> >         }    }
> > }
> > class SenderWorker implements Runnable {
> >     private int userId;
> >     private IoSession session;    public SenderWorker(int userId,
> IoSession session) {
> >         this.userId = userId;
> >         this.session = session;
> >     }    @Override
> >     public void run() {
> >         for (int i = 0; i < 100; ++i) {
> >             MessageData data = new MessageData(userId, i);
> >             /*synchronized (session)*/ {
> >                 session.write(data);
> >             }
> >             if (i % 5 == 0) {
> >                 try {
> >                     Thread.sleep(10);
> >                 } catch (Exception e) {
> >                 }
> >             }
> >         }
> >     }
> > }
> > {code}
> > See the attachment for the complete code, I use maven to manage the
> project
>
>
>
> --
> This message was sent by Atlassian Jira
> (v8.3.4#803005)
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
>

Reply via email to