This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d206590692 [ISSUE #6979] Fix opaque will be duplicate in multi client scene (#6985)
d206590692 is described below
commit d206590692bfdffca6bc58327e9533bc4bb68122
Author: Lei Zhiyuan <[email protected]>
AuthorDate: Thu Jul 13 11:17:38 2023 +0800
[ISSUE #6979] Fix opaque will be duplicate in multi client scene (#6985)
---
.../proxy/processor/DefaultMessagingProcessor.java | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
index 1b3f0af4ea..188cb7b9bd 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/DefaultMessagingProcessor.java
@@ -235,13 +235,23 @@ public class DefaultMessagingProcessor extends AbstractStartAndShutdown implemen
@Override
public CompletableFuture<RemotingCommand> request(ProxyContext ctx, String
brokerName, RemotingCommand request,
long timeoutMillis) {
- return this.requestBrokerProcessor.request(ctx, brokerName, request,
timeoutMillis);
+ int originalRequestOpaque = request.getOpaque();
+ request.setOpaque(RemotingCommand.createNewRequestId());
+ return this.requestBrokerProcessor.request(ctx, brokerName, request,
timeoutMillis).thenApply(r -> {
+ request.setOpaque(originalRequestOpaque);
+ return r;
+ });
}
@Override
public CompletableFuture<Void> requestOneway(ProxyContext ctx, String
brokerName, RemotingCommand request,
long timeoutMillis) {
- return this.requestBrokerProcessor.requestOneway(ctx, brokerName,
request, timeoutMillis);
+ int originalRequestOpaque = request.getOpaque();
+ request.setOpaque(RemotingCommand.createNewRequestId());
+ return this.requestBrokerProcessor.requestOneway(ctx, brokerName,
request, timeoutMillis).thenApply(r -> {
+ request.setOpaque(originalRequestOpaque);
+ return r;
+ });
}
@Override