This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 3469d7c  Fix tls not work when use pu handler (#9882)
3469d7c is described below

commit 3469d7c731632b56391ee6d4c2d1271c20b7a38e
Author: GuoHao <[email protected]>
AuthorDate: Sat Apr 2 21:19:52 2022 +0800

    Fix tls not work when use pu handler (#9882)
---
 .../dubbo/remoting/api/PortUnificationServer.java  | 22 ++++--
 .../remoting/api/PortUnificationServerHandler.java | 81 ++++++++++++++--------
 .../protocol/tri/stream/TripleServerStream.java    |  6 +-
 3 files changed, 73 insertions(+), 36 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
index 7ce4932..1ddf324 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServer.java
@@ -76,9 +76,11 @@ public class PortUnificationServer {
         // you can customize name and type of client thread pool by 
THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
         // the handler will be wrapped: 
MultiMessageHandler->HeartbeatHandler->handler
         this.url = ExecutorUtil.setThreadName(url, "DubboPUServerHandler");
-        this.protocols = 
ExtensionLoader.getExtensionLoader(WireProtocol.class).getActivateExtension(url,
 new String[0]);
+        this.protocols = ExtensionLoader.getExtensionLoader(WireProtocol.class)
+            .getActivateExtension(url, new String[0]);
         // read config before destroy
-        serverShutdownTimeoutMills = 
ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
+        serverShutdownTimeoutMills = 
ConfigurationUtils.getServerShutdownTimeout(
+            getUrl().getOrDefaultModuleModel());
     }
 
     public URL getUrl() {
@@ -122,12 +124,16 @@ public class PortUnificationServer {
 //                        p.addLast(new LoggingHandler(LogLevel.DEBUG));
 
                     final boolean enableSsl = 
getUrl().getParameter(SSL_ENABLED_KEY, false);
+                    final PortUnificationServerHandler puHandler;
                     if (enableSsl) {
-                        p.addLast("negotiation-ssl", new 
SslServerTlsHandler(getUrl()));
+                        puHandler = new PortUnificationServerHandler(url,
+                            SslContexts.buildServerSslContext(url), true, 
protocols);
+                    } else {
+                        puHandler = new PortUnificationServerHandler(url, 
null, false, protocols);
                     }
 
-                    final PortUnificationServerHandler puHandler = new 
PortUnificationServerHandler(url, protocols);
-                    p.addLast("server-idle-handler", new IdleStateHandler(0, 
0, idleTimeout, MILLISECONDS));
+                    p.addLast("server-idle-handler",
+                        new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS));
                     p.addLast("negotiation-protocol", puHandler);
                     channelGroup = puHandler.getChannels();
                 }
@@ -173,8 +179,10 @@ public class PortUnificationServer {
             if (bootstrap != null) {
                 long timeout = serverShutdownTimeoutMills;
                 long quietPeriod = Math.min(2000L, timeout);
-                Future<?> bossGroupShutdownFuture = 
bossGroup.shutdownGracefully(quietPeriod, timeout, MILLISECONDS);
-                Future<?> workerGroupShutdownFuture = 
workerGroup.shutdownGracefully(quietPeriod, timeout, MILLISECONDS);
+                Future<?> bossGroupShutdownFuture = 
bossGroup.shutdownGracefully(quietPeriod,
+                    timeout, MILLISECONDS);
+                Future<?> workerGroupShutdownFuture = 
workerGroup.shutdownGracefully(quietPeriod,
+                    timeout, MILLISECONDS);
                 bossGroupShutdownFuture.awaitUninterruptibly(timeout, 
MILLISECONDS);
                 workerGroupShutdownFuture.awaitUninterruptibly(timeout, 
MILLISECONDS);
             }
diff --git 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
index 4e15beb..1a28cb4 100644
--- 
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
+++ 
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/PortUnificationServerHandler.java
@@ -23,9 +23,11 @@ import org.apache.dubbo.common.logger.LoggerFactory;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.GlobalEventExecutor;
 
 import java.util.List;
@@ -38,18 +40,21 @@ public class PortUnificationServerHandler extends 
ByteToMessageDecoder {
 
     private final SslContext sslCtx;
     private final URL url;
+    private final boolean detectSsl;
     private final List<WireProtocol> protocols;
     private final DefaultChannelGroup channels = new DefaultChannelGroup(
         GlobalEventExecutor.INSTANCE);
 
     public PortUnificationServerHandler(URL url, List<WireProtocol> protocols) 
{
-        this(url, null, protocols);
+        this(url, null, false, protocols);
     }
 
-    public PortUnificationServerHandler(URL url, SslContext sslCtx, 
List<WireProtocol> protocols) {
+    public PortUnificationServerHandler(URL url, SslContext sslCtx, boolean 
detectSsl,
+        List<WireProtocol> protocols) {
         this.url = url;
         this.sslCtx = sslCtx;
         this.protocols = protocols;
+        this.detectSsl = detectSsl;
     }
 
     @Override
@@ -81,34 +86,54 @@ public class PortUnificationServerHandler extends 
ByteToMessageDecoder {
             return;
         }
 
-        for (final WireProtocol protocol : protocols) {
-            in.markReaderIndex();
-            final ProtocolDetector.Result result = 
protocol.detector().detect(ctx, in);
-            in.resetReaderIndex();
-            switch (result) {
-                case UNRECOGNIZED:
-                    continue;
-                case RECOGNIZED:
-                    protocol.configServerPipeline(url, ctx.pipeline(), sslCtx);
-                    ctx.pipeline().remove(this);
-                case NEED_MORE_DATA:
-                    return;
-                default:
-                    return;
+        if (isSsl(in)) {
+            enableSsl(ctx);
+        } else {
+            for (final WireProtocol protocol : protocols) {
+                in.markReaderIndex();
+                final ProtocolDetector.Result result = 
protocol.detector().detect(ctx, in);
+                in.resetReaderIndex();
+                switch (result) {
+                    case UNRECOGNIZED:
+                        continue;
+                    case RECOGNIZED:
+                        protocol.configServerPipeline(url, ctx.pipeline(), 
sslCtx);
+                        ctx.pipeline().remove(this);
+                    case NEED_MORE_DATA:
+                        return;
+                    default:
+                        return;
+                }
             }
+            byte[] preface = new byte[in.readableBytes()];
+            in.readBytes(preface);
+            Set<String> supported = url.getApplicationModel()
+                .getExtensionLoader(WireProtocol.class)
+                .getSupportedExtensions();
+            LOGGER.error(String.format("Can not recognize protocol from 
downstream=%s . "
+                    + "preface=%s protocols=%s", ctx.channel().remoteAddress(),
+                Bytes.bytes2hex(preface),
+                supported));
+
+            // Unknown protocol; discard everything and close the connection.
+            in.clear();
+            ctx.close();
         }
-        byte[] preface = new byte[in.readableBytes()];
-        in.readBytes(preface);
-        Set<String> supported = url.getApplicationModel()
-            .getExtensionLoader(WireProtocol.class)
-            .getSupportedExtensions();
-        LOGGER.error(String.format("Can not recognize protocol from 
downstream=%s . "
-                + "preface=%s protocols=%s", ctx.channel().remoteAddress(), 
Bytes.bytes2hex(preface),
-            supported));
-
-        // Unknown protocol; discard everything and close the connection.
-        in.clear();
-        ctx.close();
     }
 
+    private void enableSsl(ChannelHandlerContext ctx) {
+        ChannelPipeline p = ctx.pipeline();
+        p.addLast("ssl", sslCtx.newHandler(ctx.alloc()));
+        p.addLast("unificationA", new PortUnificationServerHandler(url, 
sslCtx, false, protocols));
+        p.remove(this);
+    }
+
+    private boolean isSsl(ByteBuf buf) {
+        if (detectSsl) {
+            return SslHandler.isEncrypted(buf);
+        }
+        return false;
+    }
+
+
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
index 391eabb..3eab0c8 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/stream/TripleServerStream.java
@@ -384,7 +384,8 @@ public class TripleServerStream extends AbstractStream 
implements ServerStream {
             Map<String, Object> requestMetadata = headersToMap(headers);
             boolean hasStub = pathResolver.hasNativeStub(path);
             if (hasStub) {
-                listener = new StubAbstractServerCall(invoker, 
TripleServerStream.this, frameworkModel,
+                listener = new StubAbstractServerCall(invoker, 
TripleServerStream.this,
+                    frameworkModel,
                     acceptEncoding, serviceName, originalMethodName, executor);
             } else {
                 listener = new ReflectionAbstractServerCall(invoker, 
TripleServerStream.this,
@@ -407,6 +408,9 @@ public class TripleServerStream extends AbstractStream 
implements ServerStream {
         }
 
         private void doOnData(ByteBuf data, boolean endStream) {
+            if (deframer == null) {
+                return;
+            }
             deframer.deframe(data);
             if (endStream) {
                 deframer.close();

Reply via email to