This is an automated email from the ASF dual-hosted git repository.

guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new f2eef4874f Fix graceful shutdown not work (#9938)
f2eef4874f is described below

commit f2eef4874f65b21b7b310de588d9f55f4aeb2e2c
Author: GuoHao <[email protected]>
AuthorDate: Tue Apr 19 12:21:03 2022 +0800

    Fix graceful shutdown not work (#9938)
---
 .../dubbo/remoting/api/PortUnificationServer.java  | 18 +++++++--------
 .../remoting/api/PortUnificationServerHandler.java | 27 ++++++----------------
 .../protocol/tri/transport/GracefulShutdown.java   |  7 +++---
 3 files changed, 19 insertions(+), 33 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
index 1ddf3241fa..f9b2bb4272 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
@@ -34,11 +34,11 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
-import io.netty.channel.group.ChannelGroupFuture;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.net.InetSocketAddress;
 import java.util.List;
@@ -59,6 +59,10 @@ public class PortUnificationServer {
     private static final Logger logger = LoggerFactory.getLogger(PortUnificationServer.class);
     private final List<WireProtocol> protocols;
     private final URL url;
+
+    private final DefaultChannelGroup channels = new DefaultChannelGroup(
+        GlobalEventExecutor.INSTANCE);
+
     private final int serverShutdownTimeoutMills;
     /**
      * netty server bootstrap.
@@ -68,7 +72,6 @@ public class PortUnificationServer {
      * the boss channel that receive connections and dispatch these to worker channel.
      */
     private Channel channel;
-    private DefaultChannelGroup channelGroup;
     private EventLoopGroup bossGroup;
     private EventLoopGroup workerGroup;
 
@@ -127,15 +130,15 @@ public class PortUnificationServer {
                     final PortUnificationServerHandler puHandler;
                     if (enableSsl) {
                         puHandler = new PortUnificationServerHandler(url,
-                            SslContexts.buildServerSslContext(url), true, protocols);
+                            SslContexts.buildServerSslContext(url), true, protocols, channels);
                     } else {
-                        puHandler = new PortUnificationServerHandler(url, null, false, protocols);
+                        puHandler = new PortUnificationServerHandler(url, null, false, protocols,
+                            channels);
                     }
 
                     p.addLast("server-idle-handler",
                         new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
                     p.addLast("negotiation-protocol", puHandler);
-                    channelGroup = puHandler.getChannels();
                 }
             });
         // bind
@@ -161,10 +164,7 @@ public class PortUnificationServer {
                 channel = null;
             }
 
-            if (channelGroup != null) {
-                ChannelGroupFuture closeFuture = channelGroup.close();
-                closeFuture.await(serverShutdownTimeoutMills);
-            }
+            channels.close().await(serverShutdownTimeoutMills);
             final long cost = System.currentTimeMillis() - st;
             logger.info("Port unification server closed. cost:" + cost);
         } catch (InterruptedException e) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
index 1a28cb43f1..634b406313 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
@@ -24,11 +24,10 @@ import org.apache.dubbo.common.logger.LoggerFactory;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
-import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.group.ChannelGroup;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.util.List;
 import java.util.Set;
@@ -38,23 +37,20 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
     private static final Logger LOGGER = LoggerFactory.getLogger(
         PortUnificationServerHandler.class);
 
+    private final ChannelGroup channels;
+
     private final SslContext sslCtx;
     private final URL url;
     private final boolean detectSsl;
     private final List<WireProtocol> protocols;
-    private final DefaultChannelGroup channels = new DefaultChannelGroup(
-        GlobalEventExecutor.INSTANCE);
-
-    public PortUnificationServerHandler(URL url, List<WireProtocol> protocols) {
-        this(url, null, false, protocols);
-    }
 
     public PortUnificationServerHandler(URL url, SslContext sslCtx, boolean detectSsl,
-        List<WireProtocol> protocols) {
+        List<WireProtocol> protocols, ChannelGroup channels) {
         this.url = url;
         this.sslCtx = sslCtx;
         this.protocols = protocols;
         this.detectSsl = detectSsl;
+        this.channels = channels;
     }
 
     @Override
@@ -62,22 +58,12 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
         LOGGER.error("Unexpected exception from downstream before protocol detected.", cause);
     }
 
-    public DefaultChannelGroup getChannels() {
-        return channels;
-    }
-
     @Override
     public void channelActive(ChannelHandlerContext ctx) throws Exception {
         super.channelActive(ctx);
         channels.add(ctx.channel());
     }
 
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-        super.channelInactive(ctx);
-        channels.remove(ctx.channel());
-    }
-
     @Override
     protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
         throws Exception {
@@ -124,7 +110,8 @@ public class PortUnificationServerHandler extends ByteToMessageDecoder {
     private void enableSsl(ChannelHandlerContext ctx) {
         ChannelPipeline p = ctx.pipeline();
         p.addLast("ssl", sslCtx.newHandler(ctx.alloc()));
-        p.addLast("unificationA", new PortUnificationServerHandler(url, sslCtx, false, protocols));
+        p.addLast("unificationA",
+            new PortUnificationServerHandler(url, sslCtx, false, protocols, channels));
         p.remove(this);
     }
 
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
index 43852a988e..5e7090c6c8 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
@@ -48,14 +48,14 @@ public class GracefulShutdown {
         Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR, ByteBufUtil
             .writeAscii(ctx.alloc(), goAwayMessage));
         goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE);
-        ctx.write(goAwayFrame);
+        ctx.writeAndFlush(goAwayFrame);
         pingFuture = ctx.executor().schedule(
             () -> secondGoAwayAndClose(ctx),
             GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
             TimeUnit.NANOSECONDS);
 
         Http2PingFrame pingFrame = new DefaultHttp2PingFrame(GRACEFUL_SHUTDOWN_PING, false);
-        ctx.write(pingFrame);
+        ctx.writeAndFlush(pingFrame);
     }
 
     void secondGoAwayAndClose(ChannelHandlerContext ctx) {
@@ -69,8 +69,7 @@ public class GracefulShutdown {
         try {
             Http2GoAwayFrame goAwayFrame = new DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR,
                 ByteBufUtil.writeAscii(this.ctx.alloc(), this.goAwayMessage));
-            ctx.write(goAwayFrame);
-            ctx.flush();
+            ctx.writeAndFlush(goAwayFrame);
             //TODO support customize graceful shutdown timeout mills
             ctx.close(originPromise);
         } catch (Exception e) {

Reply via email to