This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new b9e1dc057f Triple protocol http1 upgrade support (#14026)
b9e1dc057f is described below

commit b9e1dc057f1a29f6f84be9822c799b549c849438
Author: Zhang ZP <[email protected]>
AuthorDate: Sat May 11 14:40:20 2024 +0800

    Triple protocol http1 upgrade support (#14026)
    
    * Triple protocol http1 upgrade support
    
    * support Application-Layer Protocol Negotiation
    
    * support Application-Layer Protocol Negotiation
    
    * support Application-Layer Protocol Negotiation
---
 dubbo-remoting/dubbo-remoting-netty4/pom.xml       |   4 +
 .../netty4/NettyPortUnificationServerHandler.java  | 100 ++++++++++++---------
 .../remoting/transport/netty4/ssl/SslContexts.java |  15 +++-
 .../rpc/protocol/tri/TripleHttp2Protocol.java      |  75 ++++++++++++----
 .../tri/h12/HttpServerAfterUpgradeHandler.java     |  69 ++++++++++++++
 5 files changed, 201 insertions(+), 62 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-netty4/pom.xml 
b/dubbo-remoting/dubbo-remoting-netty4/pom.xml
index ec180ddb3f..06d548d1bd 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/pom.xml
+++ b/dubbo-remoting/dubbo-remoting-netty4/pom.xml
@@ -84,6 +84,10 @@
       <classifier>linux-aarch_64</classifier>
       <scope>runtime</scope>
     </dependency>
+    <dependency>
+      <groupId>io.netty</groupId>
+      <artifactId>netty-codec-http2</artifactId>
+    </dependency>
 
     <dependency>
       <groupId>org.apache.dubbo</groupId>
diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
index fe07a9e69a..f3c490376a 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
@@ -39,6 +39,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.handler.ssl.SslHandshakeCompletionEvent;
@@ -119,48 +121,7 @@ public class NettyPortUnificationServerHandler extends 
ByteToMessageDecoder {
         if (providerConnectionConfig != null && isSsl(in)) {
             enableSsl(ctx, providerConnectionConfig);
         } else {
-            Set<String> supportedProtocolNames = new 
HashSet<>(protocols.keySet());
-            supportedProtocolNames.retainAll(urlMapper.keySet());
-
-            for (final String name : supportedProtocolNames) {
-                WireProtocol protocol = protocols.get(name);
-                in.markReaderIndex();
-                ChannelBuffer buf = new NettyBackedChannelBuffer(in);
-                final ProtocolDetector.Result result = 
protocol.detector().detect(buf);
-                in.resetReaderIndex();
-                switch (result.flag()) {
-                    case UNRECOGNIZED:
-                        continue;
-                    case RECOGNIZED:
-                        ChannelHandler localHandler = 
this.handlerMapper.getOrDefault(name, handler);
-                        URL localURL = this.urlMapper.getOrDefault(name, url);
-                        channel.setUrl(localURL);
-                        NettyConfigOperator operator = new 
NettyConfigOperator(channel, localHandler);
-                        operator.setDetectResult(result);
-                        protocol.configServerProtocolHandler(url, operator);
-                        ctx.pipeline().remove(this);
-                    case NEED_MORE_DATA:
-                        return;
-                    default:
-                        return;
-                }
-            }
-            byte[] preface = new byte[in.readableBytes()];
-            in.readBytes(preface);
-            Set<String> supported = url.getApplicationModel()
-                    .getExtensionLoader(WireProtocol.class)
-                    .getSupportedExtensions();
-            LOGGER.error(
-                    INTERNAL_ERROR,
-                    "unknown error in remoting module",
-                    "",
-                    String.format(
-                            "Can not recognize protocol from downstream=%s . " 
+ "preface=%s protocols=%s",
-                            ctx.channel().remoteAddress(), 
Bytes.bytes2hex(preface), supported));
-
-            // Unknown protocol; discard everything and close the connection.
-            in.clear();
-            ctx.close();
+            detectProtocol(ctx, url, channel, in);
         }
     }
 
@@ -171,6 +132,17 @@ public class NettyPortUnificationServerHandler extends 
ByteToMessageDecoder {
         p.addLast(
                 "unificationA",
                 new NettyPortUnificationServerHandler(url, false, protocols, 
handler, urlMapper, handlerMapper));
+        p.addLast("ALPN", new 
ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_1_1) {
+            @Override
+            protected void configurePipeline(ChannelHandlerContext ctx, String 
protocol) throws Exception {
+                if (!ApplicationProtocolNames.HTTP_2.equals(protocol)) {
+                    return;
+                }
+                NettyChannel channel = 
NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+                ByteBuf in = ctx.alloc().buffer();
+                detectProtocol(ctx, url, channel, in);
+            }
+        });
         p.remove(this);
     }
 
@@ -181,4 +153,48 @@ public class NettyPortUnificationServerHandler extends 
ByteToMessageDecoder {
         }
         return false;
     }
