This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new b9e1dc057f Triple protocol http1 upgrade support (#14026)
b9e1dc057f is described below
commit b9e1dc057f1a29f6f84be9822c799b549c849438
Author: Zhang ZP <[email protected]>
AuthorDate: Sat May 11 14:40:20 2024 +0800
Triple protocol http1 upgrade support (#14026)
* Triple protocol http1 upgrade support
* support Application-Layer Protocol Negotiation
* support Application-Layer Protocol Negotiation
* support Application-Layer Protocol Negotiation
---
dubbo-remoting/dubbo-remoting-netty4/pom.xml | 4 +
.../netty4/NettyPortUnificationServerHandler.java | 100 ++++++++++++---------
.../remoting/transport/netty4/ssl/SslContexts.java | 15 +++-
.../rpc/protocol/tri/TripleHttp2Protocol.java | 75 ++++++++++++----
.../tri/h12/HttpServerAfterUpgradeHandler.java | 69 ++++++++++++++
5 files changed, 201 insertions(+), 62 deletions(-)
diff --git a/dubbo-remoting/dubbo-remoting-netty4/pom.xml
b/dubbo-remoting/dubbo-remoting-netty4/pom.xml
index ec180ddb3f..06d548d1bd 100644
--- a/dubbo-remoting/dubbo-remoting-netty4/pom.xml
+++ b/dubbo-remoting/dubbo-remoting-netty4/pom.xml
@@ -84,6 +84,10 @@
<classifier>linux-aarch_64</classifier>
<scope>runtime</scope>
</dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-codec-http2</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.dubbo</groupId>
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
index fe07a9e69a..f3c490376a 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
@@ -39,6 +39,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.ByteToMessageDecoder;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;
@@ -119,48 +121,7 @@ public class NettyPortUnificationServerHandler extends
ByteToMessageDecoder {
if (providerConnectionConfig != null && isSsl(in)) {
enableSsl(ctx, providerConnectionConfig);
} else {
- Set<String> supportedProtocolNames = new
HashSet<>(protocols.keySet());
- supportedProtocolNames.retainAll(urlMapper.keySet());
-
- for (final String name : supportedProtocolNames) {
- WireProtocol protocol = protocols.get(name);
- in.markReaderIndex();
- ChannelBuffer buf = new NettyBackedChannelBuffer(in);
- final ProtocolDetector.Result result =
protocol.detector().detect(buf);
- in.resetReaderIndex();
- switch (result.flag()) {
- case UNRECOGNIZED:
- continue;
- case RECOGNIZED:
- ChannelHandler localHandler =
this.handlerMapper.getOrDefault(name, handler);
- URL localURL = this.urlMapper.getOrDefault(name, url);
- channel.setUrl(localURL);
- NettyConfigOperator operator = new
NettyConfigOperator(channel, localHandler);
- operator.setDetectResult(result);
- protocol.configServerProtocolHandler(url, operator);
- ctx.pipeline().remove(this);
- case NEED_MORE_DATA:
- return;
- default:
- return;
- }
- }
- byte[] preface = new byte[in.readableBytes()];
- in.readBytes(preface);
- Set<String> supported = url.getApplicationModel()
- .getExtensionLoader(WireProtocol.class)
- .getSupportedExtensions();
- LOGGER.error(
- INTERNAL_ERROR,
- "unknown error in remoting module",
- "",
- String.format(
- "Can not recognize protocol from downstream=%s . "
+ "preface=%s protocols=%s",
- ctx.channel().remoteAddress(),
Bytes.bytes2hex(preface), supported));
-
- // Unknown protocol; discard everything and close the connection.
- in.clear();
- ctx.close();
+ detectProtocol(ctx, url, channel, in);
}
}
@@ -171,6 +132,17 @@ public class NettyPortUnificationServerHandler extends
ByteToMessageDecoder {
p.addLast(
"unificationA",
new NettyPortUnificationServerHandler(url, false, protocols,
handler, urlMapper, handlerMapper));
+ p.addLast("ALPN", new
ApplicationProtocolNegotiationHandler(ApplicationProtocolNames.HTTP_1_1) {
+ @Override
+ protected void configurePipeline(ChannelHandlerContext ctx, String
protocol) throws Exception {
+ if (!ApplicationProtocolNames.HTTP_2.equals(protocol)) {
+ return;
+ }
+ NettyChannel channel =
NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
+ ByteBuf in = ctx.alloc().buffer();
+ detectProtocol(ctx, url, channel, in);
+ }
+ });
p.remove(this);
}
@@ -181,4 +153,48 @@ public class NettyPortUnificationServerHandler extends
ByteToMessageDecoder {
}
return false;
}
+
+ private void detectProtocol(ChannelHandlerContext ctx, URL url,
NettyChannel channel, ByteBuf in) {
+ Set<String> supportedProtocolNames = new HashSet<>(protocols.keySet());
+ supportedProtocolNames.retainAll(urlMapper.keySet());
+
+ for (final String name : supportedProtocolNames) {
+ WireProtocol protocol = protocols.get(name);
+ in.markReaderIndex();
+ ChannelBuffer buf = new NettyBackedChannelBuffer(in);
+ final ProtocolDetector.Result result =
protocol.detector().detect(buf);
+ in.resetReaderIndex();
+ switch (result.flag()) {
+ case UNRECOGNIZED:
+ continue;
+ case RECOGNIZED:
+ ChannelHandler localHandler =
this.handlerMapper.getOrDefault(name, handler);
+ URL localURL = this.urlMapper.getOrDefault(name, url);
+ channel.setUrl(localURL);
+ NettyConfigOperator operator = new
NettyConfigOperator(channel, localHandler);
+ operator.setDetectResult(result);
+ protocol.configServerProtocolHandler(url, operator);
+ ctx.pipeline().remove(this);
+ case NEED_MORE_DATA:
+ return;
+ default:
+ return;
+ }
+ }
+ byte[] preface = new byte[in.readableBytes()];
+ in.readBytes(preface);
+ Set<String> supported =
+
url.getApplicationModel().getExtensionLoader(WireProtocol.class).getSupportedExtensions();
+ LOGGER.error(
+ INTERNAL_ERROR,
+ "unknown error in remoting module",
+ "",
+ String.format(
+ "Can not recognize protocol from downstream=%s . " +
"preface=%s protocols=%s",
+ ctx.channel().remoteAddress(),
Bytes.bytes2hex(preface), supported));
+
+ // Unknown protocol; discard everything and close the connection.
+ in.clear();
+ ctx.close();
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java
index beb7e61142..13b1bbda87 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/ssl/SslContexts.java
@@ -31,11 +31,15 @@ import java.io.InputStream;
import java.security.Provider;
import java.security.Security;
+import io.netty.handler.codec.http2.Http2SecurityUtil;
+import io.netty.handler.ssl.ApplicationProtocolConfig;
+import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.OpenSsl;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
+import io.netty.handler.ssl.SupportedCipherSuiteFilter;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE_STREAM;
@@ -77,7 +81,16 @@ public class SslContexts {
safeCloseStream(serverPrivateKeyPathStream);
}
try {
- return
sslClientContextBuilder.sslProvider(findSslProvider()).build();
+ return sslClientContextBuilder
+ .sslProvider(findSslProvider())
+ .ciphers(Http2SecurityUtil.CIPHERS,
SupportedCipherSuiteFilter.INSTANCE)
+ .applicationProtocolConfig(new ApplicationProtocolConfig(
+ ApplicationProtocolConfig.Protocol.ALPN,
+
ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE,
+
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
+ ApplicationProtocolNames.HTTP_2,
+ ApplicationProtocolNames.HTTP_1_1))
+ .build();
} catch (SSLException e) {
throw new IllegalStateException("Build SslSession failed.", e);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index ea088c9af1..4e91dabb35 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -31,8 +31,10 @@ import
org.apache.dubbo.remoting.http12.netty4.h1.NettyHttp1ConnectionHandler;
import org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2FrameCodec;
import
org.apache.dubbo.remoting.http12.netty4.h2.NettyHttp2ProtocolSelectorHandler;
import org.apache.dubbo.remoting.utils.UrlUtils;
+import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.ScopeModelAware;
+import org.apache.dubbo.rpc.protocol.tri.h12.HttpServerAfterUpgradeHandler;
import org.apache.dubbo.rpc.protocol.tri.h12.TripleProtocolDetector;
import
org.apache.dubbo.rpc.protocol.tri.h12.http1.DefaultHttp11ServerTransportListenerFactory;
import
org.apache.dubbo.rpc.protocol.tri.h12.http2.GenericHttp2ServerTransportListenerFactory;
@@ -48,14 +50,18 @@ import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.HttpServerUpgradeHandler;
+import io.netty.handler.codec.http2.Http2CodecUtil;
import io.netty.handler.codec.http2.Http2FrameCodec;
import io.netty.handler.codec.http2.Http2FrameCodecBuilder;
import io.netty.handler.codec.http2.Http2FrameLogger;
import io.netty.handler.codec.http2.Http2MultiplexHandler;
+import io.netty.handler.codec.http2.Http2ServerUpgradeCodec;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.Http2StreamChannel;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.logging.LogLevel;
+import io.netty.util.AsciiString;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_ENABLE_PUSH_KEY;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_HEADER_TABLE_SIZE_KEY;
@@ -143,20 +149,66 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
}
private void configurerHttp1Handlers(URL url, List<ChannelHandler>
handlers) {
- handlers.add(new ChannelHandlerPretender(new HttpServerCodec()));
+ final HttpServerCodec sourceCodec = new HttpServerCodec();
+ handlers.add(new ChannelHandlerPretender(sourceCodec));
+ // Triple protocol http1 upgrade support
+ handlers.add(new ChannelHandlerPretender(new HttpServerUpgradeHandler(
+ sourceCodec,
+ protocol -> {
+ if
(AsciiString.contentEquals(Http2CodecUtil.HTTP_UPGRADE_PROTOCOL_NAME,
protocol)) {
+ Configuration config =
+
ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel());
+ return new Http2ServerUpgradeCodec(
+ buildHttp2FrameCodec(config,
url.getOrDefaultApplicationModel()),
+ new HttpServerAfterUpgradeHandler(),
+ new HttpWriteQueueHandler(),
+ new FlushConsolidationHandler(64, true),
+ new TripleServerConnectionHandler(),
+ buildHttp2MultiplexHandler(url),
+ new TripleTailHandler());
+ }
+ // Not an upgrade request
+ return null;
+ },
+ Integer.MAX_VALUE)));
+ // If the upgrade was successful, remove the message from the output
list
+ // so that it's not propagated to the next handler. This request will
+ // be propagated as a user event instead.
handlers.add(new ChannelHandlerPretender(new
HttpObjectAggregator(Integer.MAX_VALUE)));
handlers.add(new ChannelHandlerPretender(new NettyHttp1Codec()));
handlers.add(new ChannelHandlerPretender(new
NettyHttp1ConnectionHandler(
url, frameworkModel,
DefaultHttp11ServerTransportListenerFactory.INSTANCE)));
}
+ private Http2MultiplexHandler buildHttp2MultiplexHandler(URL url) {
+ return new Http2MultiplexHandler(new
ChannelInitializer<Http2StreamChannel>() {
+ @Override
+ protected void initChannel(Http2StreamChannel ch) {
+ final ChannelPipeline p = ch.pipeline();
+ p.addLast(new NettyHttp2FrameCodec());
+ p.addLast(new NettyHttp2ProtocolSelectorHandler(
+ url, frameworkModel,
GenericHttp2ServerTransportListenerFactory.INSTANCE));
+ }
+ });
+ }
+
private void configurerHttp2Handlers(URL url, List<ChannelHandler>
handlers) {
Configuration config =
ConfigurationUtils.getGlobalConfiguration(url.getOrDefaultApplicationModel());
- final Http2FrameCodec codec = TripleHttp2FrameCodecBuilder.forServer()
+ final Http2FrameCodec codec = buildHttp2FrameCodec(config,
url.getOrDefaultApplicationModel());
+ final Http2MultiplexHandler handler = buildHttp2MultiplexHandler(url);
+ handlers.add(new ChannelHandlerPretender(new HttpWriteQueueHandler()));
+ handlers.add(new ChannelHandlerPretender(codec));
+ handlers.add(new ChannelHandlerPretender(new
FlushConsolidationHandler(64, true)));
+ handlers.add(new ChannelHandlerPretender(new
TripleServerConnectionHandler()));
+ handlers.add(new ChannelHandlerPretender(handler));
+ handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
+ }
+
+ private Http2FrameCodec buildHttp2FrameCodec(Configuration config,
ApplicationModel applicationModel) {
+ return TripleHttp2FrameCodecBuilder.forServer()
.customizeConnection((connection) -> connection
.remote()
- .flowController(
- new TriHttp2RemoteFlowController(connection,
url.getOrDefaultApplicationModel())))
+ .flowController(new
TriHttp2RemoteFlowController(connection, applicationModel)))
.gracefulShutdownTimeoutMillis(10000)
.initialSettings(new Http2Settings()
.headerTableSize(
@@ -168,20 +220,5 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
config.getInt(H2_SETTINGS_MAX_HEADER_LIST_SIZE_KEY,
DEFAULT_MAX_HEADER_LIST_SIZE)))
.frameLogger(SERVER_LOGGER)
.build();
- final Http2MultiplexHandler handler = new Http2MultiplexHandler(new
ChannelInitializer<Http2StreamChannel>() {
- @Override
- protected void initChannel(Http2StreamChannel ch) {
- final ChannelPipeline p = ch.pipeline();
- p.addLast(new NettyHttp2FrameCodec());
- p.addLast(new NettyHttp2ProtocolSelectorHandler(
- url, frameworkModel,
GenericHttp2ServerTransportListenerFactory.INSTANCE));
- }
- });
- handlers.add(new ChannelHandlerPretender(new HttpWriteQueueHandler()));
- handlers.add(new ChannelHandlerPretender(codec));
- handlers.add(new ChannelHandlerPretender(new
FlushConsolidationHandler(64, true)));
- handlers.add(new ChannelHandlerPretender(new
TripleServerConnectionHandler()));
- handlers.add(new ChannelHandlerPretender(handler));
- handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/HttpServerAfterUpgradeHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/HttpServerAfterUpgradeHandler.java
new file mode 100644
index 0000000000..30c89087d5
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/HttpServerAfterUpgradeHandler.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.h12;
+
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.HttpServerUpgradeHandler;
+import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.Http2CodecUtil;
+import io.netty.handler.codec.http2.Http2FrameCodec;
+import io.netty.handler.codec.http2.Http2FrameStream;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.InboundHttpToHttp2Adapter;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+
+/**
+ * If an upgrade occurred, the program needs to send a simple response via HTTP/2
on stream 1 (the stream specifically reserved
+ * for cleartext HTTP upgrade). However, {@link Http2FrameCodec} sends the
'upgradeRequest' to the upgraded channel handlers via
+ * {@link InboundHttpToHttp2Adapter} (note that this may behave
strangely). So we need to distinguish the 'upgradeRequest'
+ * and send the response.<br/>
+ *
+ * @see HttpServerUpgradeHandler
+ * @see Http2FrameCodec
+ * @see InboundHttpToHttp2Adapter
+ * @since 3.3.0
+ */
+@Sharable
+public class HttpServerAfterUpgradeHandler extends
ChannelInboundHandlerAdapter {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
+ if (msg instanceof DefaultHttp2HeadersFrame) {
+ DefaultHttp2HeadersFrame headersFrame = (DefaultHttp2HeadersFrame)
msg;
+ if (headersFrame.stream().id() ==
Http2CodecUtil.HTTP_UPGRADE_STREAM_ID && headersFrame.isEndStream()) {
+ // upgradeRequest
+ sendResponse(ctx, headersFrame.stream());
+ return;
+ }
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ /**
+ * Send a frame for the response status
+ */
+ private static void sendResponse(ChannelHandlerContext ctx,
Http2FrameStream stream) {
+ Http2Headers headers = new
DefaultHttp2Headers().status(OK.codeAsText());
+ ctx.write(new DefaultHttp2HeadersFrame(headers).stream(stream));
+ ctx.write(new DefaultHttp2DataFrame(true).stream(stream));
+ }
+}