GitHub user Breathtak created a discussion: Some Quest for RPC Demo Test
Location: the rpc folder under the rocketmq-example directory.

When I run the main method of RequestProducer, it throws the following exception:

> org.apache.rocketmq.client.exception.RequestTimeoutException: CODE: 10006 DESC: send request message to <RequestTopic> OK, but wait reply message timeout, 3000 ms.
>     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.waitResponse(DefaultMQProducerImpl.java:1762)
>     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.request(DefaultMQProducerImpl.java:1633)
>     at org.apache.rocketmq.client.producer.DefaultMQProducer.request(DefaultMQProducer.java:810)
>     at org.apache.rocketmq.example.rpc.RequestProducer.main(RequestProducer.java:47)

I then changed the TTL, but the result was the same, so I added a few log statements. They produced the following output:

> request create RequestResponseFuture and correlationId : 104fa2e7-468b-4eb8-a932-a7214f2c1c08
> request remove RequestResponseFuture
> org.apache.rocketmq.client.exception.RequestTimeoutException: CODE: 10006 DESC: send request message to <RequestTopic> OK, but wait reply message timeout, 3000 ms.
>     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.waitResponse(DefaultMQProducerImpl.java:1762)
>     at org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl.request(DefaultMQProducerImpl.java:1633)
>     at org.apache.rocketmq.client.producer.DefaultMQProducer.request(DefaultMQProducer.java:810)
>     at org.apache.rocketmq.example.rpc.RequestProducer.main(RequestProducer.java:47)
> processReplyMessage set Reply correlationId: 104fa2e7-468b-4eb8-a932-a7214f2c1c08

From this I found that the problem is the call to requestResponseFuture.acquireCountDownLatch(). It ends the blocking wait early, so by the time processReplyMessage runs, the RequestResponseFuture has already been removed:

> @Override
> public void onSuccess(SendResult sendResult) {
>     requestResponseFuture.setSendRequestOk(true);
>     requestResponseFuture.acquireCountDownLatch();
> }

So I deleted the requestResponseFuture.acquireCountDownLatch() call, so that the request waits for the reply for up to the TTL. The result is now:

> request create RequestResponseFuture and correlationId : a97c46d2-dabc-4345-afa9-5af1f5f46ed3
> processReplyMessage set Reply correlationId: a97c46d2-dabc-4345-afa9-5af1f5f46ed3
> request remove RequestResponseFuture
> request message: reply message contents.
> request to <RequestTopic> cost: 1017 replyMessage: MessageExt
> [brokerName=null, queueId=0, storeSize=0, queueOffset=0, sysFlag=0,
> bornTimestamp=1754470419554, bornHost=/221.6.33.116:43645,
> storeTimestamp=1754470421504, storeHost=/129.211.50.247:10911, msgId=null,
> commitLogOffset=0, bodyCRC=0, reconsumeTimes=0, preparedTransactionOffset=0,
> toString()=Message{topic='DefaultCluster_REPLY_TOPIC', flag=0,
> properties={ARRIVE_TIME=1754470419570, PUSH_REPLY_TIME=1754470421504,
> MSG_REGION=DefaultRegion, UNIQ_KEY=C0A81E111A8063947C6B1D5FD4620007,
> CORRELATION_ID=a97c46d2-dabc-4345-afa9-5af1f5f46ed3, MSG_TYPE=reply,
> TTL=3000, REPLY_TO_CLIENT=192.168.30.17@24756#779130303594900,
> TRACE_ON=true}, body=[114, 101, 112, 108, 121, 32, 109, 101, 115, 115, 97,
> 103, 101, 32, 99, 111, 110, 116, 101, 110, 116, 115, 46],
> transactionId='null'}]

Question: is there a problem with calling requestResponseFuture.acquireCountDownLatch() in onSuccess?

GitHub link: https://github.com/apache/rocketmq/discussions/9600
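
The ordering reported in this discussion can be modeled outside RocketMQ with a plain java.util.concurrent.CountDownLatch. The sketch below is not RocketMQ code: ReplyFuture, releaseWaiter, putReply, waitReply and LatchRaceSketch are hypothetical names, and it assumes, as the post describes, that the send-success callback releases the same latch the requester is blocked on. Under that assumption the waiter wakes up before any reply has been stored and sees null, which the caller would then report as a timeout.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a request/reply future; NOT the RocketMQ class.
class ReplyFuture {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile String reply;

    // Called by the reply-processing thread once the reply has arrived.
    void putReply(String reply) {
        this.reply = reply;
        latch.countDown();
    }

    // Assumption taken from the post: the send-success callback releases the latch
    // even though no reply has been stored yet.
    void releaseWaiter() {
        latch.countDown();
    }

    // Blocks until the latch is released or the timeout elapses; returns null if no reply is set.
    String waitReply(long timeoutMillis) throws InterruptedException {
        latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        return reply;
    }
}

class LatchRaceSketch {
    // Simulates one request. The reply is delivered 200 ms after the send succeeds.
    static String request(boolean releaseOnSendSuccess) throws InterruptedException {
        ReplyFuture future = new ReplyFuture();

        // Simulated send-success callback, which fires right away.
        if (releaseOnSendSuccess) {
            future.releaseWaiter();
        }

        // Simulated reply path running on another thread.
        Thread replier = new Thread(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            future.putReply("reply message contents.");
        });
        replier.setDaemon(true);
        replier.start();

        // Wait for up to the TTL used in the example (3000 ms).
        return future.waitReply(3000);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("latch released on send success: " + request(true));
        System.out.println("latch released only by reply:   " + request(false));
    }
}
```

In this model the first call returns without a reply almost immediately, while the second blocks until the reply thread stores its message. Whether acquireCountDownLatch() in RocketMQ really behaves like releaseWaiter() here should be confirmed against the source of the version in use.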