This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new d73b601382 [ISSUE #7330] Fix channel connect issue for goaway (#7467)
d73b601382 is described below
commit d73b6013825db9124e39a37db67094e34b9c3d88
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Mon Oct 16 19:06:40 2023 +0800
[ISSUE #7330] Fix channel connect issue for goaway (#7467)
* add waitChannelFuture for goaway
* add body for retry channel
---
.../remoting/netty/NettyRemotingClient.java | 41 +++++++++++++++-------
1 file changed, 28 insertions(+), 13 deletions(-)
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 4bc51bd833..340daee67e 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -716,20 +716,25 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
if (cw != null) {
- ChannelFuture channelFuture = cw.getChannelFuture();
- if
(channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis()))
{
- if (cw.isOK()) {
- LOGGER.info("createChannel: connect remote host[{}]
success, {}", addr, channelFuture.toString());
- return cw.getChannel();
- } else {
- LOGGER.warn("createChannel: connect remote host[" + addr +
"] failed, " + channelFuture.toString());
- }
+ return waitChannelFuture(addr, cw);
+ }
+
+ return null;
+ }
+
+ private Channel waitChannelFuture(String addr, ChannelWrapper cw) {
+ ChannelFuture channelFuture = cw.getChannelFuture();
+ if
(channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis()))
{
+ if (cw.isOK()) {
+ LOGGER.info("createChannel: connect remote host[{}] success,
{}", addr, channelFuture.toString());
+ return cw.getChannel();
} else {
- LOGGER.warn("createChannel: connect remote host[{}] timeout
{}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
- channelFuture.toString());
+ LOGGER.warn("createChannel: connect remote host[{}] failed,
{}", addr, channelFuture.toString());
}
+ } else {
+ LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms,
{}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
+ channelFuture.toString());
}
-
return null;
}
@@ -818,8 +823,14 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
long duration =
stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
RemotingCommand retryRequest =
RemotingCommand.createRequestCommand(request.getCode(),
request.readCustomHeader());
- Channel retryChannel = channelWrapper.getChannel();
- if (channel != retryChannel) {
+ retryRequest.setBody(request.getBody());
+ Channel retryChannel;
+ if (channelWrapper.isOK()) {
+ retryChannel = channelWrapper.getChannel();
+ } else {
+ retryChannel =
waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper);
+ }
+ if (retryChannel != null && channel !=
retryChannel) {
return super.invokeImpl(retryChannel,
retryRequest, timeoutMillis - duration);
}
}
@@ -994,6 +1005,10 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
this.lastResponseTime = System.currentTimeMillis();
}
+ public String getChannelAddress() {
+ return channelAddress;
+ }
+
public boolean reconnect() {
if (lock.writeLock().tryLock()) {
try {