xxd763795151 opened a new issue #2516:
URL: https://github.com/apache/rocketmq/issues/2516
**BUG REPORT**
1. Please describe the issue you observed:
When a send-message request stays in the sendThreadPoolQueue too long, the broker may return the error "[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while". The relevant code is as follows:
```
// How long this request has been waiting in the queue
final long behind = System.currentTimeMillis() - rt.getCreateTimestamp();
if (behind >= maxWaitTimeMillsInQueue) {
    // Drop the request from the queue and answer SYSTEM_BUSY instead of processing it
    if (blockingQueue.remove(runnable)) {
        rt.setStopRun(true);
        rt.returnResponse(RemotingSysResponseCode.SYSTEM_BUSY,
            String.format("[TIMEOUT_CLEAN_QUEUE]broker busy, start flow control for a while, period in queue: %sms, size of queue: %d",
                behind, blockingQueue.size()));
    }
}
```
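To make the age check concrete, here is a minimal, self-contained sketch. It assumes the check is applied repeatedly to the head of a FIFO queue and that the current time is passed in explicitly; `TimedTask` and `cleanExpired` are hypothetical names, not RocketMQ classes, and this is not the broker's actual implementation.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: TimedTask and cleanExpired are illustrative names,
// not RocketMQ classes. "now" is passed in so the result is deterministic.
class TimeoutCleanSketch {

    /** A queued request that remembers when it was created (hypothetical). */
    static final class TimedTask implements Runnable {
        final long createTimestamp;
        boolean stopRun;

        TimedTask(long createTimestamp) {
            this.createTimestamp = createTimestamp;
        }

        @Override
        public void run() {
            // A real task would process the request here.
        }
    }

    /**
     * Removes head tasks that have waited at least maxWaitMillis and returns
     * how many were rejected (the point where a broker would answer busy).
     * Assumes FIFO order, so once the head is young enough, the rest are too.
     */
    static int cleanExpired(BlockingQueue<Runnable> queue, long maxWaitMillis, long now) {
        int rejected = 0;
        while (true) {
            Runnable head = queue.peek();
            if (!(head instanceof TimedTask)) {
                break; // empty queue, or a head element without a timestamp
            }
            TimedTask task = (TimedTask) head;
            long behind = now - task.createTimestamp;
            if (behind < maxWaitMillis) {
                break; // head has not waited long enough yet
            }
            if (queue.remove(task)) {
                task.stopRun = true;
                rejected++;
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        queue.add(new TimedTask(0));
        queue.add(new TimedTask(500));
        queue.add(new TimedTask(900));
        // At t = 1200 ms with a 1000 ms limit, only the first task has expired.
        int rejected = cleanExpired(queue, 1000, 1200);
        System.out.println("rejected=" + rejected + ", remaining=" + queue.size());
    }
}
```

In this sketch, a head element without a timestamp simply stops the scan; that is a choice of the sketch, not a statement about the broker.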
The default value of maxWaitTimeMillsInQueue is 200 ms. We have set it to 1000 ms in our production environment, but the problem still occurs occasionally.
We use rocketmq-exporter + Prometheus + Grafana to monitor sendThreadPoolQueueHeadWaitTimeMills; however, the value is almost always 0 (with an occasional very high spike). That does not look plausible.
While debugging the broker's source code, I found that the sendThreadPoolQueue contains two types of elements:
> java.util.concurrent.CompletableFuture$UniAccept
> org.apache.rocketmq.broker.latency.FutureTaskExt
sendThreadPoolQueueHeadWaitTimeMills is computed only when the head element of sendThreadPoolQueue is an org.apache.rocketmq.broker.latency.FutureTaskExt; otherwise, it is reported as 0. See the source code below:
```
// BrokerController.java
public long headSlowTimeMills(BlockingQueue<Runnable> q) {
    long slowTimeMills = 0;
    // Only the head element is inspected
    final Runnable peek = q.peek();
    if (peek != null) {
        // castRunnable returns null when the head is not a FutureTaskExt
        RequestTask rt = BrokerFastFailure.castRunnable(peek);
        slowTimeMills = rt == null ? 0 : this.messageStore.now() - rt.getCreateTimestamp();
    }

    if (slowTimeMills < 0) {
        slowTimeMills = 0;
    }

    return slowTimeMills;
}
```
Now look at the call `BrokerFastFailure.castRunnable(peek)`:
```
public static RequestTask castRunnable(final Runnable runnable) {
    try {
        // Only FutureTaskExt elements are unwrapped; any other Runnable yields null
        if (runnable instanceof FutureTaskExt) {
            FutureTaskExt object = (FutureTaskExt) runnable;
            return (RequestTask) object.getRunnable();
        }
    } catch (Throwable e) {
        log.error(String.format("castRunnable exception, %s", runnable.getClass().getName()), e);
    }

    return null;
}
```
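`castRunnable` can only recover a `RequestTask` from elements that are `FutureTaskExt` wrappers. The sketch below illustrates the general JDK pattern this relies on, assuming `FutureTaskExt` works along these lines: a `ThreadPoolExecutor` subclass overrides `newTaskFor` so that `submit()` enqueues a wrapper that remembers the original `Runnable`, while `execute()` enqueues the `Runnable` as-is, which an `instanceof` check then cannot unwrap. `WrappedTask`, `WrappingExecutor` and `unwrap` are hypothetical names; this is not RocketMQ's code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: WrappedTask mimics a FutureTask subclass that keeps
// a reference to the original Runnable. Not RocketMQ classes.
class CastRunnableSketch {

    static final class WrappedTask<V> extends FutureTask<V> {
        private final Runnable original;

        WrappedTask(Runnable runnable, V result) {
            super(runnable, result);
            this.original = runnable;
        }

        Runnable getRunnable() {
            return original;
        }
    }

    /** Makes submit(Runnable) enqueue a WrappedTask instead of a plain FutureTask. */
    static final class WrappingExecutor extends ThreadPoolExecutor {
        WrappingExecutor(BlockingQueue<Runnable> queue) {
            super(1, 1, 0L, TimeUnit.MILLISECONDS, queue);
        }

        @Override
        protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
            return new WrappedTask<>(runnable, value);
        }
    }

    /** Like castRunnable: only the wrapper type is recognized, anything else gives null. */
    static Runnable unwrap(Runnable queued) {
        return queued instanceof WrappedTask ? ((WrappedTask<?>) queued).getRunnable() : null;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        WrappingExecutor executor = new WrappingExecutor(queue);
        CountDownLatch release = new CountDownLatch(1);
        // Keep the single worker busy so the next two tasks stay queued.
        executor.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Runnable request = () -> { };
        executor.submit(request);     // queued as a WrappedTask via newTaskFor
        executor.execute(() -> { });  // queued as the bare Runnable
        for (Runnable queued : queue) {
            Runnable original = unwrap(queued);
            System.out.println(original == null
                    ? "not recognized, unwrap returns null"
                    : "recognized, original request: " + (original == request));
        }
        release.countDown();
        executor.shutdown();
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```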
The java.util.concurrent.CompletableFuture$UniAccept elements come from SendMessageProcessor.java:
```
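@Override
public void asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request,
        RemotingResponseCallback responseCallback) throws Exception {
    // The response callback is scheduled on the send-message executor,
    // so it is queued in sendThreadPoolQueue as a CompletableFuture$UniAccept
    asyncProcessRequest(ctx, request)
        .thenAcceptAsync(responseCallback::callback, this.brokerController.getSendMessageExecutor());
}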
```
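Both kinds of tasks share the sendThreadPoolQueue, and most of the time the head element is a java.util.concurrent.CompletableFuture$UniAccept. In that case castRunnable returns null and headSlowTimeMills reports 0, even when FutureTaskExt requests further back in the queue have been waiting much longer.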
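The sharing can be reproduced with plain JDK classes, under the assumption that a single-thread `ThreadPoolExecutor` is a fair stand-in for the send-message executor. With its only worker blocked, a callback scheduled with `thenAcceptAsync(..., executor)` and a task passed to `submit()` both land in the same queue, with different runtime classes. `SharedQueueSketch` is a hypothetical name, and the exact class names printed depend on JDK internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch using only JDK classes: a single-thread pool stands in for the
// send-message executor, and both scheduling paths feed its one queue.
class SharedQueueSketch {

    /** Returns the runtime class names of the elements waiting in the queue. */
    static List<String> queuedClassNames() throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        ThreadPoolExecutor executor =
                new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, queue);
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Occupy the only worker so that later tasks wait in the queue.
            executor.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // 1) A response callback continued asynchronously on the same executor.
            CompletableFuture.completedFuture("response")
                    .thenAcceptAsync(response -> { }, executor);
            // 2) A request handed to submit(), which wraps it in a FutureTask.
            executor.submit(() -> { });

            List<String> names = new ArrayList<>();
            for (Runnable queued : queue) {
                names.add(queued.getClass().getName());
            }
            return names;
        } finally {
            release.countDown();
            executor.shutdown();
            executor.awaitTermination(5, TimeUnit.SECONDS);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        queuedClassNames().forEach(System.out::println);
    }
}
```

Because the callback is scheduled first here, it becomes the head of the queue, which mirrors the case where headSlowTimeMills finds no RequestTask at the head.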
2. Please tell us about your environment:
- Linux
- macOS
- RocketMQ 4.7.1 release
3. Other information (e.g. detailed explanation, logs, related issues,
suggestions how to fix, etc):
How I found these elements in the sendThreadPoolQueue: I printed them, with code like the following:
```
// BrokerController#headSlowTimeMills(BlockingQueue<Runnable> q)
// ...
if (q == this.sendThreadPoolQueue) {
    System.out.println("send queue foreach size: " + q.size());
    q.stream().forEach(r -> {
        RequestTask rt = BrokerFastFailure.castRunnable(r);
        System.out.println(r.getClass());
        // -1 marks elements that castRunnable cannot turn into a RequestTask
        long tmpSlowTime = rt == null ? -1 : this.messageStore.now() - rt.getCreateTimestamp();
        System.out.println(tmpSlowTime);
    });
    // System.out.println("Send queue slow time mills: " + slowTimeMills);
}
// ...
```
This is the printed output:
> send queue foreach size: 4
> class java.util.concurrent.CompletableFuture$UniAccept
> -1
> class org.apache.rocketmq.broker.latency.FutureTaskExt
> 387
> class java.util.concurrent.CompletableFuture$UniAccept
> -1
> class org.apache.rocketmq.broker.latency.FutureTaskExt
> 80
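Plugging the printed queue into a small model shows the gap between what the metric reports and how long requests actually wait. In this hypothetical sketch (not RocketMQ code), `Stamped` stands in for a timestamped request task and plain lambdas stand in for the `CompletableFuture$UniAccept` callbacks; a head-only measurement reports 0 ms while the oldest timestamped request has waited 387 ms.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical model, not RocketMQ code: Stamped stands in for a request task
// that carries its creation time; plain lambdas stand in for callbacks that
// carry no timestamp (like CompletableFuture$UniAccept).
class HeadVsOldestSketch {

    static final class Stamped implements Runnable {
        final long createTimestamp;

        Stamped(long createTimestamp) {
            this.createTimestamp = createTimestamp;
        }

        @Override
        public void run() {
        }
    }

    /** Head-only measurement: 0 whenever the head carries no timestamp. */
    static long headWait(Queue<Runnable> queue, long now) {
        Runnable head = queue.peek();
        if (!(head instanceof Stamped)) {
            return 0;
        }
        return Math.max(now - ((Stamped) head).createTimestamp, 0);
    }

    /** Scans the whole queue for the longest-waiting timestamped element. */
    static long oldestStampedWait(Queue<Runnable> queue, long now) {
        long oldest = 0;
        for (Runnable r : queue) {
            if (r instanceof Stamped) {
                oldest = Math.max(oldest, now - ((Stamped) r).createTimestamp);
            }
        }
        return oldest;
    }

    public static void main(String[] args) {
        long now = 10_000;
        Queue<Runnable> queue = new ArrayDeque<>();
        // Same shape as the printed queue: callback, request, callback, request.
        queue.add(() -> { });
        queue.add(new Stamped(now - 387));
        queue.add(() -> { });
        queue.add(new Stamped(now - 80));
        System.out.println("head only: " + headWait(queue, now) + " ms");
        System.out.println("oldest timestamped: " + oldestStampedWait(queue, now) + " ms");
    }
}
```

Scanning the whole queue costs O(n) per sample, whereas peeking at the head is O(1); the sketch only illustrates the measurement gap and is not a proposed fix.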
I also printed the stack trace of every enqueue call by overriding the queue's `put` and `offer` methods:
```
this.sendThreadPoolQueue = new LinkedBlockingQueue<Runnable>(this.brokerConfig.getSendThreadPoolQueueCapacity()) {
    @Override
    public void put(Runnable runnable) throws InterruptedException {
        System.out.println("queue put: " + runnable.getClass());
        super.put(runnable);
    }

    @Override
    public boolean offer(Runnable runnable) {
        System.out.println("queue offer: " + runnable.getClass()
            + ", current thread: " + Thread.currentThread().getName()
            + ", thread id: " + Thread.currentThread().getId());
        Throwable throwable = new Throwable();
        StackTraceElement[] stackTraceElements = throwable.getStackTrace();
        if (stackTraceElements != null) {
            Arrays.stream(stackTraceElements).forEach(stackTraceElement -> {
                System.out.println(stackTraceElement.getClassName()
                    + "#" + stackTraceElement.getMethodName()
                    + "#" + stackTraceElement.getLineNumber());
            });
        }
        System.out.println("---------------------------end------------------------------");
        return super.offer(runnable);
    }

    @Override
    public boolean offer(Runnable runnable, long timeout, TimeUnit unit) throws InterruptedException {
        System.out.println("queue timeoutoffer: " + runnable.getClass());
        return super.offer(runnable, timeout, unit);
    }
};
```
The output is as follows:
> queue offer: class java.util.concurrent.CompletableFuture$UniAccept, current thread: SendMessageThread_1, thread id: 81
> org.apache.rocketmq.broker.BrokerController$1#offer#205
> org.apache.rocketmq.broker.BrokerController$1#offer#195
> java.util.concurrent.ThreadPoolExecutor#execute#1371
> java.util.concurrent.CompletableFuture$UniCompletion#claim#543
> java.util.concurrent.CompletableFuture#uniAccept#667
> java.util.concurrent.CompletableFuture$UniAccept#tryFire$$$capture#646
> java.util.concurrent.CompletableFuture$UniAccept#tryFire#-1
> java.util.concurrent.CompletableFuture#uniAcceptStage#686
> java.util.concurrent.CompletableFuture#thenAcceptAsync#2019
> org.apache.rocketmq.broker.processor.SendMessageProcessor#asyncProcessRequest#82
> org.apache.rocketmq.remoting.netty.NettyRemotingAbstract$1#run#226
> org.apache.rocketmq.remoting.netty.RequestTask#run#80
> java.util.concurrent.Executors$RunnableAdapter#call#511
> java.util.concurrent.FutureTask#run$$$capture#266
> java.util.concurrent.FutureTask#run#-1
> java.util.concurrent.ThreadPoolExecutor#runWorker#1149
> java.util.concurrent.ThreadPoolExecutor$Worker#run#624
> java.lang.Thread#run#748
>
> queue offer: class org.apache.rocketmq.broker.latency.FutureTaskExt, current thread: NettyServerCodecThread_5, thread id: 56
> org.apache.rocketmq.broker.BrokerController$1#offer#205
> org.apache.rocketmq.broker.BrokerController$1#offer#195
> java.util.concurrent.ThreadPoolExecutor#execute#1371
> java.util.concurrent.AbstractExecutorService#submit#112
> org.apache.rocketmq.broker.BrokerController$2#submit#304
> org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processRequestCommand#256
> org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#processMessageReceived#158
> org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler#channelRead0#420
> org.apache.rocketmq.remoting.netty.NettyRemotingServer$NettyServerHandler#channelRead0#415
----------------------------------------------------------------
This is an automated message from the Apache Git Service.