+
+    private void detectProtocol(ChannelHandlerContext ctx, URL url, 
NettyChannel channel, ByteBuf in) {
+        Set<String> supportedProtocolNames = new HashSet<>(protocols.keySet());
+        supportedProtocolNames.retainAll(urlMapper.keySet());
+
+        for (final String name : supportedProtocolNames) {
+            WireProtocol protocol = protocols.get(name);
+            in.markReaderIndex();
+            ChannelBuffer buf = new NettyBackedChannelBuffer(in);
+            final ProtocolDetector.Result result = 
protocol.detector().detect(buf);
+            in.resetReaderIndex();
+            switch (result.flag()) {
+                case UNRECOGNIZED:
+                    continue;
+                case RECOGNIZED:
+                    ChannelHandler localHandler = 
this.handlerMapper.getOrDefault(name, handler);
+                    URL localURL = this.urlMapper.getOrDefault(name, url);
+                    channel.setUrl(localURL);
+                    NettyConfigOperator operator = new 
NettyConfigOperator(channel, localHandler);
+                    operator.setDetectResult(result);
+                    protocol.configServerProtocolHandler(url, operator);
+                    ctx.pipeline().remove(this);
+                case NEED_MORE_DATA:
+                    return;
+                default:
+                    return;
+            }
+        }
+        byte[] preface = new byte[in.readableBytes()];
+        in.readBytes(preface);
+        Set<String> supported =
+                
url.getApplicationModel().getExtensionLoader(WireProtocol.class).getSupportedExtensions();
+        LOGGER.error(
+                INTERNAL_ERROR,
+                "unknown error in remoting module",
+                "",
+                String.format(
+                        "Can not recognize protocol from downstream=%s . " + 
"preface=%s protocols=%s",
+                        ctx.channel().remoteAddress(), 
Bytes.bytes2hex(preface), supported));
+
+        // Unknown protocol; discard everything and close the connection.
+        in.clear();
+        ctx.close();
+    }
 }
diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java
index beb7e61142..13b1bbda87 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java
@@ -31,11 +31,15 @@ import java.io.InputStream;
 import java.security.Provider;
 import java.security.Security;
 
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.ssl.ApplicationProtocolConfig;
+import io.netty.handler.ssl.ApplicationProtocolNames;
 import io.netty.handler.ssl.ClientAuth;
 import io.netty.handler.ssl.OpenSsl;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
 
 import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE_STREAM;
 
@@ -77,7 +81,16 @@ public class SslContexts {
             safeCloseStream(serverPrivateKeyPathStream);
         }
         try {
-            return 
sslClientContextBuilder.sslProvider(findSslProvider()).build();
+            return sslClientContextBuilder
+                    .sslProvider(findSslProvider())
+                    .ciphers(Http2SecurityUtil.CIPHERS, 
SupportedCipherSuiteFilter.INSTANCE)
+                    .applicationProtocolConfig(new ApplicationProtocolConfig(
+                            ApplicationProtocolConfig.Protocol.ALPN,
+                            
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
+                            
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
+                            ApplicationProtocolNames.HTTP_2,
+                            ApplicationProtocolNames.HTTP_1_1))
+                    .build();
         } catch (SSLException e) {
             throw new IllegalStateException("Build SslSession failed.", e);
         }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index ea088c9af1..4e91dabb35 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -31,8 +31,10 @@ import 
org.apache.dubbo.remoting.http12.netty4.h1.NettyHttp1ConnectionHandler;
 import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2FrameCodec;
 import 
