areyouok commented on pull request #3509:
URL: https://github.com/apache/rocketmq/pull/3509#issuecomment-991437424


   This commit cause critical problem. we should rollback it. And I sugguest 
review PR #3540 again.
   
   @RongtongJin @XiaoyiPeng @duhengforever 
   
   The broker hangs in our test:
   ```
   "BrokerControllerScheduledThread1" #30 prio=5 os_prio=31 
tid=0x00007fabdb29d800 nid=0x6a03 runnable [0x000070000ae7c000]
      java.lang.Thread.State: RUNNABLE
        at 
java.util.concurrent.LinkedBlockingQueue$LBQSpliterator.tryAdvance(LinkedBlockingQueue.java:950)
        at 
java.util.stream.ReferencePipeline.forEachWithCancel(ReferencePipeline.java:126)
        at 
java.util.stream.AbstractPipeline.copyIntoWithCancel(AbstractPipeline.java:499)
        at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:486)
        at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
        at java.util.stream.FindOps$FindOp.evaluateSequential(FindOps.java:152)
        at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
        at 
java.util.stream.ReferencePipeline.findFirst(ReferencePipeline.java:464)
        at 
org.apache.rocketmq.broker.BrokerController.headSlowTimeMills(BrokerController.java:658)
        at 
org.apache.rocketmq.broker.BrokerController.headSlowTimeMills4SendThreadPoolQueue(BrokerController.java:672)
        at 
org.apache.rocketmq.broker.BrokerController.printWaterMark(BrokerController.java:688)
        at 
org.apache.rocketmq.broker.BrokerController$5.run(BrokerController.java:386)
        at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
        - <0x000000078014eeb8> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
        - <0x000000078014ef00> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
        - <0x0000000780d2ba58> (a 
java.util.concurrent.ThreadPoolExecutor$Worker)
   
   "NettyServerCodecThread_8" #81 prio=5 os_prio=31 tid=0x00007fabdb294000 
nid=0x13d03 waiting on condition [0x000070000e215000]
      java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000078014ef00> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at 
java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
        at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
        at 
java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
        at 
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
        at 
io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
        at 
io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
        - None
   
   "NettyServerCodecThread_6" #79 prio=5 os_prio=31 tid=0x00007fabe209d800 
nid=0xc503 waiting on condition [0x000070000e00f000]
      java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x000000078014ef00> (a 
java.util.concurrent.locks.ReentrantLock$NonfairSync)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:836)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:870)
        at 
java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1199)
        at 
java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:209)
        at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:285)
        at 
java.util.concurrent.LinkedBlockingQueue.offer(LinkedBlockingQueue.java:418)
        at 
java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
        at 
java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processRequestCommand(NettyRemotingAbstract.java:256)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.processMessageReceived(NettyRemotingAbstract.java:158)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:420)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler.channelRead0(NettyRemotingServer.java:415)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at 
io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:311)
        at 
io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:432)
        at 
io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:411)
        at 
org.apache.rocketmq.remoting.netty.NettyRemotingServer$HandshakeHandler.channelRead0(NettyRemotingServer.java:352)
        at 
io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
        at 
io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at 
io.netty.channel.AbstractChannelHandlerContext.access$600(AbstractChannelHandlerContext.java:61)
        at 
io.netty.channel.AbstractChannelHandlerContext$7.run(AbstractChannelHandlerContext.java:370)
        at 
io.netty.util.concurrent.DefaultEventExecutor.run(DefaultEventExecutor.java:66)
        at 
io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at 
io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at java.lang.Thread.run(Thread.java:748)
   
      Locked ownable synchronizers:
        - None
   
   ...
   ```
   
   
   some simple code to re-produce it:
   ```
   public class TestQueue {
       public static void main(String[] args) throws Exception {
           LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<>(1000);
           for (int i = 0; i < 10; i++) {
               new Thread(() -> {
                   while (true) {
                       queue.offer(new Object());
                       queue.remove();
                   }
               }).start();
           }
           while (true) {
               System.out.println("begin scan, i still alive");
               queue.stream()
                       .filter(o -> o == null)
                       .findFirst()
                       .isPresent();
               Thread.sleep(100);
               System.out.println("finish scan, i still alive");
           }
       }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to