This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 66ba4566f5 close channel when receive go away twice (#8862)
66ba4566f5 is described below
commit 66ba4566f5ebaeac47c73eaaf4a86567e3760063
Author: qianye <[email protected]>
AuthorDate: Thu Nov 14 14:07:20 2024 +0800
close channel when receive go away twice (#8862)
close channel when receive go away twice (#8862)
---
.../rocketmq/remoting/common/RemotingHelper.java | 30 ++++++++----
.../rocketmq/remoting/netty/NettyClientConfig.java | 10 ----
.../remoting/netty/NettyRemotingAbstract.java | 2 +-
.../remoting/netty/NettyRemotingClient.java | 53 ++++++++--------------
4 files changed, 41 insertions(+), 54 deletions(-)
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
index 552fd2b15f..d94efe71e4 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/common/RemotingHelper.java
@@ -21,6 +21,15 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.utils.NetworkUtil;
@@ -36,15 +45,6 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
-import java.io.IOException;
-import java.lang.reflect.Field;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
-import java.util.HashMap;
-import java.util.Map;
-
public class RemotingHelper {
public static final String DEFAULT_CHARSET = "UTF-8";
public static final String DEFAULT_CIDR_ALL = "0.0.0.0/0";
@@ -355,6 +355,18 @@ public class RemotingHelper {
}
}
+ public static CompletableFuture<Void>
convertChannelFutureToCompletableFuture(ChannelFuture channelFuture) {
+ CompletableFuture<Void> completableFuture = new CompletableFuture<>();
+ channelFuture.addListener((ChannelFutureListener) future -> {
+ if (future.isSuccess()) {
+ completableFuture.complete(null);
+ } else {
+ completableFuture.completeExceptionally(new
RemotingConnectException(channelFuture.channel().remoteAddress().toString(),
future.cause()));
+ }
+ });
+ return completableFuture;
+ }
+
public static String getRequestCodeDesc(int code) {
return REQUEST_CODE_MAP.getOrDefault(code, String.valueOf(code));
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
index 7b7263e27a..8260163640 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyClientConfig.java
@@ -59,8 +59,6 @@ public class NettyClientConfig {
private boolean enableReconnectForGoAway = true;
- private boolean enableTransparentRetry = true;
-
public boolean isClientCloseSocketIfTimeout() {
return clientCloseSocketIfTimeout;
}
@@ -205,14 +203,6 @@ public class NettyClientConfig {
this.enableReconnectForGoAway = enableReconnectForGoAway;
}
- public boolean isEnableTransparentRetry() {
- return enableTransparentRetry;
- }
-
- public void setEnableTransparentRetry(boolean enableTransparentRetry) {
- this.enableTransparentRetry = enableTransparentRetry;
- }
-
public String getSocksProxyConfig() {
return socksProxyConfig;
}
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index ffa3726059..b0c7099b9d 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -273,7 +273,7 @@ public abstract class NettyRemotingAbstract {
Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque);
if (isShuttingDown.get()) {
- if (cmd.getVersion() > MQVersion.Version.V5_1_4.ordinal()) {
+ if (cmd.getVersion() > MQVersion.Version.V5_3_1.ordinal()) {
final RemotingCommand response =
RemotingCommand.createResponseCommand(ResponseCode.GO_AWAY,
"please go away");
response.setOpaque(opaque);
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index ae82b09eda..b3042c9f8d 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -73,6 +73,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.utils.FutureUtils;
import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -88,6 +89,8 @@ import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.proxy.SocksProxyConfig;
+import static
org.apache.rocketmq.remoting.common.RemotingHelper.convertChannelFutureToCompletableFuture;
+
public class NettyRemotingClient extends NettyRemotingAbstract implements
RemotingClient {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
@@ -554,7 +557,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
updateChannelLastResponseTime(addr);
return response;
} catch (RemotingSendRequestException e) {
- LOGGER.warn("invokeSync: send request exception, so close the
channel[{}]", channelRemoteAddr);
+ LOGGER.warn("invokeSync: send request exception, so close the
channel[addr={}, id={}]", channelRemoteAddr, channel.id());
this.closeChannel(addr, channel);
throw e;
} catch (RemotingTimeoutException e) {
@@ -832,45 +835,27 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
return channelWrapper0;
});
if (channelWrapper != null &&
!channelWrapper.isWrapperOf(channel)) {
- if (nettyClientConfig.isEnableTransparentRetry()) {
- RemotingCommand retryRequest =
RemotingCommand.createRequestCommand(request.getCode(),
request.readCustomHeader());
- retryRequest.setBody(request.getBody());
- retryRequest.setExtFields(request.getExtFields());
- if (channelWrapper.isOK()) {
- long duration =
stopwatch.elapsed(TimeUnit.MILLISECONDS);
- stopwatch.stop();
- Channel retryChannel =
channelWrapper.getChannel();
- if (retryChannel != null && channel !=
retryChannel) {
- return super.invokeImpl(retryChannel,
retryRequest, timeoutMillis - duration);
- }
- } else {
- CompletableFuture<ResponseFuture> future = new
CompletableFuture<>();
- ChannelFuture channelFuture =
channelWrapper.getChannelFuture();
- channelFuture.addListener(f -> {
- long duration =
stopwatch.elapsed(TimeUnit.MILLISECONDS);
- stopwatch.stop();
- if (f.isSuccess()) {
- Channel retryChannel0 =
channelFuture.channel();
- if (retryChannel0 != null && channel
!= retryChannel0) {
- super.invokeImpl(retryChannel0,
retryRequest, timeoutMillis - duration).whenComplete((v, t) -> {
- if (t != null) {
-
future.completeExceptionally(t);
- } else {
- future.complete(v);
- }
- });
- }
- } else {
- future.completeExceptionally(new
RemotingConnectException(channelWrapper.channelAddress));
+ RemotingCommand retryRequest =
RemotingCommand.createRequestCommand(request.getCode(),
request.readCustomHeader());
+ retryRequest.setBody(request.getBody());
+ retryRequest.setExtFields(request.getExtFields());
+ CompletableFuture<Void> future =
convertChannelFutureToCompletableFuture(channelWrapper.getChannelFuture());
+ return future.thenCompose(v -> {
+ long duration =
stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ stopwatch.stop();
+ return
super.invokeImpl(channelWrapper.getChannel(), retryRequest, timeoutMillis -
duration)
+ .thenCompose(r -> {
+ if (r.getResponseCommand().getCode() ==
ResponseCode.GO_AWAY) {
+ return
FutureUtils.completeExceptionally(new
RemotingSendRequestException(channelRemoteAddr,
+ new Throwable("Receive GO_AWAY
twice in request from channelId=" + channel.id())));
}
+ return
CompletableFuture.completedFuture(r);
});
- return future;
- }
- }
+ });
} else {
LOGGER.warn("invokeImpl receive GO_AWAY,
channelWrapper is null or channel is the same in wrapper, channelId={}",
channel.id());
}
}
+ return FutureUtils.completeExceptionally(new
RemotingSendRequestException(channelRemoteAddr, new Throwable("Receive GO_AWAY
from channelId=" + channel.id())));
}
return CompletableFuture.completedFuture(responseFuture);
}).whenComplete((v, t) -> {