org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2ProtocolSelectorHandler;
 import org.apache.dubbo.remoting.utils.UrlUtils;
+import org.apache.dubbo.rpc.model.ApplicationModel;
 import org.apache.dubbo.rpc.model.FrameworkModel;
 import org.apache.dubbo.rpc.model.ScopeModelAware;
+import org.apache.dubbo.rpc.protocol.tri.h12.HttpServerAfterUpgradeHandler;
 import org.apache.dubbo.rpc.protocol.tri.h12.TripleProtocolDetector;
 import 
org.apache.dubbo.rpc.protocol.tri.h12.http1.DefaultHttp11ServerTransportListenerFactory;
 import 
org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListenerFactory;
@@ -48,14 +50,18 @@ import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.http.HttpObjectAggregator;
 import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.HttpServerUpgradeHandler;
+import io.netty.handler.codec.http2.Http2CodecUtil;
 import io.netty.handler.codec.http2.Http2FrameCodec;
 import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
 import io.netty.handler.codec.http2.Http2FrameLogger;
 import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
 import io.netty.handler.codec.http2.Http2Settings;
 import io.netty.handler.codec.http2.Http2StreamChannel;
 import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.logging.LogLevel;
+import io.netty.util.AsciiString;
 
 import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_ENABLE_PUSH_KEY;
 import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_HEADER_TABLE_SIZE_KEY;
@@ -143,20 +149,66 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
     }
 
     private void configurerHttp1Handlers(URL url, List<ChannelHandler> 
handlers) {
-        handlers.add(new ChannelHandlerPretender(new HttpServerCodec()));
+        final HttpServerCodec sourceCodec = new HttpServerCodec();
+        handlers.add(new ChannelHandlerPretender(sourceCodec));
+        // Triple protocol http1 upgrade support
+        handlers.add(new ChannelHandlerPretender(new HttpServerUpgradeHandler(
+                sourceCodec,
+                protocol -> {
+                    if 
(AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME, 
protocol)) {
+                        Configuration config =
+                                
ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel());
+                        return new Http2ServerUpgradeCodec(
+                                buildHttp2FrameCodec(config, 
url.getOrDefaultApplicationModel()),
+                                new HttpServerAfterUpgradeHandler(),
+                                new HttpWriteQueueHandler(),
+                                new FlushConsolidationHandler(64, true),
+                                new TripleServerConnectionHandler(),
+                                buildHttp2MultiplexHandler(url),
+                                new TripleTailHandler());
+                    }
+                    // Not upgrade request
+                    return null;
+                },
+                Integer.MAX_VALUE)));
+        // If the upgrade was successful, remove the message from the output 
list
+        // so that it's not propagated to the next handler. This request will
+        // be propagated as a user event instead.
         handlers.add(new ChannelHandlerPretender(new 
HttpObjectAggregator(Integer.MAX_VALUE)));
         handlers.add(new ChannelHandlerPretender(new NettyHttp1Codec()));
         handlers.add(new ChannelHandlerPretender(new 
NettyHttp1ConnectionHandler(
                 url, frameworkModel, 
DefaultHttp11ServerTransportListenerFactory.INSTANCE)));
     }
 
