xxd763795151 opened a new issue #2516:
URL: https://github.com/apache/rocketmq/issues/2516


   **BUG REPORT**
   
   1. Please describe the issue you observed:
   
   When a send-message request stays in sendThreadPoolQueue for too long, the
   broker rejects it with the error "[TIMEOUT_CLEAN_QUEUE]broker busy, start flow
   control for a while". The relevant code is:
   ```
                       final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
                       if (behind >= maxWaitTimeMillsInQueue) {
                           if (blockingQueue.remove(runnable)) {
                               rt.setStopRun(true);
                               rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY,
                                   String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d", behind, blockingQueue.size()));
                           }
                       }
   ```
   The default value of maxWaitTimeMillsInQueue is 200 ms.
   We have raised it to 1000 ms in our production environment, but the error
   still occurs occasionally.
   We monitor sendThreadPoolQueueHeadWaitTimeMills with rocketmq-exporter +
   Prometheus + Grafana, yet the reported value is almost always 0, with only an
   occasional very high value. That does not make sense given the timeouts above.
   
   When I debugged the broker's source code, I found that sendThreadPoolQueue
   holds two types of elements:
   > java.util.concurrent.CompletableFuture$UniAccept
   > org.apache.rocketmq.broker.latency.FutureTaskExt
   
   If the head element of sendThreadPoolQueue is an
   org.apache.rocketmq.broker.latency.FutureTaskExt, the broker computes
   sendThreadPoolQueueHeadWaitTimeMills from it. Otherwise it returns 0. See the
   source code below:
   ```
   // BrokerController.java
       public long headSlowTimeMills(BlockingQueue<Runnable> q) {
           long slowTimeMills = 0;
           final Runnable peek = q.peek();
           if (peek != null) {
               RequestTask rt = BrokerFastFailure.castRunnable(peek);
               slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
           }
   
           if (slowTimeMills < 0) {
               slowTimeMills = 0;
           }
   
           return slowTimeMills;
       }
   ```
   Note the call to BrokerFastFailure.castRunnable(peek), which returns null for anything that is not a FutureTaskExt:
   ```
       public static RequestTask castRunnable(final Runnable runnable) {
           try {
               if (runnable instanceof FutureTaskExt) {
                   FutureTaskExt object = (FutureTaskExt) runnable;
                   return (RequestTask) object.getRunnable();
               }
           } catch (Throwable e) {
               log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
           }
   
           return null;
       }
   
   ```
   
   The java.util.concurrent.CompletableFuture$UniAccept elements come from
   SendMessageProcessor.java:
   ```   
       @Override
       public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request, RemotingResponseCallback responseCallback) throws Exception {
           asyncProcessRequest(ctx, request).thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
       }
   ```
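
   The enqueue behavior can be reproduced outside the broker. The following is a
   minimal sketch, not broker code: the class name `UniAcceptQueueDemo`, the
   single-thread pool, and the latch are assumptions made for illustration. It
   keeps the only worker thread busy, calls `thenAcceptAsync` with the pool as
   executor, and then inspects the element that lands in the pool's queue.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.FutureTask;
   import java.util.concurrent.LinkedBlockingQueue;
   import java.util.concurrent.ThreadPoolExecutor;
   import java.util.concurrent.TimeUnit;

   public class UniAcceptQueueDemo {

       /** Returns the Runnable that thenAcceptAsync places in the pool's queue. */
       public static Runnable queuedCallback() {
           BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
           ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue);
           CountDownLatch release = new CountDownLatch(1);
           try {
               // Occupy the only worker so that the next task has to wait in the queue.
               pool.execute(() -> {
                   try {
                       release.await();
                   } catch (InterruptedException e) {
                       Thread.currentThread().interrupt();
                   }
               });
               // The future is already complete, so the callback is handed to the pool immediately.
               CompletableFuture.completedFuture("response").thenAcceptAsync(r -> { }, pool);
               return queue.peek();
           } finally {
               release.countDown();
               pool.shutdown();
               try {
                   pool.awaitTermination(5, TimeUnit.SECONDS);
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
           }
       }

       public static void main(String[] args) {
           Runnable head = queuedCallback();
           System.out.println("queued class: " + head.getClass().getName());
           System.out.println("is a FutureTask: " + (head instanceof FutureTask));
       }
   }
   ```

   The queued element is an internal CompletableFuture completion object rather
   than a FutureTask, so a cast that only recognizes FutureTask subclasses cannot
   read a creation timestamp from it.
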
   Both kinds of task share sendThreadPoolQueue, and most of the time the head
   element is a java.util.concurrent.CompletableFuture$UniAccept, so castRunnable
   returns null and the reported wait time is 0.
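
   One possible way to make the metric meaningful, offered only as a sketch and
   not as the project's actual fix, is to skip elements that carry no creation
   timestamp and measure the first one that does. `TimedTask` below is a
   hypothetical stand-in for the broker's FutureTaskExt/RequestTask pair, and
   `HeadWaitSketch` and `firstTimedWaitMillis` are illustrative names.

   ```java
   import java.util.concurrent.BlockingQueue;
   import java.util.concurrent.LinkedBlockingQueue;

   public class HeadWaitSketch {

       /** Hypothetical stand-in for a queued request that records its creation time. */
       public static final class TimedTask implements Runnable {
           private final long createTimestamp;

           public TimedTask(long createTimestamp) {
               this.createTimestamp = createTimestamp;
           }

           public long getCreateTimestamp() {
               return createTimestamp;
           }

           @Override
           public void run() {
           }
       }

       /** Mirrors the reported behavior: only the head is inspected, so an untimed head yields 0. */
       public static long headWaitMillis(BlockingQueue<Runnable> q, long now) {
           Runnable peek = q.peek();
           if (peek instanceof TimedTask) {
               return Math.max(0L, now - ((TimedTask) peek).getCreateTimestamp());
           }
           return 0L;
       }

       /** Sketch: wait time of the first queued element that has a timestamp. */
       public static long firstTimedWaitMillis(BlockingQueue<Runnable> q, long now) {
           for (Runnable r : q) { // iterates from the head in FIFO order
               if (r instanceof TimedTask) {
                   return Math.max(0L, now - ((TimedTask) r).getCreateTimestamp());
               }
           }
           return 0L;
       }

       public static void main(String[] args) {
           BlockingQueue<Runnable> q = new LinkedBlockingQueue<>();
           long now = 10_000L;
           q.add(() -> { });                // untimed callback at the head
           q.add(new TimedTask(now - 387));
           q.add(() -> { });
           q.add(new TimedTask(now - 80));
           System.out.println(headWaitMillis(q, now));       // prints 0
           System.out.println(firstTimedWaitMillis(q, now)); // prints 387
       }
   }
   ```

   Scanning costs more than a single peek and the iteration is only weakly
   consistent, so whether this trade-off is acceptable for a metrics path is a
   judgment call for the maintainers.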
   
   
   2. Please tell us about your environment:
   
   Linux
   macOS
   RocketMQ 4.7.1 release
   
   3. Other information (e.g. detailed explanation, logs, related issues, 
suggestions how to fix, etc):
   
   How I found out what is in sendThreadPoolQueue:
   I printed its contents, using code like the following:
   ```
   // BrokerController#headSlowTimeMills(BlockingQueue<Runnable> q)
   ........
           if (q == this.sendThreadPoolQueue) {
               System.out.println("send queue foreach size: " + q.size());
               q.stream().forEach(r -> {
                   long tmpSlowTime = 0L;
                   RequestTask rt = BrokerFastFailure.castRunnable(r);
                   System.out.println(r.getClass());
                   // -1 marks elements that are not FutureTaskExt and so carry no timestamp
                   tmpSlowTime = rt == null ? -1 : this.messageStore.now() - rt.getCreateTimestamp();
                   System.out.println(tmpSlowTime);
               });
               //System.out.println("Send queue slow time mills: " + slowTimeMills);
           }
   .......
   ```
   This is the printed output:
   > send queue foreach size: 4
   > class java.util.concurrent.CompletableFuture$UniAccept
   > -1
   > class org.apache.rocketmq.broker.latency.FutureTaskExt
   > 387
   > class java.util.concurrent.CompletableFuture$UniAccept
   > -1
   > class org.apache.rocketmq.broker.latency.FutureTaskExt
   > 80
   
   To see where each element is enqueued, I overrode the queue's methods to print the stack trace:
   ```
           this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity()) {
               @Override
               public void put(Runnable runnable) throws InterruptedException {
                   System.out.println("queue put: " + runnable.getClass());
                   super.put(runnable);
               }
   
               @Override
               public boolean offer(Runnable runnable) {
                   System.out.println("queue offer: " + runnable.getClass() + ", current thread: " + Thread.currentThread().getName() + ", thread id: " + Thread.currentThread().getId());
                   Throwable throwable = new Throwable();
                   StackTraceElement[] stackTraceElements = throwable.getStackTrace();
                   if (stackTraceElements != null) {
                       Arrays.stream(stackTraceElements).forEach(stackTraceElement -> {
                           System.out.println(stackTraceElement.getClassName() + "#"
                                   + stackTraceElement.getMethodName() + "#" + stackTraceElement.getLineNumber());
                       });
                   }
                   System.out.println("---------------------------end------------------------------");
                   return super.offer(runnable);
               }
   
               @Override
               public boolean offer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
                   System.out.println("queue timeoutoffer: " + runnable.getClass());
                   return super.offer(runnable, timeout, unit);
               }
           };
   ```
   The output is as follows:
   > queue offer: class java.util.concurrent.CompletableFuture$UniAccept, current thread: SendMessageThread_1, thread id: 81
   > org.apache.rocketmq.broker.BrokerController$1#offer#205
   > org.apache.rocketmq.broker.BrokerController$1#offer#195
   > java.util.concurrent.ThreadPoolExecutor#execute#1371
   > java.util.concurrent.CompletableFuture$UniCompletion#claim#543
   > java.util.concurrent.CompletableFuture#uniAccept#667
   > java.util.concurrent.CompletableFuture$UniAccept#tryFire$$$capture#646
   > java.util.concurrent.CompletableFuture$UniAccept#tryFire#-1
   > java.util.concurrent.CompletableFuture#uniAcceptStage#686
   > java.util.concurrent.CompletableFuture#thenAcceptAsync#2019
   > org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest#82
   > org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$1#run#226
   > org.apache.rocketmq.remoting.netty.RequestTask#run#80
   > java.util.concurrent.Executors$RunnableAdapter#call#511
   > java.util.concurrent.FutureTask#run$$$capture#266
   > java.util.concurrent.FutureTask#run#-1
   > java.util.concurrent.ThreadPoolExecutor#runWorker#1149
   > java.util.concurrent.ThreadPoolExecutor$Worker#run#624
   > java.lang.Thread#run#748
   
   > 
   > queue offer: class org.apache.rocketmq.broker.latency.FutureTaskExt, current thread: NettyServerCodecThread_5, thread id: 56
   > org.apache.rocketmq.broker.BrokerController$1#offer#205
   > org.apache.rocketmq.broker.BrokerController$1#offer#195
   > java.util.concurrent.ThreadPoolExecutor#execute#1371
   > java.util.concurrent.AbstractExecutorService#submit#112
   > org.apache.rocketmq.broker.BrokerController$2#submit#304
   > org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand#256
   > org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived#158
   > org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler#channelRead0#420
   > org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler#channelRead0#415

