This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new d206590692 [ISSUE #6979] Fix opaque will be duplicate in multi client 
scene (#6985)
d206590692 is described below

commit d206590692bfdffca6bc58327e9533bc4bb68122
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Thu Jul 13 11:17:38 2023 +0800

    [ISSUE #6979] Fix opaque will be duplicate in multi client scene (#6985)
---
 .../proxy/processor/DefaultMessagingProcessor.java         | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 1b3f0af4ea..188cb7b9bd 100644
--- 
a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ 
b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -235,13 +235,23 @@ public class DefaultMessagingProcessor extends 
AbstractStartAndShutdown implemen
     @Override
     public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String 
brokerName, RemotingCommand request,
         long timeoutMillis) {
-        return this.requestBrokerProcessor.request(ctx, brokerName, request, 
timeoutMillis);
+        int originalRequestOpaque = request.getOpaque();
+        request.setOpaque(RemotingCommand.createNewRequestId());
+        return this.requestBrokerProcessor.request(ctx, brokerName, request, 
timeoutMillis).thenApply(r -> {
+            request.setOpaque(originalRequestOpaque);
+            return r;
+        });
     }
 
     @Override
     public CompletableFuture<Void> requestOneway(ProxyContext ctx, String 
brokerName, RemotingCommand request,
         long timeoutMillis) {
-        return this.requestBrokerProcessor.requestOneway(ctx, brokerName, 
request, timeoutMillis);
+        int originalRequestOpaque = request.getOpaque();
+        request.setOpaque(RemotingCommand.createNewRequestId());
+        return this.requestBrokerProcessor.requestOneway(ctx, brokerName, 
request, timeoutMillis).thenApply(r -> {
+            request.setOpaque(originalRequestOpaque);
+            return r;
+        });
     }
 
     @Override

Reply via email to