This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 7748040f41 [ISSUE #8599] Fix send fail with receiving GO_AWAY when rolling update proxy and add channel id in logs (#8685)
7748040f41 is described below

commit 7748040f41a69a7e18e2acb55c03005f539a4ba2
Author: qianye <[email protected]>
AuthorDate: Fri Sep 13 11:45:21 2024 +0800

    [ISSUE #8599] Fix send fail with receiving GO_AWAY when rolling update proxy and add channel id in logs (#8685)
---
 .../remoting/netty/NettyRemotingAbstract.java      |  8 +--
 .../remoting/netty/NettyRemotingClient.java        | 61 ++++++++++++----------
 2 files changed, 37 insertions(+), 32 deletions(-)

diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 9f3136195b..ffa3726059 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -39,8 +39,8 @@ import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import javax.annotation.Nullable;
 import org.apache.rocketmq.common.AbortProcessException;
@@ -393,7 +393,7 @@ public abstract class NettyRemotingAbstract {
                 responseFuture.release();
             }
         } else {
-            log.warn("receive response, cmd={}, but not matched any request, 
address={}", cmd, RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
+            log.warn("receive response, cmd={}, but not matched any request, 
address={}, channelId={}", cmd, 
RemotingHelper.parseChannelRemoteAddr(ctx.channel()), ctx.channel().id());
         }
     }
 
@@ -560,13 +560,13 @@ public abstract class NettyRemotingAbstract {
                         return;
                     }
                     requestFail(opaque);
-                    log.warn("send a request command to channel <{}> failed.", 
RemotingHelper.parseChannelRemoteAddr(channel));
+                    log.warn("send a request command to channel <{}>, 
channelId={}, failed.", RemotingHelper.parseChannelRemoteAddr(channel), 
channel.id());
                 });
                 return future;
             } catch (Exception e) {
                 responseTable.remove(opaque);
                 responseFuture.release();
-                log.warn("send a request command to channel <" + 
RemotingHelper.parseChannelRemoteAddr(channel) + "> Exception", e);
+                log.warn("send a request command to channel <{}> channelId={} 
Exception", RemotingHelper.parseChannelRemoteAddr(channel), channel.id(), e);
                 future.completeExceptionally(new 
RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), 
e));
                 return future;
             }
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 41976122b2..ef9762ddc6 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -49,7 +49,6 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.security.cert.CertificateException;
-import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -416,14 +415,14 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                     boolean removeItemFromTable = true;
                     final ChannelWrapper prevCW = 
this.channelTables.get(addrRemote);
 
