GitHub user Breathtak edited a discussion: 
(Please help) Source code question about the rpc test under the rocketmq-example directory

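Location: the test demo in the rpc directory of rocketmq-example (version: 5.3.2)
When I modified and ran this test, it threw an exception saying that the message was sent successfully but no reply message arrived within 3000 ms. The exception is as follows: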

> org.apache.rocketmq.client.exception.RequestTimeoutException: CODE: 10006  DESC: send request message to <RequestTopic> OK, but wait reply message timeout, 3000 ms.
>       at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.waitResponse(DefaultMQProducerImpl.java:1762)
>       at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.request(DefaultMQProducerImpl.java:1633)
>       at org.apache.rocketmq.client.producer.DefaultMQProducer.request(DefaultMQProducer.java:810)
>       at org.apache.rocketmq.example.rpc.RequestProducer.main(RequestProducer.java:47)

So I changed the TTL to 30 s and ran the test again, but the same exception was thrown without waiting the full 30 s (I added some logging along the way):

> request create RequestResponseFuture and  correlationId : 104fa2e7-468b-4eb8-a932-a7214f2c1c08 
> request remove RequestResponseFuture 
> org.apache.rocketmq.client.exception.RequestTimeoutException: CODE: 10006  DESC: send request message to <RequestTopic> OK, but wait reply message timeout, 3000 ms.
>       at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.waitResponse(DefaultMQProducerImpl.java:1762)
>       at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.request(DefaultMQProducerImpl.java:1633)
>       at org.apache.rocketmq.client.producer.DefaultMQProducer.request(DefaultMQProducer.java:810)
>       at org.apache.rocketmq.example.rpc.RequestProducer.main(RequestProducer.java:47)
> processReplyMessage set Reply correlationId: 104fa2e7-468b-4eb8-a932-a7214f2c1c08 


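From the log above, I noticed that the RequestResponseFuture is removed before processReplyMessage runs. I then found the following code, in which acquireCountDownLatch releases the blocking wait. (As a result, waitResponse stops blocking and continues until the RequestResponseFuture is finally removed, while processReplyMessage has not yet finished.)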

>                 @Override
>                 public void onSuccess(SendResult sendResult) {
>                     requestResponseFuture.setSendRequestOk(true);
>                     requestResponseFuture.acquireCountDownLatch();
>                 }
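
The ordering problem described above can be reproduced outside RocketMQ with a plain java.util.concurrent.CountDownLatch. The following is a minimal sketch, not the RocketMQ source: PendingRequest, releaseWithoutReply, putReply, and the 200 ms and 2000 ms delays are hypothetical stand-ins chosen for illustration. It assumes the future holds a latch with a count of 1 that both the send-success callback and the reply handler count down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchRaceSketch {

    /** Hypothetical stand-in for a pending request; not the RocketMQ class. */
    static final class PendingRequest {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String reply;

        /** Called when the reply arrives: store it, then release the waiter. */
        void putReply(String reply) {
            this.reply = reply;
            latch.countDown();
        }

        /** Models a send-success callback that opens the latch without a reply. */
        void releaseWithoutReply() {
            latch.countDown();
        }

        /** Blocks until the latch opens or the timeout elapses; may return null. */
        String waitReply(long timeoutMs) throws InterruptedException {
            latch.await(timeoutMs, TimeUnit.MILLISECONDS);
            return reply;
        }
    }

    /** Simulates one request whose reply arrives 200 ms after the send. */
    static String request(boolean releaseOnSendSuccess) {
        PendingRequest pending = new PendingRequest();
        Thread replier = new Thread(() -> {
            try {
                Thread.sleep(200); // assumed round-trip delay of the reply
                pending.putReply("reply message contents.");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        replier.start();
        try {
            if (releaseOnSendSuccess) {
                // The send succeeded, but no reply has arrived yet.
                pending.releaseWithoutReply();
            }
            String result = pending.waitReply(2000);
            replier.join();
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("release on send success: " + request(true));
        System.out.println("wait for reply only:     " + request(false));
    }
}
```

With releaseOnSendSuccess set to true, the waiter returns null almost immediately because the latch is already open when it starts waiting. With false, it returns the reply after roughly 200 ms.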


So I commented out the line requestResponseFuture.acquireCountDownLatch(); and let the wait end on its own according to the TTL. With that change, processReplyMessage ran before waitResponse finished, and the output was:

> request create RequestResponseFuture and  correlationId : a97c46d2-dabc-4345-afa9-5af1f5f46ed3 
> processReplyMessage set Reply correlationId: a97c46d2-dabc-4345-afa9-5af1f5f46ed3 
> request remove RequestResponseFuture   
> request message: reply message contents. 
> request to <RequestTopic> cost: 1017 replyMessage: MessageExt 
> [brokerName=null, queueId=0, storeSize=0, queueOffset=0, sysFlag=0, 
> bornTimestamp=1754470419554, bornHost=/221.6.33.116:43645, 
> storeTimestamp=1754470421504, storeHost=/129.211.50.247:10911, msgId=null, 
> commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0, 
> toString()=Message{topic='DefaultCluster_REPLY_TOPIC', flag=0, 
> properties={ARRIVE_TIME=1754470419570, PUSH_REPLY_TIME=1754470421504, 
> MSG_REGION=DefaultRegion, UNIQ_KEY=C0A81E111A8063947C6B1D5FD4620007, 
> CORRELATION_ID=a97c46d2-dabc-4345-afa9-5af1f5f46ed3, MSG_TYPE=reply, 
> TTL=3000, REPLY_TO_CLIENT=192.168.30.17@24756#779130303594900, 
> TRACE_ON=true}, body=[114, 101, 112, 108, 121, 32, 109, 101, 115, 115, 97, 
> 103, 101, 32, 99, 111, 110, 116, 101, 110, 116, 115, 46], 
> transactionId='null'}] 

Question: is there a problem with the requestResponseFuture.acquireCountDownLatch(); call here?




GitHub link: https://github.com/apache/rocketmq/discussions/9600

