This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7dc0e5aed8 [ISSUE #7833] Fix invokeImpl() in RemotingAbstract
7dc0e5aed8 is described below
commit 7dc0e5aed83f738f5554cb5f00e0da5dabc06e04
Author: guyinyou <[email protected]>
AuthorDate: Mon Feb 19 17:33:01 2024 +0800
[ISSUE #7833] Fix invokeImpl() in RemotingAbstract
---
.../org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java | 8 +-------
.../org/apache/rocketmq/remoting/netty/NettyRemotingClient.java | 7 +++++++
2 files changed, 8 insertions(+), 7 deletions(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 62a8a72901..235349fce3 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -496,13 +496,7 @@ public abstract class NettyRemotingAbstract {
public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel,
final RemotingCommand request,
final long timeoutMillis) {
- String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
- doBeforeRpcHooks(channelRemoteAddr, request);
- return invoke0(channel, request, timeoutMillis).whenComplete((v, t) -> {
- if (t == null) {
- doAfterRpcHooks(channelRemoteAddr, request, v.getResponseCommand());
- }
- });
+ return invoke0(channel, request, timeoutMillis);
}
protected CompletableFuture<ResponseFuture> invoke0(final Channel channel,
final RemotingCommand request,
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index f5157d0304..925c4f9cb2 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -804,6 +804,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel,
final RemotingCommand request,
final long timeoutMillis) {
Stopwatch stopwatch = Stopwatch.createStarted();
+ String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
+ doBeforeRpcHooks(channelRemoteAddr, request);
+
return super.invokeImpl(channel, request, timeoutMillis).thenCompose(responseFuture -> {
RemotingCommand response = responseFuture.getResponseCommand();
if (response.getCode() == ResponseCode.GO_AWAY) {
@@ -839,6 +842,10 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
}
}
return CompletableFuture.completedFuture(responseFuture);
+ }).whenComplete((v, t) -> {
+ if (t == null) {
+ doAfterRpcHooks(channelRemoteAddr, request, v.getResponseCommand());
+ }
});
}