-                    LOGGER.info("closeChannel: begin close the channel[{}] 
Found: {}", addrRemote, prevCW != null);
+                    LOGGER.info("closeChannel: begin close the 
channel[addr={}, id={}] Found: {}", addrRemote, channel.id(), prevCW != null);
 
                     if (null == prevCW) {
-                        LOGGER.info("closeChannel: the channel[{}] has been 
removed from the channel table before", addrRemote);
+                        LOGGER.info("closeChannel: the channel[addr={}, id={}] 
has been removed from the channel table before", addrRemote, channel.id());
                         removeItemFromTable = false;
                     } else if (prevCW.isWrapperOf(channel)) {
-                        LOGGER.info("closeChannel: the channel[{}] has been 
closed before, and has been created again, nothing to do.",
-                            addrRemote);
+                        LOGGER.info("closeChannel: the channel[addr={}, id={}] 
has been closed before, and has been created again, nothing to do.",
+                            addrRemote, channel.id());
                         removeItemFromTable = false;
                     }
 
@@ -432,7 +431,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                         if (channelWrapper != null && 
channelWrapper.tryClose(channel)) {
                             this.channelTables.remove(addrRemote);
                         }
-                        LOGGER.info("closeChannel: the channel[{}] was removed 
from channel table", addrRemote);
+                        LOGGER.info("closeChannel: the channel[addr={}, id={}] 
was removed from channel table", addrRemote, channel.id());
                     }
 
                     RemotingHelper.closeChannel(channel);
@@ -471,7 +470,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                     }
 
                     if (null == prevCW) {
-                        LOGGER.info("eventCloseChannel: the channel[{}] has 
been removed from the channel table before", addrRemote);
+                        LOGGER.info("eventCloseChannel: the channel[addr={}, 
id={}] has been removed from the channel table before", 
RemotingHelper.parseChannelRemoteAddr(channel), channel.id());
                         removeItemFromTable = false;
                     }
 
@@ -480,11 +479,11 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                         if (channelWrapper != null && 
channelWrapper.tryClose(channel)) {
                             this.channelTables.remove(addrRemote);
                         }
-                        LOGGER.info("closeChannel: the channel[{}] was removed 
from channel table", addrRemote);
+                        LOGGER.info("closeChannel: the channel[addr={}, id={}] 
was removed from channel table", addrRemote, channel.id());
                         RemotingHelper.closeChannel(channel);
                     }
                 } catch (Exception e) {
-                    LOGGER.error("closeChannel: close the channel exception", 
e);
+                    LOGGER.error("closeChannel: close the channel[id={}] 
exception", channel.id(), e);
                 } finally {
                     this.lockChannelTables.unlock();
                 }
@@ -562,9 +561,9 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                 boolean shouldClose = left > MIN_CLOSE_TIMEOUT_MILLIS || left 
> timeoutMillis / 4;
                 if (nettyClientConfig.isClientCloseSocketIfTimeout() && 
shouldClose) {
                     this.closeChannel(addr, channel);
-                    LOGGER.warn("invokeSync: close socket because of timeout, 
{}ms, {}", timeoutMillis, channelRemoteAddr);
+                    LOGGER.warn("invokeSync: close socket because of timeout, 
{}ms, channel[addr={}, id={}]", timeoutMillis, channelRemoteAddr, channel.id());
                 }
-                LOGGER.warn("invokeSync: wait response timeout exception, the 
channel[{}]", channelRemoteAddr);
+                LOGGER.warn("invokeSync: wait response timeout exception, the 
channel[addr={}, id={}]", channelRemoteAddr, channel.id());
                 throw e;
             }
         } else {
@@ -819,10 +818,11 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
             RemotingCommand response = responseFuture.getResponseCommand();
             if (response.getCode() == ResponseCode.GO_AWAY) {
                 if (nettyClientConfig.isEnableReconnectForGoAway()) {
+                    LOGGER.info("Receive go away from channelId={}, 
channel={}", channel.id(), channel);
                     ChannelWrapper channelWrapper = 
channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> {
                         try {
-                            if (channelWrapper0.reconnect()) {
-                                LOGGER.info("Receive go away from channel {}, 
recreate the channel", channel0);
+                            if (channelWrapper0.reconnect(channel0)) {
+                                LOGGER.info("Receive go away from 
channelId={}, channel={}, recreate the channelId={}", channel0.id(), channel0, 
channelWrapper0.getChannel().id());
                                 
channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0);
                             }
                         } catch (Throwable t) {
@@ -830,10 +830,11 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                         }
                         return channelWrapper0;
                     });
-                    if (channelWrapper != null) {
+                    if (channelWrapper != null && 
!channelWrapper.isWrapperOf(channel)) {
                         if (nettyClientConfig.isEnableTransparentRetry()) {
                             RemotingCommand retryRequest = 
RemotingCommand.createRequestCommand(request.getCode(), 
request.readCustomHeader());
                             retryRequest.setBody(request.getBody());
+                            retryRequest.setExtFields(request.getExtFields());
                             if (channelWrapper.isOK()) {
                                 long duration = 
stopwatch.elapsed(TimeUnit.MILLISECONDS);
                                 stopwatch.stop();
@@ -865,6 +866,8 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                                 return future;
                             }
                         }
+                    } else {
+                        LOGGER.warn("invokeImpl receive GO_AWAY, 
channelWrapper is null or channel is the same in wrapper, channelId={}", 
channel.id());
                     }
                 }
             }
@@ -1002,7 +1005,6 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         // only affected by sync or async request, oneway is not included.
         private ChannelFuture channelToClose;
         private long lastResponseTime;
-        private volatile long lastReconnectTimestamp = 0L;
         private final String channelAddress;
 
         public ChannelWrapper(String address, ChannelFuture channelFuture) {
@@ -1021,10 +1023,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         }
 
         public boolean isWrapperOf(Channel channel) {
-            if (this.channelFuture.channel() != null && 
this.channelFuture.channel() == channel) {
-                return true;
-            }
-            return false;
+            return this.channelFuture.channel() != null && 
this.channelFuture.channel() == channel;
         }
 
         private Channel getChannel() {
@@ -1052,20 +1051,27 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
             return channelAddress;
         }
 
-        public boolean reconnect() {
+        public boolean reconnect(Channel channel) {
+            if (!isWrapperOf(channel)) {
+                LOGGER.warn("channelWrapper has reconnect, so do nothing, now 
channelId={}, input channelId={}",getChannel().id(), channel.id());
+                return false;
+            }
             if (lock.writeLock().tryLock()) {
                 try {
-                    if (lastReconnectTimestamp == 0L || 
System.currentTimeMillis() - lastReconnectTimestamp > 
Duration.ofSeconds(nettyClientConfig.getMaxReconnectIntervalTimeSeconds()).toMillis())
 {
+                    if (isWrapperOf(channel)) {
                         channelToClose = channelFuture;
                         String[] hostAndPort = getHostAndPort(channelAddress);
                         channelFuture = fetchBootstrap(channelAddress)
                             .connect(hostAndPort[0], 
Integer.parseInt(hostAndPort[1]));
-                        lastReconnectTimestamp = System.currentTimeMillis();
                         return true;
+                    } else {
+                        LOGGER.warn("channelWrapper has reconnect, so do 
nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id());
                     }
                 } finally {
                     lock.writeLock().unlock();
                 }
+            } else {
+                LOGGER.warn("channelWrapper reconnect try lock fail, now 
channelId={}", getChannel().id());
             }
             return false;
         }
@@ -1152,7 +1158,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         @Override
         public void channelActive(ChannelHandlerContext ctx) throws Exception {
             final String remoteAddress = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}", remoteAddress);
+            LOGGER.info("NETTY CLIENT PIPELINE: ACTIVE, {}, channelId={}", 
remoteAddress, ctx.channel().id());
             super.channelActive(ctx);
 
             if (NettyRemotingClient.this.channelEventListener != null) {
@@ -1175,7 +1181,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         @Override
         public void close(ChannelHandlerContext ctx, ChannelPromise promise) 
throws Exception {
             final String remoteAddress = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            LOGGER.info("NETTY CLIENT PIPELINE: CLOSE {}", remoteAddress);
+            LOGGER.info("NETTY CLIENT PIPELINE: CLOSE channel[addr={}, 
id={}]", remoteAddress, ctx.channel().id());
             closeChannel(ctx.channel());
             super.close(ctx, promise);
             NettyRemotingClient.this.failFast(ctx.channel());
@@ -1187,7 +1193,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         @Override
         public void channelInactive(ChannelHandlerContext ctx) throws 
Exception {
             final String remoteAddress = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            LOGGER.info("NETTY CLIENT PIPELINE: channelInactive, the 
channel[{}]", remoteAddress);
+            LOGGER.info("NETTY CLIENT PIPELINE: channelInactive, the 
channel[addr={}, id={}]", remoteAddress, ctx.channel().id());
             closeChannel(ctx.channel());
             super.channelInactive(ctx);
         }
@@ -1198,7 +1204,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
                 IdleStateEvent event = (IdleStateEvent) evt;
                 if (event.state().equals(IdleState.ALL_IDLE)) {
                     final String remoteAddress = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-                    LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception [{}]", 
remoteAddress);
+                    LOGGER.warn("NETTY CLIENT PIPELINE: IDLE exception 
channel[addr={}, id={}]", remoteAddress, ctx.channel().id());
                     closeChannel(ctx.channel());
                     if (NettyRemotingClient.this.channelEventListener != null) 
{
                         NettyRemotingClient.this
@@ -1213,8 +1219,7 @@ public class NettyRemotingClient extends 
NettyRemotingAbstract implements Remoti
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) throws Exception {
             final String remoteAddress = 
RemotingHelper.parseChannelRemoteAddr(ctx.channel());
-            LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught {}", 
remoteAddress);
-            LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught exception.", 
cause);
+            LOGGER.warn("NETTY CLIENT PIPELINE: exceptionCaught 
channel[addr={}, id={}]", remoteAddress, ctx.channel().id(), cause);
             closeChannel(ctx.channel());
             if (NettyRemotingClient.this.channelEventListener != null) {
                 NettyRemotingClient.this.putNettyEvent(new 
NettyEvent(NettyEventType.EXCEPTION, remoteAddress, ctx.channel()));

Reply via email to