+    private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url) {
+        return new Http2MultiplexHandler(new 
ChannelInitializer<Http2StreamChannel>() {
+            @Override
+            protected void initChannel(Http2StreamChannel ch) {
+                final ChannelPipeline p = ch.pipeline();
+                p.addLast(new NettyHttp2FrameCodec());
+                p.addLast(new NettyHttp2ProtocolSelectorHandler(
+                        url, frameworkModel, 
GenericHttp2ServerTransportListenerFactory.INSTANCE));
+            }
+        });
+    }
+
     private void configurerHttp2Handlers(URL url, List<ChannelHandler> 
handlers) {
         Configuration config = 
ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel());
-        final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forServer()
+        final Http2FrameCodec codec = buildHttp2FrameCodec(config, 
url.getOrDefaultApplicationModel());
+        final Http2MultiplexHandler handler = buildHttp2MultiplexHandler(url);
+        handlers.add(new ChannelHandlerPretender(new HttpWriteQueueHandler()));
+        handlers.add(new ChannelHandlerPretender(codec));
+        handlers.add(new ChannelHandlerPretender(new 
FlushConsolidationHandler(64, true)));
+        handlers.add(new ChannelHandlerPretender(new 
TripleServerConnectionHandler()));
+        handlers.add(new ChannelHandlerPretender(handler));
+        handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
+    }
+
+    private Http2FrameCodec buildHttp2FrameCodec(Configuration config, 
ApplicationModel applicationModel) {
+        return TripleHttp2FrameCodecBuilder.forServer()
                 .customizeConnection((connection) -> connection
                         .remote()
-                        .flowController(
-                                new TriHttp2RemoteFlowController(connection, 
url.getOrDefaultApplicationModel())))
+                        .flowController(new 
TriHttp2RemoteFlowController(connection, applicationModel)))
                 .gracefulShutdownTimeoutMillis(10000)
                 .initialSettings(new Http2Settings()
                         .headerTableSize(
@@ -168,20 +220,5 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
                                 
config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY, 
DEFAULT_MAX_HEADER_LIST_SIZE)))
                 .frameLogger(SERVER_LOGGER)
                 .build();
-        final Http2MultiplexHandler handler = new Http2MultiplexHandler(new 
ChannelInitializer<Http2StreamChannel>() {
-            @Override
-            protected void initChannel(Http2StreamChannel ch) {
-                final ChannelPipeline p = ch.pipeline();
-                p.addLast(new NettyHttp2FrameCodec());
-                p.addLast(new NettyHttp2ProtocolSelectorHandler(
-                        url, frameworkModel, 
GenericHttp2ServerTransportListenerFactory.INSTANCE));
-            }
-        });
-        handlers.add(new ChannelHandlerPretender(new HttpWriteQueueHandler()));
-        handlers.add(new ChannelHandlerPretender(codec));
-        handlers.add(new ChannelHandlerPretender(new 
FlushConsolidationHandler(64, true)));
-        handlers.add(new ChannelHandlerPretender(new 
TripleServerConnectionHandler()));
-        handlers.add(new ChannelHandlerPretender(handler));
-        handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
     }
 }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/HttpServerAfterUpgradeHandler.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/HttpServerAfterUpgradeHandler.java
new file mode 100644
index 0000000000..30c89087d5
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/HttpServerAfterUpgradeHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.h12;
+
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.HttpServerUpgradeHandler;
+import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.Http2CodecUtil;
+import io.netty.handler.codec.http2.Http2FrameCodec;
+import io.netty.handler.codec.http2.Http2FrameStream;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.InboundHttpToHttp2Adapter;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+
+/**
+ * If an upgrade occurred, the program need send a simple response via HTTP/2 
on stream 1 (the stream specifically reserved
+ * for cleartext HTTP upgrade). However, {@link Http2FrameCodec} send 
'upgradeRequest' to upgraded channel handlers by
+ * {@link InboundHttpToHttp2Adapter} (As it noted that this may behave 
strangely). So we need to distinguish the 'upgradeRequest'
+ * and send the response.<br/>
+ *
+ * @see HttpServerUpgradeHandler
+ * @see Http2FrameCodec
+ * @see InboundHttpToHttp2Adapter
+ * @since 3.3.0
+ */
+@Sharable
+public class HttpServerAfterUpgradeHandler extends 
ChannelInboundHandlerAdapter {
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
+        if (msg instanceof DefaultHttp2HeadersFrame) {
+            DefaultHttp2HeadersFrame headersFrame = (DefaultHttp2HeadersFrame) 
msg;
+            if (headersFrame.stream().id() == 
Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && headersFrame.isEndStream()) {
+                // upgradeRequest
+                sendResponse(ctx, headersFrame.stream());
+                return;
+            }
+        }
+        super.channelRead(ctx, msg);
+    }
+
+    /**
+     * Send a frame for the response status
+     */
+    private static void sendResponse(ChannelHandlerContext ctx, 
Http2FrameStream stream) {
+        Http2Headers headers = new 
DefaultHttp2Headers().status(OK.codeAsText());
+        ctx.write(new DefaultHttp2HeadersFrame(headers).stream(stream));
+        ctx.write(new DefaultHttp2DataFrame(true).stream(stream));
+    }
+}

Reply via email to