GitHub user Breathtak edited a discussion: (救救孩子吧)源码问题关于rocketmq-example目录下的rpc测试
位置:在rocketmq-example的rpc目录下的测试demo (version:5.3.2) 当我修改了运行 改测试的时候,测试结果抛出了 异常,内容是消息发送成功但是等待3000ms没有收到回复消息,内容如下 > org.apache.rocketmq.client.exception.RequestTimeoutException: CODE: 10006 > DESC: send request message to <RequestTopic> OK, but wait reply message > timeout, 3000 ms. > at > org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.waitResponse(DefaultMQProducerImpl.java:1762) > at > org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.request(DefaultMQProducerImpl.java:1633) > at > org.apache.rocketmq.client.producer.DefaultMQProducer.request(DefaultMQProducer.java:810) > at > org.apache.rocketmq.example.rpc.RequestProducer.main(RequestProducer.java:47) 于是我尝试修改TTL的时间为30s,重新测试但是等待的时间并没有30s又抛出了该异常(在这个 过程中我填加了一些日志) > request create RequestResponseFuture and correlationId : > 104fa2e7-468b-4eb8-a932-a7214f2c1c08 > request remove RequestResponseFuture > org.apache.rocketmq.client.exception.RequestTimeoutException: CODE: 10006 > DESC: send request message to <RequestTopic> OK, but wait reply message > timeout, 3000 ms. > at > org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.waitResponse(DefaultMQProducerImpl.java:1762) > at > org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.request(DefaultMQProducerImpl.java:1633) > at > org.apache.rocketmq.client.producer.DefaultMQProducer.request(DefaultMQProducer.java:810) > at > org.apache.rocketmq.example.rpc.RequestProducer.main(RequestProducer.java:47) > processReplyMessage set Reply correlationId: > 104fa2e7-468b-4eb8-a932-a7214f2c1c08 根据上面的日志我发现移除RequestResponseFuture的动作在processReplyMessage 之前,然后 我找到了如下代码,acquireCountDownLatch方法为取消阻塞等待。(这就导致了方法waitResponse 取消了阻塞继续执行 直到最后移除RequestResponseFuture,但是processReplyMessage 还没有 执行完成) > @Override > public void onSuccess(SendResult sendResult) { > requestResponseFuture.setSendRequestOk(true); > requestResponseFuture.acquireCountDownLatch(); > } 于是我将 requestResponseFuture.acquireCountDownLatch();这行代码注释掉,让他根据TTL的时间来自行取消阻塞,效果 为processReplyMessage 方法在waitResponse 前执行了,运行结果如下: > request create RequestResponseFuture and correlationId : > a97c46d2-dabc-4345-afa9-5af1f5f46ed3 > processReplyMessage set Reply correlationId: > a97c46d2-dabc-4345-afa9-5af1f5f46ed3 > request remove RequestResponseFuture > request message: reply message contents. > request to <RequestTopic> cost: 1017 replyMessage: MessageExt > [brokerName=null, queueId=0, storeSize=0, queueOffset=0, sysFlag=0, > bornTimestamp=1754470419554, bornHost=/221.6.33.116:43645, > storeTimestamp=1754470421504, storeHost=/129.211.50.247:10911, msgId=null, > commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0, > toString()=Message{topic='DefaultCluster_REPLY_TOPIC', flag=0, > properties={ARRIVE_TIME=1754470419570, PUSH_REPLY_TIME=1754470421504, > MSG_REGION=DefaultRegion, UNIQ_KEY=C0A81E111A8063947C6B1D5FD4620007, > CORRELATION_ID=a97c46d2-dabc-4345-afa9-5af1f5f46ed3, MSG_TYPE=reply, > TTL=3000, REPLY_TO_CLIENT=192.168.30.17@24756#779130303594900, > TRACE_ON=true}, body=[114, 101, 112, 108, 121, 32, 109, 101, 115, 115, 97, > 103, 101, 32, 99, 111, 110, 116, 101, 110, 116, 115, 46], > transactionId='null'}] 问题:requestResponseFuture.acquireCountDownLatch(); 这块是否存在着问题 GitHub link: https://github.com/apache/rocketmq/discussions/9600 ---- This is an automatically sent email for dev@rocketmq.apache.org. To unsubscribe, please send an email to: dev-unsubscr...@rocketmq.apache.org