This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7748040f41 [ISSUE #8599] Fix send fail with receiving GO_AWAY when
rolling update proxy and add channel id in logs (#8685)
7748040f41 is described below
commit 7748040f41a69a7e18e2acb55c03005f539a4ba2
Author: qianye <[email protected]>
AuthorDate: Fri Sep 13 11:45:21 2024 +0800
[ISSUE #8599] Fix send fail with receiving GO_AWAY when rolling update
proxy and add channel id in logs (#8685)
---
.../remoting/netty/NettyRemotingAbstract.java | 8 +--
.../remoting/netty/NettyRemotingClient.java | 61 ++++++++++++----------
2 files changed, 37 insertions(+), 32 deletions(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 9f3136195b..ffa3726059 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -39,8 +39,8 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.AbortProcessException;
@@ -393,7 +393,7 @@ public abstract class NettyRemotingAbstract {
responseFuture.release();
}
} else {
- log.warn("receive response, cmd={}, but not matched any request,
address={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+ log.warn("receive response, cmd={}, but not matched any request,
address={}, channelId={}", cmd,
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), ctx.channel().id());
}
}
@@ -560,13 +560,13 @@ public abstract class NettyRemotingAbstract {
return;
}
requestFail(opaque);
- log.warn("send a request command to channel <{}> failed.",
RemotingHelper.parseChannelRemoteAddr(channel));
+ log.warn("send a request command to channel <{}>,
channelId={}, failed.", RemotingHelper.parseChannelRemoteAddr(channel),
channel.id());
});
return future;
} catch (Exception e) {
responseTable.remove(opaque);
responseFuture.release();
- log.warn("send a request command to channel <" +
RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
+ log.warn("send a request command to channel <{}> channelId={}
Exception", RemotingHelper.parseChannelRemoteAddr(channel), channel.id(), e);
future.completeExceptionally(new
RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel),
e));
return future;
}
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 41976122b2..ef9762ddc6 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -49,7 +49,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.cert.CertificateException;
-import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -416,14 +415,14 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
boolean removeItemFromTable = true;
final ChannelWrapper prevCW =
this.channelTables.get(addrRemote);
- LOGGER.info("closeChannel: begin close the channel[{}]
Found: {}", addrRemote, prevCW != null);
+ LOGGER.info("closeChannel: begin close the
channel[addr={}, id={}] Found: {}", addrRemote, channel.id(), prevCW != null);
if (null == prevCW) {
- LOGGER.info("closeChannel: the channel[{}] has been
removed from the channel table before", addrRemote);
+ LOGGER.info("closeChannel: the channel[addr={}, id={}]
has been removed from the channel table before", addrRemote, channel.id());
removeItemFromTable = false;
} else if (prevCW.isWrapperOf(channel)) {
- LOGGER.info("closeChannel: the channel[{}] has been
closed before, and has been created again, nothing to do.",
- addrRemote);
+ LOGGER.info("closeChannel: the channel[addr={}, id={}]
has been closed before, and has been created again, nothing to do.",
+ addrRemote, channel.id());
removeItemFromTable = false;
}
@@ -432,7 +431,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
if (channelWrapper != null &&
channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
- LOGGER.info("closeChannel: the channel[{}] was removed
from channel table", addrRemote);
+ LOGGER.info("closeChannel: the channel[addr={}, id={}]
was removed from channel table", addrRemote, channel.id());
}
RemotingHelper.closeChannel(channel);
@@ -471,7 +470,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
if (null == prevCW) {
- LOGGER.info("eventCloseChannel: the channel[{}] has
been removed from the channel table before", addrRemote);
+ LOGGER.info("eventCloseChannel: the channel[addr={},
id={}] has been removed from the channel table before",
RemotingHelper.parseChannelRemoteAddr(channel), channel.id());
removeItemFromTable = false;
}
@@ -480,11 +479,11 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
if (channelWrapper != null &&
channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
- LOGGER.info("closeChannel: the channel[{}] was removed
from channel table", addrRemote);
+ LOGGER.info("closeChannel: the channel[addr={}, id={}]
was removed from channel table", addrRemote, channel.id());
RemotingHelper.closeChannel(channel);
}
} catch (Exception e) {
- LOGGER.error("closeChannel: close the channel exception",
e);
+ LOGGER.error("closeChannel: close the channel[id={}]
exception", channel.id(), e);
} finally {
this.lockChannelTables.unlock();
}
@@ -562,9 +561,9 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left
> timeoutMillis / 4;
if (nettyClientConfig.isClientCloseSocketIfTimeout() &&
shouldClose) {
this.closeChannel(addr, channel);
- LOGGER.warn("invokeSync: close socket because of timeout,
{}ms, {}", timeoutMillis, channelRemoteAddr);
+ LOGGER.warn("invokeSync: close socket because of timeout,
{}ms, channel[addr={}, id={}]", timeoutMillis, channelRemoteAddr, channel.id());
}
- LOGGER.warn("invokeSync: wait response timeout exception, the
channel[{}]", channelRemoteAddr);
+ LOGGER.warn("invokeSync: wait response timeout exception, the
channel[addr={}, id={}]", channelRemoteAddr, channel.id());
throw e;
}
} else {
@@ -819,10 +818,11 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
RemotingCommand response = responseFuture.getResponseCommand();
if (response.getCode() == ResponseCode.GO_AWAY) {
if (nettyClientConfig.isEnableReconnectForGoAway()) {
+ LOGGER.info("Receive go away from channelId={},
channel={}", channel.id(), channel);
ChannelWrapper channelWrapper =
channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> {
try {
- if (channelWrapper0.reconnect()) {
- LOGGER.info("Receive go away from channel {},
recreate the channel", channel0);
+ if (channelWrapper0.reconnect(channel0)) {
+ LOGGER.info("Receive go away from
channelId={}, channel={}, recreate the channelId={}", channel0.id(), channel0,
channelWrapper0.getChannel().id());
channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0);
}
} catch (Throwable t) {
@@ -830,10 +830,11 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
return channelWrapper0;
});
- if (channelWrapper != null) {
+ if (channelWrapper != null &&
!channelWrapper.isWrapperOf(channel)) {
if (nettyClientConfig.isEnableTransparentRetry()) {
RemotingCommand retryRequest =
RemotingCommand.createRequestCommand(request.getCode(),
request.readCustomHeader());
retryRequest.setBody(request.getBody());
+ retryRequest.setExtFields(request.getExtFields());
if (channelWrapper.isOK()) {
long duration =
stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
@@ -865,6 +866,8 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
return future;
}
}
+ } else {
+ LOGGER.warn("invokeImpl receive GO_AWAY,
channelWrapper is null or channel is the same in wrapper, channelId={}",
channel.id());
}
}
}
@@ -1002,7 +1005,6 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
// only affected by sync or async request, oneway is not included.
private ChannelFuture channelToClose;
private long lastResponseTime;
- private volatile long lastReconnectTimestamp = 0L;
private final String channelAddress;
public ChannelWrapper(String address, ChannelFuture channelFuture) {
@@ -1021,10 +1023,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
public boolean isWrapperOf(Channel channel) {
- if (this.channelFuture.channel() != null &&
this.channelFuture.channel() == channel) {
- return true;
- }
- return false;
+ return this.channelFuture.channel() != null &&
this.channelFuture.channel() == channel;
}
private Channel getChannel() {
@@ -1052,20 +1051,27 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
return channelAddress;
}
- public boolean reconnect() {
+ public boolean reconnect(Channel channel) {
+ if (!isWrapperOf(channel)) {
+ LOGGER.warn("channelWrapper has reconnect, so do nothing, now
channelId={}, input channelId={}",getChannel().id(), channel.id());
+ return false;
+ }
if (lock.writeLock().tryLock()) {
try {
- if (lastReconnectTimestamp == 0L ||
System.currentTimeMillis() - lastReconnectTimestamp >
Duration.ofSeconds(nettyClientConfig.getMaxReconnectIntervalTimeSeconds()).toMillis())
{
+ if (isWrapperOf(channel)) {
channelToClose = channelFuture;
String[] hostAndPort = getHostAndPort(channelAddress);
channelFuture = fetchBootstrap(channelAddress)
.connect(hostAndPort[0],
Integer.parseInt(hostAndPort[1]));
- lastReconnectTimestamp = System.currentTimeMillis();
return true;
+ } else {
+ LOGGER.warn("channelWrapper has reconnect, so do
nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id());
}
} finally {
lock.writeLock().unlock();
}
+ } else {
+ LOGGER.warn("channelWrapper reconnect try lock fail, now
channelId={}", getChannel().id());
}
return false;
}
@@ -1152,7 +1158,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
final String remoteAddress =
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress);
+ LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}, channelId={}",
remoteAddress, ctx.channel().id());
super.channelActive(ctx);
if (NettyRemotingClient.this.channelEventListener != null) {
@@ -1175,7 +1181,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
final String remoteAddress =
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- LOGGER.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
+ LOGGER.info("NETTY CLIENT PIPELINE: CLOSE channel[addr={},
id={}]", remoteAddress, ctx.channel().id());
closeChannel(ctx.channel());
super.close(ctx, promise);
NettyRemotingClient.this.failFast(ctx.channel());
@@ -1187,7 +1193,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
@Override
public void channelInactive(ChannelHandlerContext ctx) throws
Exception {
final String remoteAddress =
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- LOGGER.info("NETTY CLIENT PIPELINE: channelInactive, the
channel[{}]", remoteAddress);
+ LOGGER.info("NETTY CLIENT PIPELINE: channelInactive, the
channel[addr={}, id={}]", remoteAddress, ctx.channel().id());
closeChannel(ctx.channel());
super.channelInactive(ctx);
}
@@ -1198,7 +1204,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state().equals(IdleState.ALL_IDLE)) {
final String remoteAddress =
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]",
remoteAddress);
+ LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception
channel[addr={}, id={}]", remoteAddress, ctx.channel().id());
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null)
{
NettyRemotingClient.this
@@ -1213,8 +1219,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) throws Exception {
final String remoteAddress =
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
- LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught {}",
remoteAddress);
- LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.",
cause);
+ LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught
channel[addr={}, id={}]", remoteAddress, ctx.channel().id(), cause);
closeChannel(ctx.channel());
if (NettyRemotingClient.this.channelEventListener != null) {
NettyRemotingClient.this.putNettyEvent(new
NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));