This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 96332967c9 HTTP3 bugfix (#14955)
96332967c9 is described below
commit 96332967c942c8521bdb978ebf398813f0e3b574
Author: Sean Yang <[email protected]>
AuthorDate: Fri Dec 6 11:30:30 2024 +0800
HTTP3 bugfix (#14955)
* HTTP3 bugfix
* Fix NPE when address is null
---------
Co-authored-by: xiaosheng <[email protected]>
---
.../api/connection/AbstractConnectionClient.java | 5 --
.../dubbo/remoting/transport/AbstractClient.java | 16 +----
.../dubbo/remoting/transport/AbstractEndpoint.java | 2 +-
.../dubbo/remoting/transport/AbstractServer.java | 4 +-
.../dubbo/remoting/http3/netty4/Constants.java | 25 +++++++
.../http3/netty4/NettyHttp3FrameCodec.java | 21 +++++-
.../netty4/{Helper.java => Http3Helper.java} | 2 +-
.../netty4/NettyHttp3ConnectionClient.java | 26 +++----
.../transport/netty4/NettyHttp3Server.java | 66 ++++++-----------
.../remoting/transport/netty/NettyClient.java | 4 --
.../remoting/transport/netty/NettyHandler.java | 2 +-
.../netty/NettyPortUnificationServer.java | 5 --
.../netty4/AbstractNettyConnectionClient.java | 84 +++++++++++-----------
.../remoting/transport/netty4/AddressUtils.java | 62 ++++++++++------
.../remoting/transport/netty4/NettyChannel.java | 66 ++++++++---------
.../transport/netty4/NettyChannelHandler.java | 9 +--
.../remoting/transport/netty4/NettyClient.java | 4 --
.../transport/netty4/NettyClientHandler.java | 9 +--
.../transport/netty4/NettyConnectionHandler.java | 46 +++++-------
.../netty4/NettyPortUnificationServer.java | 5 --
.../remoting/transport/netty4/NettyServer.java | 3 -
.../transport/netty4/NettyServerHandler.java | 8 +--
.../dubbo/rpc/protocol/tri/Http3Exchanger.java | 61 ++++++++++++++--
.../rpc/protocol/tri/h3/Http3ClientFrameCodec.java | 65 +++++++++++++++--
.../tri/h3/Http3TripleServerConnectionHandler.java | 50 +++++++++++++
.../h3/negotiation/AutoSwitchConnectionClient.java | 23 +++---
.../rpc/protocol/tri/TripleHttp3ProtocolTest.java | 4 +-
27 files changed, 406 insertions(+), 271 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
index 2fea60ce9d..c6f7d40f37 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
@@ -17,8 +17,6 @@
package org.apache.dubbo.remoting.api.connection;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.WireProtocol;
@@ -32,9 +30,6 @@ import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_ERR
public abstract class AbstractConnectionClient extends AbstractClient {
- private static final ErrorTypeAwareLogger logger =
-
LoggerFactory.getErrorTypeAwareLogger(AbstractConnectionClient.class);
-
protected WireProtocol protocol;
protected InetSocketAddress remote;
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index c26d9352f4..5b842d33af 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -18,8 +18,6 @@ package org.apache.dubbo.remoting.transport;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.threadpool.manager.FrameworkExecutorRepository;
import org.apache.dubbo.common.utils.NetUtils;
@@ -55,8 +53,6 @@ import static
org.apache.dubbo.remoting.utils.UrlUtils.getIdleTimeout;
*/
public abstract class AbstractClient extends AbstractEndpoint implements
Client {
- private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(AbstractClient.class);
-
private final Lock connectLock = new ReentrantLock();
private final boolean needReconnect;
@@ -148,7 +144,7 @@ public abstract class AbstractClient extends
AbstractEndpoint implements Client
private void initExecutor(URL url) {
ExecutorRepository executorRepository =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
- /**
+ /*
* Consumer's executor is shared globally, provider ip doesn't need to
be part of the thread name.
*
* Instance of url is InstanceAddressURL, so addParameter actually
adds parameters into ServiceInstance,
@@ -415,36 +411,26 @@ public abstract class AbstractClient extends
AbstractEndpoint implements Client
/**
* Open client.
- *
- * @throws Throwable
*/
protected abstract void doOpen() throws Throwable;
/**
* Close client.
- *
- * @throws Throwable
*/
protected abstract void doClose() throws Throwable;
/**
* Connect to server.
- *
- * @throws Throwable
*/
protected abstract void doConnect() throws Throwable;
/**
* disConnect to server.
- *
- * @throws Throwable
*/
protected abstract void doDisConnect() throws Throwable;
/**
* Get the connected channel.
- *
- * @return channel
*/
protected abstract Channel getChannel();
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
index cc199265d6..c5cf86996f 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
@@ -36,7 +36,7 @@ import static
org.apache.dubbo.rpc.model.ScopeModelUtil.getFrameworkModel;
*/
public abstract class AbstractEndpoint extends AbstractPeer implements
Resetable {
- private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(AbstractEndpoint.class);
+ protected final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(getClass());
private Codec2 codec;
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
index bb50ddc304..1977d2b52f 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractServer.java
@@ -17,8 +17,6 @@
package org.apache.dubbo.remoting.transport;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
import org.apache.dubbo.common.utils.ConcurrentHashSet;
import org.apache.dubbo.common.utils.ExecutorUtil;
@@ -45,7 +43,7 @@ import static
org.apache.dubbo.remoting.Constants.DEFAULT_ACCEPTS;
* AbstractServer
*/
public abstract class AbstractServer extends AbstractEndpoint implements
RemotingServer {
- private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(AbstractServer.class);
+
private Set<ExecutorService> executors = new ConcurrentHashSet<>();
private InetSocketAddress localAddress;
private InetSocketAddress bindAddress;
diff --git
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/Constants.java
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/Constants.java
new file mode 100644
index 0000000000..1080305dc4
--- /dev/null
+++
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/Constants.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http3.netty4;
+
+public final class Constants {
+
+ public static final String PIPELINE_CONFIGURATOR_KEY =
"http3PipelineConfigurator";
+ public static final CharSequence TRI_PING = "tri-ping";
+
+ private Constants() {}
+}
diff --git
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
index 1737f2dca8..9ef885ed3e 100644
---
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
+++
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/http3/netty4/NettyHttp3FrameCodec.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.remoting.http3.netty4;
import org.apache.dubbo.common.io.StreamUtils;
+import org.apache.dubbo.remoting.http12.HttpStatus;
import org.apache.dubbo.remoting.http12.h2.Http2Header;
import org.apache.dubbo.remoting.http12.h2.Http2InputMessageFrame;
import org.apache.dubbo.remoting.http12.h2.Http2MetadataFrame;
@@ -34,7 +35,9 @@ import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandler;
import io.netty.channel.ChannelPromise;
+import io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName;
import io.netty.incubator.codec.http3.DefaultHttp3DataFrame;
+import io.netty.incubator.codec.http3.DefaultHttp3Headers;
import io.netty.incubator.codec.http3.DefaultHttp3HeadersFrame;
import io.netty.incubator.codec.http3.Http3DataFrame;
import io.netty.incubator.codec.http3.Http3Headers;
@@ -42,6 +45,8 @@ import io.netty.incubator.codec.http3.Http3HeadersFrame;
import io.netty.incubator.codec.http3.Http3RequestStreamInboundHandler;
import io.netty.incubator.codec.quic.QuicStreamChannel;
+import static org.apache.dubbo.remoting.http3.netty4.Constants.TRI_PING;
+
@Sharable
public class NettyHttp3FrameCodec extends Http3RequestStreamInboundHandler
implements ChannelOutboundHandler {
@@ -49,7 +54,21 @@ public class NettyHttp3FrameCodec extends
Http3RequestStreamInboundHandler imple
@Override
protected void channelRead(ChannelHandlerContext ctx, Http3HeadersFrame
frame) {
- ctx.fireChannelRead(new Http2MetadataFrame(getStreamId(ctx), new
DefaultHttpHeaders(frame.headers()), false));
+ Http3Headers headers = frame.headers();
+ if (headers.contains(TRI_PING)) {
+ pingReceived(ctx);
+ return;
+ }
+
+ ctx.fireChannelRead(new Http2MetadataFrame(getStreamId(ctx), new
DefaultHttpHeaders(headers), false));
+ }
+
+ private void pingReceived(ChannelHandlerContext ctx) {
+ Http3Headers pongHeader = new DefaultHttp3Headers(false);
+ pongHeader.set(TRI_PING, "0");
+ pongHeader.set(PseudoHeaderName.STATUS.value(),
HttpStatus.OK.getStatusString());
+ ctx.write(new DefaultHttp3HeadersFrame(pongHeader));
+ ctx.close();
}
@Override
diff --git
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Helper.java
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Http3Helper.java
similarity index 99%
rename from
dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Helper.java
rename to
dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Http3Helper.java
index 654831fa14..4ae742fd56 100644
---
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Helper.java
+++
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/Http3Helper.java
@@ -25,7 +25,7 @@ import
io.netty.incubator.codec.quic.QuicCongestionControlAlgorithm;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-final class Helper {
+final class Http3Helper {
@SuppressWarnings("unchecked")
static <T extends QuicCodecBuilder<T>> T configCodec(QuicCodecBuilder<T>
builder, URL url) {
diff --git
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3ConnectionClient.java
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3ConnectionClient.java
index fc4388e75d..a3ad10d8d8 100644
---
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3ConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3ConnectionClient.java
@@ -21,18 +21,19 @@ import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.http3.Http3SslContexts;
-import org.apache.dubbo.remoting.utils.UrlUtils;
+import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.nio.NioDatagramChannel;
-import io.netty.handler.timeout.IdleStateHandler;
import io.netty.incubator.codec.http3.Http3;
import io.netty.incubator.codec.http3.Http3ClientConnectionHandler;
import io.netty.incubator.codec.quic.QuicChannel;
@@ -40,12 +41,12 @@ import io.netty.incubator.codec.quic.QuicChannelBootstrap;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static
org.apache.dubbo.remoting.http3.netty4.Constants.PIPELINE_CONFIGURATOR_KEY;
public final class NettyHttp3ConnectionClient extends
AbstractNettyConnectionClient {
+ private Consumer<ChannelPipeline> pipelineConfigurator;
private AtomicReference<io.netty.channel.Channel> datagramChannel;
-
private QuicChannelBootstrap bootstrap;
public NettyHttp3ConnectionClient(URL url, ChannelHandler handler) throws
RemotingException {
@@ -53,16 +54,17 @@ public final class NettyHttp3ConnectionClient extends
AbstractNettyConnectionCli
}
@Override
+ @SuppressWarnings("unchecked")
protected void initConnectionClient() {
super.initConnectionClient();
datagramChannel = new AtomicReference<>();
+ pipelineConfigurator = (Consumer<ChannelPipeline>)
getUrl().getAttribute(PIPELINE_CONFIGURATOR_KEY);
+ Objects.requireNonNull(pipelineConfigurator, "pipelineConfigurator
should be set");
}
@Override
protected void initBootstrap() throws Exception {
- int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
- io.netty.channel.ChannelHandler codec =
Helper.configCodec(Http3.newQuicClientCodecBuilder(), getUrl())
- .maxIdleTimeout(idleTimeout, MILLISECONDS)
+ io.netty.channel.ChannelHandler codec =
Http3Helper.configCodec(Http3.newQuicClientCodecBuilder(), getUrl())
.sslContext(Http3SslContexts.buildClientSslContext(getUrl()))
.build();
io.netty.channel.Channel nettyDatagramChannel = new Bootstrap()
@@ -81,17 +83,15 @@ public final class NettyHttp3ConnectionClient extends
AbstractNettyConnectionCli
datagramChannel.set(nettyDatagramChannel);
nettyDatagramChannel.closeFuture().addListener(channelFuture ->
datagramChannel.set(null));
- int heartbeat = UrlUtils.getHeartbeat(getUrl());
NettyConnectionHandler connectionHandler = new
NettyConnectionHandler(this);
bootstrap = QuicChannel.newBootstrap(nettyDatagramChannel)
.handler(new ChannelInitializer<QuicChannel>() {
@Override
protected void initChannel(QuicChannel ch) {
- ch.pipeline()
- .addLast(new IdleStateHandler(heartbeat, 0, 0,
MILLISECONDS))
- .addLast(Constants.CONNECTION_HANDLER_NAME,
connectionHandler)
- .addLast(new Http3ClientConnectionHandler());
-
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast(new Http3ClientConnectionHandler());
+ pipeline.addLast(Constants.CONNECTION_HANDLER_NAME,
connectionHandler);
+ pipelineConfigurator.accept(pipeline);
ch.closeFuture().addListener(channelFuture ->
clearNettyChannel());
}
})
diff --git
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3Server.java
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3Server.java
index cd45f5c286..e9a8634f25 100644
---
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3Server.java
+++
b/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyHttp3Server.java
@@ -18,104 +18,78 @@ package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
-import org.apache.dubbo.remoting.http12.netty4.HttpWriteQueueHandler;
import org.apache.dubbo.remoting.http3.Http3SslContexts;
-import org.apache.dubbo.remoting.http3.netty4.NettyHttp3FrameCodec;
-import
org.apache.dubbo.remoting.http3.netty4.NettyHttp3ProtocolSelectorHandler;
import org.apache.dubbo.remoting.transport.AbstractServer;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
-import org.apache.dubbo.remoting.utils.UrlUtils;
-import org.apache.dubbo.rpc.model.FrameworkModel;
-import org.apache.dubbo.rpc.model.ScopeModelUtil;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
-import io.netty.handler.timeout.IdleStateHandler;
import io.netty.incubator.codec.http3.Http3;
-import io.netty.incubator.codec.http3.Http3ServerConnectionHandler;
import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
import io.netty.incubator.codec.quic.QuicChannel;
-import io.netty.incubator.codec.quic.QuicStreamChannel;
import io.netty.util.concurrent.Future;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_CLOSE;
import static org.apache.dubbo.remoting.Constants.EVENT_LOOP_BOSS_POOL_NAME;
+import static
org.apache.dubbo.remoting.http3.netty4.Constants.PIPELINE_CONFIGURATOR_KEY;
public class NettyHttp3Server extends AbstractServer {
- private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(NettyHttp3Server.class);
-
private Map<String, Channel> channels;
private Bootstrap bootstrap;
private EventLoopGroup bossGroup;
private io.netty.channel.Channel channel;
+ private final Consumer<ChannelPipeline> pipelineConfigurator;
private final int serverShutdownTimeoutMills;
+ @SuppressWarnings("unchecked")
public NettyHttp3Server(URL url, ChannelHandler handler) throws
RemotingException {
super(url, ChannelHandlers.wrap(handler, url));
+ pipelineConfigurator = (Consumer<ChannelPipeline>)
getUrl().getAttribute(PIPELINE_CONFIGURATOR_KEY);
+ Objects.requireNonNull(pipelineConfigurator, "pipelineConfigurator
should be set");
serverShutdownTimeoutMills =
ConfigurationUtils.getServerShutdownTimeout(getUrl().getOrDefaultModuleModel());
}
@Override
protected void doOpen() throws Throwable {
bootstrap = new Bootstrap();
-
bossGroup = NettyEventLoopFactory.eventLoopGroup(1,
EVENT_LOOP_BOSS_POOL_NAME);
-
NettyServerHandler nettyServerHandler = new
NettyServerHandler(getUrl(), this);
channels = nettyServerHandler.getChannels();
-
- FrameworkModel frameworkModel =
ScopeModelUtil.getFrameworkModel(getUrl().getScopeModel());
- NettyHttp3ProtocolSelectorHandler selectorHandler =
- new NettyHttp3ProtocolSelectorHandler(getUrl(),
frameworkModel);
-
- int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
- io.netty.channel.ChannelHandler codec =
Helper.configCodec(Http3.newQuicServerCodecBuilder(), getUrl())
- .sslContext(Http3SslContexts.buildServerSslContext(getUrl()))
- .maxIdleTimeout(idleTimeout, MILLISECONDS)
- .tokenHandler(InsecureQuicTokenHandler.INSTANCE)
- .handler(new ChannelInitializer<QuicChannel>() {
- @Override
- protected void initChannel(QuicChannel ch) {
- ch.pipeline()
- .addLast(nettyServerHandler)
- .addLast(new IdleStateHandler(0, 0,
idleTimeout, MILLISECONDS))
- .addLast(new Http3ServerConnectionHandler(new
ChannelInitializer<QuicStreamChannel>() {
- @Override
- protected void
initChannel(QuicStreamChannel ch) {
- ch.pipeline()
-
.addLast(NettyHttp3FrameCodec.INSTANCE)
- .addLast(new
HttpWriteQueueHandler())
- .addLast(selectorHandler);
- }
- }));
- }
- })
- .build();
-
- // bind
try {
ChannelFuture channelFuture = bootstrap
.group(bossGroup)
.channel(NioDatagramChannel.class)
- .handler(codec)
+
.handler(Http3Helper.configCodec(Http3.newQuicServerCodecBuilder(), getUrl())
+
.sslContext(Http3SslContexts.buildServerSslContext(getUrl()))
+ .tokenHandler(InsecureQuicTokenHandler.INSTANCE)
+ .handler(new ChannelInitializer<QuicChannel>() {
+ @Override
+ protected void initChannel(QuicChannel ch) {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipelineConfigurator.accept(pipeline);
+ pipeline.addLast(nettyServerHandler);
+ }
+ })
+ .build())
.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
diff --git
a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyClient.java
b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyClient.java
index 6ed419f3a3..87cc9dd445 100644
---
a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyClient.java
+++
b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyClient.java
@@ -18,8 +18,6 @@ package org.apache.dubbo.remoting.transport.netty;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.ChannelHandler;
@@ -48,8 +46,6 @@ import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FA
*/
public class NettyClient extends AbstractClient {
- private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(NettyClient.class);
-
// ChannelFactory's closure has a DirectMemory leak, using static to avoid
// https://issues.jboss.org/browse/NETTY-424
private static final ChannelFactory CHANNEL_FACTORY = new
NioClientSocketChannelFactory(
diff --git
a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyHandler.java
b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyHandler.java
index d864639dac..e9e62b7bb9 100644
---
a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyHandler.java
@@ -78,7 +78,7 @@ public class NettyHandler extends SimpleChannelHandler {
NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
}
- if (logger.isInfoEnabled()) {
+ if (logger.isInfoEnabled() && channel != null) {
logger.info("The connection between " + channel.getRemoteAddress()
+ " and " + channel.getLocalAddress()
+ " is established");
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyPortUnificationServer.java
b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyPortUnificationServer.java
index 822b9e1fb1..dedf9cb856 100644
---
a/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyPortUnificationServer.java
+++
b/dubbo-remoting/dubbo-remoting-netty/src/main/java/org/apache/dubbo/remoting/transport/netty/NettyPortUnificationServer.java
@@ -17,8 +17,6 @@
package org.apache.dubbo.remoting.transport.netty;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.NetUtils;
@@ -58,9 +56,6 @@ import static
org.apache.dubbo.remoting.Constants.EVENT_LOOP_WORKER_POOL_NAME;
*/
public class NettyPortUnificationServer extends AbstractPortUnificationServer {
- private static final ErrorTypeAwareLogger logger =
-
LoggerFactory.getErrorTypeAwareLogger(NettyPortUnificationServer.class);
-
private Map<String, Channel> dubboChannels = new ConcurrentHashMap<>(); //
<ip:port, channel>
private ServerBootstrap bootstrap;
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java
index a9f983f56d..f26ca4d819 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java
@@ -18,8 +18,6 @@ package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
@@ -45,9 +43,6 @@ import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FA
public abstract class AbstractNettyConnectionClient extends
AbstractConnectionClient {
- private static final ErrorTypeAwareLogger LOGGER =
-
LoggerFactory.getErrorTypeAwareLogger(AbstractNettyConnectionClient.class);
-
private AtomicReference<Promise<Object>> connectingPromiseRef;
private AtomicReference<io.netty.channel.Channel> channelRef;
@@ -94,8 +89,8 @@ public abstract class AbstractNettyConnectionClient extends
AbstractConnectionCl
protected void doClose() {
// AbstractPeer close can set closed true.
if (isClosed()) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("Connection:%s freed ", this));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Connection:{} freed", this);
}
performClose();
closePromise.setSuccess(null);
@@ -117,9 +112,14 @@ public abstract class AbstractNettyConnectionClient
extends AbstractConnectionCl
}
if (isClosed()) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("%s aborted to reconnect cause
connection closed. ", this));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Connection:{} aborted to reconnect cause
connection closed", this);
}
+ return;
+ }
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Connection:{} attempting to reconnect to server {}",
this, getConnectAddress());
}
init.compareAndSet(false, true);
@@ -145,11 +145,11 @@ public abstract class AbstractNettyConnectionClient
extends AbstractConnectionCl
+ ", error message is:" + cause.getMessage(),
cause);
- LOGGER.error(
+ logger.error(
TRANSPORT_FAILED_CONNECT_PROVIDER,
"network disconnected",
"",
- "Failed to connect to provider server by other reason.",
+ "Failed to connect to provider server by other reason",
cause);
throw remotingException;
@@ -163,8 +163,8 @@ public abstract class AbstractNettyConnectionClient extends
AbstractConnectionCl
+ " using dubbo version "
+ Version.getVersion());
- LOGGER.error(
- TRANSPORT_CLIENT_CONNECT_TIMEOUT, "provider crash", "",
"Client-side timeout.", remotingException);
+ logger.error(
+ TRANSPORT_CLIENT_CONNECT_TIMEOUT, "provider crash", "",
"Client-side timeout", remotingException);
throw remotingException;
}
@@ -177,6 +177,17 @@ public abstract class AbstractNettyConnectionClient
extends AbstractConnectionCl
NettyChannel.removeChannelIfDisconnected(getNettyChannel());
}
+ protected void doReconnect() {
+ connectivityExecutor.execute(() -> {
+ try {
+ doConnect();
+ } catch (RemotingException e) {
+ logger.error(
+ TRANSPORT_FAILED_RECONNECT, "", "", "Failed to
reconnect to server: " + getConnectAddress());
+ }
+ });
+ }
+
@Override
public void onConnected(Object channel) {
if (!(channel instanceof io.netty.channel.Channel)) {
@@ -186,8 +197,8 @@ public abstract class AbstractNettyConnectionClient extends
AbstractConnectionCl
io.netty.channel.Channel nettyChannel = ((io.netty.channel.Channel)
channel);
if (isClosed()) {
nettyChannel.close();
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("%s is closed, ignoring connected
event", this));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Connection:{} is closed, ignoring connected
event", this);
}
return;
}
@@ -211,8 +222,8 @@ public abstract class AbstractNettyConnectionClient extends
AbstractConnectionCl
// Notify the connection is available.
connectedPromise.trySuccess(null);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("%s connected ", this));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Connection:{} connected", this);
}
}
@@ -229,8 +240,8 @@ public abstract class AbstractNettyConnectionClient extends
AbstractConnectionCl
nettyChannel.close();
}
NettyChannel.removeChannelIfDisconnected(nettyChannel);
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("%s goaway", this));
+ if (logger.isDebugEnabled()) {
+ logger.debug("Connection:{} goaway", this);
}
}
}
@@ -272,7 +283,7 @@ public abstract class AbstractNettyConnectionClient extends
AbstractConnectionCl
try {
doConnect();
} catch (RemotingException e) {
- LOGGER.error(TRANSPORT_FAILED_RECONNECT, "", "", "Failed to
connect to server: " + getConnectAddress());
+ logger.error(TRANSPORT_FAILED_RECONNECT, "", "", "Failed to
connect to server: " + getConnectAddress());
}
}
@@ -349,36 +360,25 @@ public abstract class AbstractNettyConnectionClient
extends AbstractConnectionCl
}
AbstractNettyConnectionClient connectionClient =
AbstractNettyConnectionClient.this;
if (connectionClient.isClosed() || connectionClient.getCounter()
== 0) {
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format(
- "%s aborted to reconnect. %s",
- connectionClient, future.cause().getMessage()));
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Connection:{} aborted to reconnect. {}",
+ connectionClient,
+ future.cause().getMessage());
}
return;
}
- if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format(
- "%s is reconnecting, attempt=%d cause=%s",
- connectionClient, 0, future.cause().getMessage()));
+ if (logger.isDebugEnabled()) {
+ logger.debug(
+ "Connection:{} is reconnecting, attempt=0 cause={}",
+ connectionClient,
+ future.cause().getMessage());
}
// Notify the connection is unavailable.
disconnectedPromise.trySuccess(null);
- connectivityExecutor.schedule(
- () -> {
- try {
- connectionClient.doConnect();
- } catch (RemotingException e) {
- LOGGER.error(
- TRANSPORT_FAILED_RECONNECT,
- "",
- "",
- "Failed to connect to server: " +
getConnectAddress());
- }
- },
- reconnectDuration,
- TimeUnit.MILLISECONDS);
+ doReconnect();
}
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java
index 85ffb29aa3..6746e339d3 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java
@@ -16,7 +16,6 @@
*/
package org.apache.dubbo.remoting.transport.netty4;
-import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.rpc.model.FrameworkModel;
import java.net.InetSocketAddress;
@@ -25,11 +24,17 @@ import java.util.List;
import io.netty.channel.Channel;
+import static org.apache.dubbo.common.utils.NetUtils.toAddressString;
+
public final class AddressUtils {
private static final List<ChannelAddressAccessor> ACCESSORS =
FrameworkModel.defaultModel().getActivateExtensions(ChannelAddressAccessor.class);
+ private static final String LOCAL_ADDRESS_KEY = "NETTY_LOCAL_ADDRESS_KEY";
+ private static final String REMOTE_ADDRESS_KEY =
"NETTY_REMOTE_ADDRESS_KEY";
+ private static final String PROTOCOL_KEY = "NETTY_PROTOCOL_KEY";
+
private AddressUtils() {}
public static InetSocketAddress getRemoteAddress(Channel channel) {
@@ -54,35 +59,50 @@ public final class AddressUtils {
return (InetSocketAddress) channel.localAddress();
}
- public static String getRemoteAddressKey(Channel channel) {
- InetSocketAddress address;
+ static void initAddressIfNecessary(NettyChannel nettyChannel) {
+ Channel channel = nettyChannel.getNioChannel();
+ SocketAddress address = channel.localAddress();
+ if (address instanceof InetSocketAddress) {
+ return;
+ }
+
for (int i = 0, size = ACCESSORS.size(); i < size; i++) {
ChannelAddressAccessor accessor = ACCESSORS.get(i);
- address = accessor.getRemoteAddress(channel);
- if (address != null) {
- return accessor.getProtocol() + ' ' +
NetUtils.toAddressString(address);
+ InetSocketAddress localAddress = accessor.getLocalAddress(channel);
+ if (localAddress != null) {
+ nettyChannel.setAttribute(LOCAL_ADDRESS_KEY, localAddress);
+ nettyChannel.setAttribute(REMOTE_ADDRESS_KEY,
accessor.getRemoteAddress(channel));
+ nettyChannel.setAttribute(PROTOCOL_KEY,
accessor.getProtocol());
+ break;
}
}
- InetSocketAddress remoteAddress = (InetSocketAddress)
channel.remoteAddress();
- if (remoteAddress == null) {
+ }
+
+ static InetSocketAddress getLocalAddress(NettyChannel channel) {
+ InetSocketAddress address = (InetSocketAddress)
channel.getAttribute(LOCAL_ADDRESS_KEY);
+ return address == null ? (InetSocketAddress)
(channel.getNioChannel().localAddress()) : address;
+ }
+
+ static InetSocketAddress getRemoteAddress(NettyChannel channel) {
+ InetSocketAddress address = (InetSocketAddress)
channel.getAttribute(REMOTE_ADDRESS_KEY);
+ return address == null ? (InetSocketAddress)
(channel.getNioChannel().remoteAddress()) : address;
+ }
+
+ static String getLocalAddressKey(NettyChannel channel) {
+ InetSocketAddress address = getLocalAddress(channel);
+ if (address == null) {
return "UNKNOWN";
}
- return NetUtils.toAddressString(remoteAddress);
+ String protocol = (String) channel.getAttribute(PROTOCOL_KEY);
+ return protocol == null ? toAddressString(address) : protocol + ' ' +
toAddressString(address);
}
- public static String getLocalAddressKey(Channel channel) {
- InetSocketAddress address;
- for (int i = 0, size = ACCESSORS.size(); i < size; i++) {
- ChannelAddressAccessor accessor = ACCESSORS.get(i);
- address = accessor.getLocalAddress(channel);
- if (address != null) {
- return accessor.getProtocol() + ' ' +
NetUtils.toAddressString(address);
- }
- }
- SocketAddress localAddress = channel.localAddress();
- if (localAddress == null) {
+ static String getRemoteAddressKey(NettyChannel channel) {
+ InetSocketAddress address = getRemoteAddress(channel);
+ if (address == null) {
return "UNKNOWN";
}
- return NetUtils.toAddressString((InetSocketAddress) localAddress);
+ String protocol = (String) channel.getAttribute(PROTOCOL_KEY);
+ return protocol == null ? toAddressString(address) : protocol + ' ' +
toAddressString(address);
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
index 8191cdfd48..715438a738 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannel.java
@@ -35,6 +35,7 @@ import org.apache.dubbo.rpc.model.FrameworkModel;
import java.net.InetSocketAddress;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -73,16 +74,16 @@ final class NettyChannel extends AbstractChannel {
private final Netty4BatchWriteQueue writeQueue;
- private Codec2 codec;
-
private final boolean encodeInIOThread;
+ private Codec2 codec;
+
/**
* The constructor of NettyChannel.
* It is private so NettyChannel usually create by {@link
NettyChannel#getOrAddChannel(Channel, URL, ChannelHandler)}
*
* @param channel netty channel
- * @param url
+ * @param url dubbo url
* @param handler dubbo handler that contain netty handler
*/
private NettyChannel(Channel channel, URL url, ChannelHandler handler) {
@@ -94,6 +95,7 @@ final class NettyChannel extends AbstractChannel {
this.writeQueue = Netty4BatchWriteQueue.createWriteQueue(channel);
this.codec = getChannelCodec(url);
this.encodeInIOThread = getUrl().getParameter(ENCODE_IN_IO_THREAD_KEY,
DEFAULT_ENCODE_IN_IO_THREAD);
+ AddressUtils.initAddressIfNecessary(this);
}
/**
@@ -101,9 +103,8 @@ final class NettyChannel extends AbstractChannel {
* Put netty channel into it if dubbo channel don't exist in the cache.
*
* @param ch netty channel
- * @param url
+ * @param url dubbo url
* @param handler dubbo handler that contain netty's handler
- * @return
*/
static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler
handler) {
if (ch == null) {
@@ -150,12 +151,20 @@ final class NettyChannel extends AbstractChannel {
@Override
public InetSocketAddress getLocalAddress() {
- return AddressUtils.getLocalAddress(channel);
+ return AddressUtils.getLocalAddress(this);
}
@Override
public InetSocketAddress getRemoteAddress() {
- return AddressUtils.getRemoteAddress(channel);
+ return AddressUtils.getRemoteAddress(this);
+ }
+
+ public String getLocalAddressKey() {
+ return AddressUtils.getLocalAddressKey(this);
+ }
+
+ public String getRemoteAddressKey() {
+ return AddressUtils.getRemoteAddressKey(this);
}
@Override
@@ -172,7 +181,7 @@ final class NettyChannel extends AbstractChannel {
}
/**
- * Send message by netty and whether to wait the completion of the send.
+ * Send message by netty and whether to wait the completion of the sending.
*
* @param message message that need send.
* @param sent whether to ack async-sent
@@ -193,23 +202,20 @@ final class NettyChannel extends AbstractChannel {
codec.encode(this, buffer, message);
outputMessage = buf;
}
- ChannelFuture future =
writeQueue.enqueue(outputMessage).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws
Exception {
- if (!(message instanceof Request)) {
+ ChannelFuture future =
writeQueue.enqueue(outputMessage).addListener((ChannelFutureListener) f -> {
+ if (!(message instanceof Request)) {
+ return;
+ }
+ ChannelHandler handler = getChannelHandler();
+ if (f.isSuccess()) {
+ handler.sent(NettyChannel.this, message);
+ } else {
+ Throwable t = f.cause();
+ if (t == null) {
return;
}
- ChannelHandler handler = getChannelHandler();
- if (future.isSuccess()) {
- handler.sent(NettyChannel.this, message);
- } else {
- Throwable t = future.cause();
- if (t == null) {
- return;
- }
- Response response = buildErrorResponse((Request)
message, t);
- handler.received(NettyChannel.this, response);
- }
+ Response response = buildErrorResponse((Request) message,
t);
+ handler.received(NettyChannel.this, response);
}
});
@@ -318,18 +324,7 @@ final class NettyChannel extends AbstractChannel {
return channel.equals(client.getNettyChannel());
}
- if (getClass() != obj.getClass()) {
- return false;
- }
- NettyChannel other = (NettyChannel) obj;
- if (channel == null) {
- if (other.channel != null) {
- return false;
- }
- } else if (!channel.equals(other.channel)) {
- return false;
- }
- return true;
+ return getClass() == obj.getClass() && Objects.equals(channel,
((NettyChannel) obj).channel);
}
@Override
@@ -359,6 +354,7 @@ final class NettyChannel extends AbstractChannel {
return response;
}
+ @SuppressWarnings("deprecation")
private static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY);
if (StringUtils.isEmpty(codecName)) {
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelHandler.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelHandler.java
index 1c31861c5e..99ce684b15 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyChannelHandler.java
@@ -56,8 +56,8 @@ public class NettyChannelHandler extends
ChannelInboundHandlerAdapter {
logger.info(
"The connection {} of {} -> {} is established.",
ch,
- AddressUtils.getRemoteAddressKey(ch),
- AddressUtils.getLocalAddressKey(ch));
+ channel.getRemoteAddressKey(),
+ channel.getLocalAddressKey());
}
}
}
@@ -71,12 +71,13 @@ public class NettyChannelHandler extends
ChannelInboundHandlerAdapter {
dubboChannels.remove(NetUtils.toAddressString((InetSocketAddress)
ch.remoteAddress()));
if (channel != null) {
handler.disconnected(channel);
+
if (logger.isInfoEnabled()) {
logger.info(
"The connection {} of {} -> {} is disconnected.",
ch,
- AddressUtils.getRemoteAddressKey(ch),
- AddressUtils.getLocalAddressKey(ch));
+ channel.getRemoteAddressKey(),
+ channel.getLocalAddressKey());
}
}
} finally {
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java
index d01a481049..bd4d89f5b7 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClient.java
@@ -20,8 +20,6 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.resource.GlobalResourceInitializer;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
@@ -67,8 +65,6 @@ public class NettyClient extends AbstractClient {
private static final String DEFAULT_SOCKS_PROXY_PORT = "1080";
- private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(NettyClient.class);
-
/**
* netty client bootstrap
*/
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
index ed77458cf0..449b39385a 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyClientHandler.java
@@ -58,12 +58,13 @@ public class NettyClientHandler extends
ChannelDuplexHandler {
Channel ch = ctx.channel();
NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
handler.connected(channel);
+
if (logger.isInfoEnabled()) {
logger.info(
"The connection {} of {} -> {} is established.",
ch,
- AddressUtils.getLocalAddressKey(ch),
- AddressUtils.getRemoteAddressKey(ch));
+ channel.getLocalAddressKey(),
+ channel.getRemoteAddressKey());
}
}
@@ -81,8 +82,8 @@ public class NettyClientHandler extends ChannelDuplexHandler {
logger.info(
"The connection {} of {} -> {} is disconnected.",
ch,
- AddressUtils.getLocalAddressKey(ch),
- AddressUtils.getRemoteAddressKey(ch));
+ channel.getLocalAddressKey(),
+ channel.getRemoteAddressKey());
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java
index 68bffdc952..9c4d08c0ce 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionHandler.java
@@ -30,7 +30,6 @@ import io.netty.channel.EventLoop;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
-import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RECONNECT;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_UNEXPECTED_EXCEPTION;
@Sharable
@@ -62,7 +61,7 @@ public class NettyConnectionHandler extends
ChannelInboundHandlerAdapter impleme
connectionClient.onGoaway(nettyChannel);
}
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("Channel %s go away ,schedule
reconnect", nettyChannel));
+ LOGGER.debug("Channel {} go away ,schedule reconnect",
nettyChannel);
}
reconnect(nettyChannel);
}
@@ -74,43 +73,30 @@ public class NettyConnectionHandler extends
ChannelInboundHandlerAdapter impleme
}
Channel nettyChannel = ((Channel) channel);
if (LOGGER.isDebugEnabled()) {
- LOGGER.debug(String.format("Connection %s is reconnecting,
attempt=%d", connectionClient, 1));
+ LOGGER.debug("Connection:{} is reconnecting, attempt={}",
connectionClient, 1);
}
EventLoop eventLoop = nettyChannel.eventLoop();
if (connectionClient.isClosed()) {
- LOGGER.info("The client has been closed and will not reconnect. ");
+ LOGGER.info("The connection {} has been closed and will not
reconnect", connectionClient);
return;
}
- eventLoop.schedule(
- () -> {
- try {
- connectionClient.doConnect();
- } catch (Throwable e) {
- LOGGER.error(
- TRANSPORT_FAILED_RECONNECT,
- "",
- "",
- "Fail to connect to " +
connectionClient.getChannel(),
- e);
- }
- },
- 1,
- TimeUnit.SECONDS);
+ eventLoop.schedule(connectionClient::doReconnect, 1, TimeUnit.SECONDS);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.fireChannelActive();
Channel ch = ctx.channel();
- NettyChannel.getOrAddChannel(ch, connectionClient.getUrl(),
connectionClient);
+ NettyChannel channel = NettyChannel.getOrAddChannel(ch,
connectionClient.getUrl(), connectionClient);
if (!connectionClient.isClosed()) {
connectionClient.onConnected(ch);
- if (LOGGER.isInfoEnabled()) {
+
+ if (LOGGER.isInfoEnabled() && channel != null) {
LOGGER.info(
"The connection {} of {} -> {} is established.",
ch,
- AddressUtils.getLocalAddressKey(ch),
- AddressUtils.getRemoteAddressKey(ch));
+ channel.getLocalAddressKey(),
+ channel.getRemoteAddressKey());
}
} else {
ctx.close();
@@ -121,16 +107,20 @@ public class NettyConnectionHandler extends
ChannelInboundHandlerAdapter impleme
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
Channel ch = ctx.channel();
+ NettyChannel channel = NettyChannel.getOrAddChannel(ch,
connectionClient.getUrl(), connectionClient);
try {
Attribute<Boolean> goawayAttr = ch.attr(GO_AWAY_KEY);
if (!Boolean.TRUE.equals(goawayAttr.get())) {
reconnect(ch);
}
- LOGGER.info(
- "The connection {} of {} -> {} is disconnected.",
- ch,
- AddressUtils.getLocalAddressKey(ch),
- AddressUtils.getRemoteAddressKey(ch));
+
+ if (LOGGER.isInfoEnabled() && channel != null) {
+ LOGGER.info(
+ "The connection {} of {} -> {} is disconnected.",
+ ch,
+ channel.getLocalAddressKey(),
+ channel.getRemoteAddressKey());
+ }
} finally {
NettyChannel.removeChannel(ch);
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
index 4af9649fc7..5786f3a9db 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServer.java
@@ -18,8 +18,6 @@ package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.Channel;
@@ -59,9 +57,6 @@ import static
org.apache.dubbo.remoting.Constants.EVENT_LOOP_WORKER_POOL_NAME;
*/
public class NettyPortUnificationServer extends AbstractPortUnificationServer {
- private static final ErrorTypeAwareLogger logger =
-
LoggerFactory.getErrorTypeAwareLogger(NettyPortUnificationServer.class);
-
private final int serverShutdownTimeoutMills;
/**
* netty server bootstrap.
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
index 6de1969cf1..504931c90f 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServer.java
@@ -18,8 +18,6 @@ package org.apache.dubbo.remoting.transport.netty4;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.NetUtils;
@@ -64,7 +62,6 @@ import static
org.apache.dubbo.remoting.Constants.EVENT_LOOP_WORKER_POOL_NAME;
*/
public class NettyServer extends AbstractServer {
- private static final ErrorTypeAwareLogger logger =
LoggerFactory.getErrorTypeAwareLogger(NettyServer.class);
/**
* the cache for alive worker channel.
* <ip:port, dubbo channel>
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
index c9987dcdb1..c0eac5df97 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyServerHandler.java
@@ -75,8 +75,8 @@ public class NettyServerHandler extends ChannelDuplexHandler {
logger.info(
"The connection {} of {} -> {} is established.",
ch,
- AddressUtils.getRemoteAddressKey(ch),
- AddressUtils.getLocalAddressKey(ch));
+ channel.getLocalAddressKey(),
+ channel.getRemoteAddressKey());
}
}
@@ -95,8 +95,8 @@ public class NettyServerHandler extends ChannelDuplexHandler {
logger.info(
"The connection {} of {} -> {} is disconnected.",
ch,
- AddressUtils.getRemoteAddressKey(ch),
- AddressUtils.getLocalAddressKey(ch));
+ channel.getRemoteAddressKey(),
+ channel.getLocalAddressKey());
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java
index 247fe70dc8..3394a81295 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java
@@ -19,25 +19,42 @@ package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.config.Configuration;
import org.apache.dubbo.common.constants.LoggerCodeConstants;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
-import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.logger.FluentLogger;
import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
+import org.apache.dubbo.remoting.http12.netty4.HttpWriteQueueHandler;
+import org.apache.dubbo.remoting.http3.netty4.NettyHttp3FrameCodec;
+import
org.apache.dubbo.remoting.http3.netty4.NettyHttp3ProtocolSelectorHandler;
import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
import org.apache.dubbo.remoting.transport.netty4.NettyHttp3Server;
+import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.model.ScopeModelUtil;
+import org.apache.dubbo.rpc.protocol.tri.h3.Http3ClientFrameCodec;
+import org.apache.dubbo.rpc.protocol.tri.h3.Http3TripleServerConnectionHandler;
import org.apache.dubbo.rpc.protocol.tri.h3.negotiation.Helper;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.handler.flush.FlushConsolidationHandler;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.incubator.codec.http3.Http3ServerConnectionHandler;
+import io.netty.incubator.codec.quic.QuicStreamChannel;
+
+import static
org.apache.dubbo.remoting.http3.netty4.Constants.PIPELINE_CONFIGURATOR_KEY;
public final class Http3Exchanger {
- private static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(Http3Exchanger.class);
+ private static final FluentLogger LOGGER =
FluentLogger.of(Http3Exchanger.class);
private static final boolean HAS_NETTY_HTTP3 =
ClassUtils.isPresent("io.netty.incubator.codec.http3.Http3");
private static final Map<String, RemotingServer> SERVERS = new
ConcurrentHashMap<>();
private static final Map<String, AbstractConnectionClient> CLIENTS = new
ConcurrentHashMap<>(16);
@@ -65,7 +82,8 @@ public final class Http3Exchanger {
if (isEnabled(url)) {
return SERVERS.computeIfAbsent(url.getAddress(), addr -> {
try {
- return new NettyHttp3Server(url, HANDLER);
+ URL serverUrl =
url.putAttribute(PIPELINE_CONFIGURATOR_KEY, configServerPipeline(url));
+ return new NettyHttp3Server(serverUrl, HANDLER);
} catch (RemotingException e) {
throw new RuntimeException(e);
}
@@ -74,12 +92,31 @@ public final class Http3Exchanger {
return null;
}
+ private static Consumer<ChannelPipeline> configServerPipeline(URL url) {
+ NettyHttp3ProtocolSelectorHandler selectorHandler =
+ new NettyHttp3ProtocolSelectorHandler(url,
ScopeModelUtil.getFrameworkModel(url.getScopeModel()));
+ return pipeline -> {
+ pipeline.addLast(new Http3ServerConnectionHandler(new
ChannelInitializer<QuicStreamChannel>() {
+ @Override
+ protected void initChannel(QuicStreamChannel ch) {
+ ch.pipeline()
+ .addLast(new HttpWriteQueueHandler())
+ .addLast(new FlushConsolidationHandler(64, true))
+ .addLast(NettyHttp3FrameCodec.INSTANCE)
+ .addLast(selectorHandler);
+ }
+ }));
+ pipeline.addLast(new Http3TripleServerConnectionHandler());
+ };
+ }
+
public static AbstractConnectionClient connect(URL url) {
return CLIENTS.compute(url.getAddress(), (address, client) -> {
if (client == null) {
+ URL clientUrl = url.putAttribute(PIPELINE_CONFIGURATOR_KEY,
configClientPipeline(url));
AbstractConnectionClient connectionClient = NEGOTIATION_ENABLED
- ? Helper.createAutoSwitchClient(url, HANDLER)
- : Helper.createHttp3Client(url, HANDLER);
+ ? Helper.createAutoSwitchClient(clientUrl, HANDLER)
+ : Helper.createHttp3Client(clientUrl, HANDLER);
connectionClient.addCloseListener(() ->
CLIENTS.remove(address, connectionClient));
client = connectionClient;
} else {
@@ -89,6 +126,16 @@ public final class Http3Exchanger {
});
}
+ private static Consumer<ChannelPipeline> configClientPipeline(URL url) {
+ int heartbeat = UrlUtils.getHeartbeat(url);
+ int closeTimeout = UrlUtils.getCloseTimeout(url);
+ return pipeline -> {
+ pipeline.addLast(Http3ClientFrameCodec.INSTANCE);
+ pipeline.addLast(new IdleStateHandler(heartbeat, 0, 0,
TimeUnit.MILLISECONDS));
+ pipeline.addLast(new TriplePingPongHandler(closeTimeout));
+ };
+ }
+
public static void close() {
if (SERVERS.isEmpty()) {
return;
@@ -99,7 +146,7 @@ public final class Http3Exchanger {
try {
server.close();
} catch (Throwable t) {
- LOGGER.error(LoggerCodeConstants.PROTOCOL_ERROR_CLOSE_SERVER,
"", "", "Close Http3 server failed", t);
+ LOGGER.error(LoggerCodeConstants.PROTOCOL_ERROR_CLOSE_SERVER,
"Close http3 server failed", t);
}
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
index 11ef9da285..da4e9cd9e9 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientFrameCodec.java
@@ -16,6 +16,10 @@
*/
package org.apache.dubbo.rpc.protocol.tri.h3;
+import org.apache.dubbo.common.logger.FluentLogger;
+import org.apache.dubbo.remoting.http12.HttpConstants;
+import org.apache.dubbo.remoting.http12.HttpMethods;
+import org.apache.dubbo.remoting.http3.netty4.Constants;
import org.apache.dubbo.remoting.http3.netty4.Http2HeadersAdapter;
import org.apache.dubbo.remoting.http3.netty4.Http3HeadersAdapter;
import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
@@ -24,33 +28,49 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2GoAwayFrame;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.DefaultHttp2PingFrame;
import io.netty.handler.codec.http2.DefaultHttp2ResetFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
+import io.netty.handler.codec.http2.Http2Headers.PseudoHeaderName;
import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.handler.codec.http2.Http2PingFrame;
import io.netty.incubator.codec.http3.DefaultHttp3DataFrame;
+import io.netty.incubator.codec.http3.DefaultHttp3Headers;
import io.netty.incubator.codec.http3.DefaultHttp3HeadersFrame;
+import io.netty.incubator.codec.http3.Http3;
import io.netty.incubator.codec.http3.Http3DataFrame;
import io.netty.incubator.codec.http3.Http3ErrorCode;
import io.netty.incubator.codec.http3.Http3Exception;
import io.netty.incubator.codec.http3.Http3GoAwayFrame;
+import io.netty.incubator.codec.http3.Http3Headers;
import io.netty.incubator.codec.http3.Http3HeadersFrame;
+import io.netty.incubator.codec.http3.Http3RequestStreamInitializer;
+import io.netty.incubator.codec.quic.QuicChannel;
import io.netty.incubator.codec.quic.QuicStreamChannel;
+import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_FAILED_RECONNECT;
+
@Sharable
public class Http3ClientFrameCodec extends ChannelDuplexHandler {
+ private static final FluentLogger LOGGER =
FluentLogger.of(Http3ClientFrameCodec.class);
public static final Http3ClientFrameCodec INSTANCE = new
Http3ClientFrameCodec();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (msg instanceof Http3HeadersFrame) {
- Http2HeadersAdapter headers = new
Http2HeadersAdapter(((Http3HeadersFrame) msg).headers());
- boolean endStream =
headers.contains(TripleHeaderEnum.STATUS_KEY.getKey());
- ctx.fireChannelRead(new DefaultHttp2HeadersFrame(headers,
endStream));
+ Http3Headers headers = ((Http3HeadersFrame) msg).headers();
+ if (headers.contains(Constants.TRI_PING)) {
+ pingAck(ctx);
+ } else {
+ boolean endStream =
headers.contains(TripleHeaderEnum.STATUS_KEY.getKey());
+ ctx.fireChannelRead(new DefaultHttp2HeadersFrame(new
Http2HeadersAdapter(headers), endStream));
+ }
} else if (msg instanceof Http3DataFrame) {
ctx.fireChannelRead(new DefaultHttp2DataFrame(((Http3DataFrame)
msg).content()));
} else if (msg instanceof Http3GoAwayFrame) {
@@ -60,9 +80,19 @@ public class Http3ClientFrameCodec extends
ChannelDuplexHandler {
}
}
+ private void pingAck(ChannelHandlerContext ctx) {
+ ChannelPipeline pipeline = ctx.channel().parent().pipeline();
+ pipeline.fireChannelRead(new DefaultHttp2PingFrame(0, true));
+ pipeline.fireChannelReadComplete();
+ }
+
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
- ctx.fireChannelRead(new DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER,
true));
+ if (ctx instanceof QuicStreamChannel) {
+ ctx.fireChannelRead(new
DefaultHttp2DataFrame(Unpooled.EMPTY_BUFFER, true));
+ } else {
+ ctx.fireChannelReadComplete();
+ }
}
@Override
@@ -80,11 +110,38 @@ public class Http3ClientFrameCodec extends
ChannelDuplexHandler {
return;
}
ctx.write(new DefaultHttp3DataFrame(frame.content()), promise);
+ } else if (msg instanceof Http2PingFrame) {
+ sendPing((QuicChannel) ctx.channel());
} else {
ctx.write(msg, promise);
}
}
+ private void sendPing(QuicChannel channel) {
+ Http3.newRequestStream(channel, new Http3RequestStreamInitializer() {
+ @Override
+ protected void initRequestStream(QuicStreamChannel ch) {
+ ch.pipeline().addLast(INSTANCE);
+ }
+ })
+ .addListener(future -> {
+ if (future.isSuccess()) {
+ QuicStreamChannel streamChannel = (QuicStreamChannel)
future.getNow();
+
+ Http3Headers header = new DefaultHttp3Headers(false);
+ header.set(PseudoHeaderName.METHOD.value(),
HttpMethods.OPTIONS.name());
+ header.set(PseudoHeaderName.PATH.value(), "*");
+ header.set(PseudoHeaderName.SCHEME.value(),
HttpConstants.HTTPS);
+ header.set(Constants.TRI_PING, "0");
+
+ streamChannel.write(new
DefaultHttp3HeadersFrame(header));
+ streamChannel.shutdownOutput();
+ } else {
+ LOGGER.warn(TRANSPORT_FAILED_RECONNECT, "Failed to
send ping frame", future.cause());
+ }
+ });
+ }
+
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
if (cause instanceof Http3Exception) {
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleServerConnectionHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleServerConnectionHandler.java
new file mode 100644
index 0000000000..afcd19e48c
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3TripleServerConnectionHandler.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.h3;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPromise;
+import io.netty.incubator.codec.http3.Http3GoAwayFrame;
+import io.netty.util.ReferenceCountUtil;
+
+public class Http3TripleServerConnectionHandler extends ChannelDuplexHandler {
+
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
+ if (msg instanceof Http3GoAwayFrame) {
+ ReferenceCountUtil.release(msg);
+ return;
+ }
+ super.channelRead(ctx, msg);
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ super.channelInactive(ctx);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
+ super.exceptionCaught(ctx, cause);
+ }
+
+ @Override
+ public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
+ super.close(ctx, promise);
+ }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AutoSwitchConnectionClient.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AutoSwitchConnectionClient.java
index baffe375a1..2431cb9b46 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AutoSwitchConnectionClient.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AutoSwitchConnectionClient.java
@@ -18,7 +18,6 @@ package org.apache.dubbo.rpc.protocol.tri.h3.negotiation;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
-import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.NamedThreadFactory;
import org.apache.dubbo.common.utils.NetUtils;
@@ -33,11 +32,9 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_ERROR_CLOSE_CLIENT;
-import static
org.apache.dubbo.common.logger.LoggerFactory.getErrorTypeAwareLogger;
public class AutoSwitchConnectionClient extends AbstractConnectionClient {
- private static final ErrorTypeAwareLogger LOGGER =
getErrorTypeAwareLogger(AutoSwitchConnectionClient.class);
private static final int MAX_RETRIES = 8;
private final URL url;
@@ -58,8 +55,8 @@ public class AutoSwitchConnectionClient extends
AbstractConnectionClient {
ClassLoader tccl = Thread.currentThread().getContextClassLoader();
connectionClient.addConnectedListener(() -> ClassUtils.runWith(tccl,
() -> executor.execute(this::negotiate)));
increase();
- if (LOGGER.isInfoEnabled()) {
- LOGGER.info(
+ if (logger.isInfoEnabled()) {
+ logger.info(
"Start HTTP/3 AutoSwitchConnectionClient {} connect to the
server {}",
NetUtils.getLocalAddress(),
url.toInetSocketAddress());
@@ -80,14 +77,14 @@ public class AutoSwitchConnectionClient extends
AbstractConnectionClient {
if (clientCall == null) {
clientCall = new NegotiateClientCall(connectionClient, executor);
}
- LOGGER.info("Start HTTP/3 negotiation for [{}]", getBaseUrl());
+ logger.info("Start HTTP/3 negotiation for [{}]", getBaseUrl());
clientCall.start(url).whenComplete((altSvc, t) -> {
if (t == null) {
if (altSvc.contains("h3=")) {
negotiateSuccess();
return;
}
- LOGGER.info(
+ logger.info(
"HTTP/3 negotiation succeed, but provider reply
alt-svc='{}' not support HTTP/3 for [{}]",
altSvc,
getBaseUrl());
@@ -101,7 +98,7 @@ public class AutoSwitchConnectionClient extends
AbstractConnectionClient {
private void negotiateSuccess() {
negotiated = true;
- LOGGER.info("HTTP/3 negotiation succeed for [{}], create http3
client", getBaseUrl());
+ logger.info("HTTP/3 negotiation succeed for [{}], create http3
client", getBaseUrl());
http3ConnectionClient = Helper.createHttp3Client(url,
connectionClient.getDelegateHandler());
http3ConnectionClient.addConnectedListener(() ->
setHttp3Connected(true));
http3ConnectionClient.addDisconnectedListener(() ->
setHttp3Connected(false));
@@ -111,11 +108,11 @@ public class AutoSwitchConnectionClient extends
AbstractConnectionClient {
private void reScheduleNegotiate(Throwable t) {
if (attempt++ < MAX_RETRIES) {
int delay = 1 << attempt + 2;
- LOGGER.info("HTTP/3 negotiation failed, retry after {} seconds for
[{}]", delay, getBaseUrl(), t);
+ logger.info("HTTP/3 negotiation failed, retry after {} seconds for
[{}]", delay, getBaseUrl(), t);
executor.schedule(this::negotiate, delay, TimeUnit.SECONDS);
return;
}
- LOGGER.warn(
+ logger.warn(
PROTOCOL_ERROR_CLOSE_CLIENT,
"",
"",
@@ -133,7 +130,7 @@ public class AutoSwitchConnectionClient extends
AbstractConnectionClient {
private void setHttp3Connected(boolean http3Connected) {
this.http3Connected = http3Connected;
- LOGGER.info("Switch protocol to {} for [{}]", http3Connected ?
"HTTP/3" : "HTTP/2", url.toString(""));
+ logger.info("Switch protocol to {} for [{}]", http3Connected ?
"HTTP/3" : "HTTP/2", url.toString(""));
}
public boolean isHttp3Connected() {
@@ -155,13 +152,13 @@ public class AutoSwitchConnectionClient extends
AbstractConnectionClient {
try {
connectionClient.release();
} catch (Throwable t) {
- LOGGER.warn(PROTOCOL_ERROR_CLOSE_CLIENT, "", "", t.getMessage(),
t);
+ logger.warn(PROTOCOL_ERROR_CLOSE_CLIENT, "", "", t.getMessage(),
t);
}
if (http3ConnectionClient != null) {
try {
http3ConnectionClient.release();
} catch (Throwable t) {
- LOGGER.warn(PROTOCOL_ERROR_CLOSE_CLIENT, "", "",
t.getMessage(), t);
+ logger.warn(PROTOCOL_ERROR_CLOSE_CLIENT, "", "",
t.getMessage(), t);
}
}
return true;
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp3ProtocolTest.java
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp3ProtocolTest.java
index 342825c3d3..b53fc27f35 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp3ProtocolTest.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/test/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp3ProtocolTest.java
@@ -108,7 +108,7 @@ class TripleHttp3ProtocolTest {
MockStreamObserver outboundMessageSubscriber1 = new
MockStreamObserver();
greeterProxy.serverStream(REQUEST_MSG, outboundMessageSubscriber1);
outboundMessageSubscriber1.getLatch().await(3000,
TimeUnit.MILLISECONDS);
- Assertions.assertEquals(outboundMessageSubscriber1.getOnNextData(),
REQUEST_MSG);
+ Assertions.assertEquals(REQUEST_MSG,
outboundMessageSubscriber1.getOnNextData());
Assertions.assertTrue(outboundMessageSubscriber1.isOnCompleted());
// 3. test bidirectionalStream
@@ -118,7 +118,7 @@ class TripleHttp3ProtocolTest {
inboundMessageObserver.onCompleted();
outboundMessageSubscriber2.getLatch().await(3000,
TimeUnit.MILLISECONDS);
// verify client
- Assertions.assertEquals(outboundMessageSubscriber2.getOnNextData(),
IGreeter.SERVER_MSG);
+ Assertions.assertEquals(IGreeter.SERVER_MSG,
outboundMessageSubscriber2.getOnNextData());
Assertions.assertTrue(outboundMessageSubscriber2.isOnCompleted());
// verify server
MockStreamObserver serverOutboundMessageSubscriber =
(MockStreamObserver) serviceImpl.getMockStreamObserver();