This is an automated email from the ASF dual-hosted git repository.
guohao pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new f2eef4874f Fix graceful shutdown not work (#9938)
f2eef4874f is described below
commit f2eef4874f65b21b7b310de588d9f55f4aeb2e2c
Author: GuoHao <[email protected]>
AuthorDate: Tue Apr 19 12:21:03 2022 +0800
Fix graceful shutdown not work (#9938)
---
.../dubbo/remoting/api/PortUnificationServer.java | 18 +++++++--------
.../remoting/api/PortUnificationServerHandler.java | 27 ++++++----------------
.../protocol/tri/transport/GracefulShutdown.java | 7 +++---
3 files changed, 19 insertions(+), 33 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
index 1ddf3241fa..f9b2bb4272 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
@@ -34,11 +34,11 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
-import io.netty.channel.group.ChannelGroupFuture;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GlobalEventExecutor;
import java.net.InetSocketAddress;
import java.util.List;
@@ -59,6 +59,10 @@ public class PortUnificationServer {
private static final Logger logger =
LoggerFactory.getLogger(PortUnificationServer.class);
private final List<WireProtocol> protocols;
private final URL url;
+
+ private final DefaultChannelGroup channels = new DefaultChannelGroup(
+ GlobalEventExecutor.INSTANCE);
+
private final int serverShutdownTimeoutMills;
/**
* netty server bootstrap.
@@ -68,7 +72,6 @@ public class PortUnificationServer {
* the boss channel that receive connections and dispatch these to worker
channel.
*/
private Channel channel;
- private DefaultChannelGroup channelGroup;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
@@ -127,15 +130,15 @@ public class PortUnificationServer {
final PortUnificationServerHandler puHandler;
if (enableSsl) {
puHandler = new PortUnificationServerHandler(url,
- SslContexts.buildServerSslContext(url), true,
protocols);
+ SslContexts.buildServerSslContext(url), true,
protocols, channels);
} else {
- puHandler = new PortUnificationServerHandler(url,
null, false, protocols);
+ puHandler = new PortUnificationServerHandler(url,
null, false, protocols,
+ channels);
}
p.addLast("server-idle-handler",
new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
p.addLast("negotiation-protocol", puHandler);
- channelGroup = puHandler.getChannels();
}
});
// bind
@@ -161,10 +164,7 @@ public class PortUnificationServer {
channel = null;
}
- if (channelGroup != null) {
- ChannelGroupFuture closeFuture = channelGroup.close();
- closeFuture.await(serverShutdownTimeoutMills);
- }
+ channels.close().await(serverShutdownTimeoutMills);
final long cost = System.currentTimeMillis() - st;
logger.info("Port unification server closed. cost:" + cost);
} catch (InterruptedException e) {
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
index 1a28cb43f1..634b406313 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
@@ -24,11 +24,10 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
-import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
-import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.List;
import java.util.Set;
@@ -38,23 +37,20 @@ public class PortUnificationServerHandler extends
ByteToMessageDecoder {
private static final Logger LOGGER = LoggerFactory.getLogger(
PortUnificationServerHandler.class);
+ private final ChannelGroup channels;
+
private final SslContext sslCtx;
private final URL url;
private final boolean detectSsl;
private final List<WireProtocol> protocols;
- private final DefaultChannelGroup channels = new DefaultChannelGroup(
- GlobalEventExecutor.INSTANCE);
-
- public PortUnificationServerHandler(URL url, List<WireProtocol> protocols)
{
- this(url, null, false, protocols);
- }
public PortUnificationServerHandler(URL url, SslContext sslCtx, boolean
detectSsl,
- List<WireProtocol> protocols) {
+ List<WireProtocol> protocols, ChannelGroup channels) {
this.url = url;
this.sslCtx = sslCtx;
this.protocols = protocols;
this.detectSsl = detectSsl;
+ this.channels = channels;
}
@Override
@@ -62,22 +58,12 @@ public class PortUnificationServerHandler extends
ByteToMessageDecoder {
LOGGER.error("Unexpected exception from downstream before protocol
detected.", cause);
}
- public DefaultChannelGroup getChannels() {
- return channels;
- }
-
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
channels.add(ctx.channel());
}
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- super.channelInactive(ctx);
- channels.remove(ctx.channel());
- }
-
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object>
out)
throws Exception {
@@ -124,7 +110,8 @@ public class PortUnificationServerHandler extends
ByteToMessageDecoder {
private void enableSsl(ChannelHandlerContext ctx) {
ChannelPipeline p = ctx.pipeline();
p.addLast("ssl", sslCtx.newHandler(ctx.alloc()));
- p.addLast("unificationA", new PortUnificationServerHandler(url,
sslCtx, false, protocols));
+ p.addLast("unificationA",
+ new PortUnificationServerHandler(url, sslCtx, false, protocols,
channels));
p.remove(this);
}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
index 43852a988e..5e7090c6c8 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/transport/GracefulShutdown.java
@@ -48,14 +48,14 @@ public class GracefulShutdown {
Http2GoAwayFrame goAwayFrame = new
DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR, ByteBufUtil
.writeAscii(ctx.alloc(), goAwayMessage));
goAwayFrame.setExtraStreamIds(Integer.MAX_VALUE);
- ctx.write(goAwayFrame);
+ ctx.writeAndFlush(goAwayFrame);
pingFuture = ctx.executor().schedule(
() -> secondGoAwayAndClose(ctx),
GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
TimeUnit.NANOSECONDS);
Http2PingFrame pingFrame = new
DefaultHttp2PingFrame(GRACEFUL_SHUTDOWN_PING, false);
- ctx.write(pingFrame);
+ ctx.writeAndFlush(pingFrame);
}
void secondGoAwayAndClose(ChannelHandlerContext ctx) {
@@ -69,8 +69,7 @@ public class GracefulShutdown {
try {
Http2GoAwayFrame goAwayFrame = new
DefaultHttp2GoAwayFrame(Http2Error.NO_ERROR,
ByteBufUtil.writeAscii(this.ctx.alloc(), this.goAwayMessage));
- ctx.write(goAwayFrame);
- ctx.flush();
+ ctx.writeAndFlush(goAwayFrame);
//TODO support customize graceful shutdown timeout mills
ctx.close(originPromise);
} catch (Exception e) {