The 2.2.X branch is thread-safe.
CONFIDENTIALITY NOTICE: The contents of this email message and any attachments are intended solely for the addressee(s) and may contain confidential and/or privileged information and may be legally protected from disclosure. On Sat, Sep 18, 2021 at 12:15 AM Zhang Hua (Jira) <[email protected]> wrote: > > [ > https://issues.apache.org/jira/browse/DIRMINA-1149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17417012#comment-17417012 > ] > > Zhang Hua commented on DIRMINA-1149: > ------------------------------------ > > I tracked and debugged the source code, and found that the cause of the > problem may be the ProtoColCoderfilter.filterWriter method > {code:java} > try { > // Now we can try to encode the response > encoder.encode(session, message, encoderOut); // > Send it directly > Queue<Object> bufferQueue = ((AbstractProtocolEncoderOutput) > encoderOut).getMessageQueue(); // Write all the encoded messages > now > while (!bufferQueue.isEmpty()) { > Object encodedMessage = bufferQueue.poll(); > if (encodedMessage == null) { > break; > } // Flush only when the buffer has > remaining. > if (!(encodedMessage instanceof IoBuffer) || ((IoBuffer) > encodedMessage).hasRemaining()) { > SocketAddress destination = > writeRequest.getDestination(); > WriteRequest encodedWriteRequest = new > EncodedWriteRequest(encodedMessage, null, destination); > nextFilter.filterWrite(session, encodedWriteRequest); > } > } // Call the next filter > nextFilter.filterWrite(session, new > MessageWriteRequest(writeRequest)); > } catch (Exception e) { > > {code} > In a multi-threaded environment, > > step1: Thread A user=1, index=1, call encoder.encode(session, message, > encoderOut); > > step2: Thread B executes Object encodedMessage = bufferQueue.poll(); to > get (user=1, index=1) > > step3: Thread A bufferQueue.isEmpty()=true, next index > > step4: Thread A user=1, index=2, call encoder.encode(session, message, > encoderOut); > > step5: Thread A executes Object encodedMessage = bufferQueue.poll(); to > get (user=1, index=2) > > step6: Thread A executes nextFilter.filterWrite(session, > encodedWriteRequest);// fire data (user=1,index=2) > > step7: Thread B executes nextFilter.filterWrite(session, > encodedWriteRequest);// fire data (user=1,index=1) > > > > So the root cause is that ProtolColCoderFilter is not thread-safe > I wrote a simple SyncProtolcolCoderFilter to avoid this problem > > > {code:java} > public class SyncProtocolCodecFilter extends ProtocolCodecFilter { > public SyncProtocolCodecFilter(ProtocolCodecFactory factory) { > super(factory); > } @Override > public void filterWrite(NextFilter nextFilter, IoSession session, > WriteRequest writeRequest) throws Exception { > synchronized (session) { > super.filterWrite(nextFilter, session, writeRequest); > } > } > } > > {code} > > > IoSession.write under multi-thread enviroment, lose message order > > ----------------------------------------------------------------- > > > > Key: DIRMINA-1149 > > URL: https://issues.apache.org/jira/browse/DIRMINA-1149 > > Project: MINA > > Issue Type: Bug > > Components: Core > > Affects Versions: 2.0.21 > > Environment: Java 1.8 > > Windows 10 > > Reporter: Zhang Hua > > Priority: Minor > > Attachments: minatest.zip > > > > > > I am writing a stress-test that tests multi-thread safetyness of > IoSession.write method, and find lose message order. > > My test method is as follows > > 1. The client test code starts 50 threads, sharing the same IoSession > object > > 2. Each test thread simulates a user and sends data in sequence > > I believe that the IoFilter I use meets the thread safety conditions > > The result I expect is that the server receives the data of each user in > an orderly manner, but not > > Synchronizing on the session.write makes the problem go away; > > Do I really have to synchronize on the session to solve this issue? > > > > ClientDemo.java > > {code:java} > > public class ClientDemo { > > public static void main(String[] args) throws Exception { > > NioSocketConnector connector = new NioSocketConnector(); > > DefaultIoFilterChainBuilder chain = connector.getFilterChain(); > > chain.addLast("mdc", new MdcInjectionFilter()); > > chain.addLast("codec", new ProtocolCodecFilter(new > MessagePackCodecFactory())); > > TcpRPCHandler responseHandler = new TcpRPCHandler(); > > connector.setHandler(responseHandler); > > connector.setConnectTimeoutCheckInterval(30); > > ConnectFuture cf = connector.connect(new > InetSocketAddress("127.0.0.1", 9999)); > > IoSession session = cf.awaitUninterruptibly().getSession(); > ExecutorService executor = Executors.newFixedThreadPool(50); > > for (int i = 0; i < 50; ++i) { > > executor.execute(new SenderWorker(i, session)); > > } > > while (true) { > > Thread.sleep(5000); > > System.out.println("client alive......"); > > // responseHandler.printProgress(); > > } } > > } > > class SenderWorker implements Runnable { > > private int userId; > > private IoSession session; public SenderWorker(int userId, > IoSession session) { > > this.userId = userId; > > this.session = session; > > } @Override > > public void run() { > > for (int i = 0; i < 100; ++i) { > > MessageData data = new MessageData(userId, i); > > /*synchronized (session)*/ { > > session.write(data); > > } > > if (i % 5 == 0) { > > try { > > Thread.sleep(10); > > } catch (Exception e) { > > } > > } > > } > > } > > } > > {code} > > See the attachment for the complete code, I use maven to manage the > project > > > > -- > This message was sent by Atlassian Jira > (v8.3.4#803005) > > --------------------------------------------------------------------- > To unsubscribe, e-mail: [email protected] > For additional commands, e-mail: [email protected] > >
