This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new 3beeebe2cf HTTP/3 protocol negotiation on the consumer side (#14489)
3beeebe2cf is described below
commit 3beeebe2cf709fcaade0a51d915dd4392efda462
Author: Sean Yang <[email protected]>
AuthorDate: Wed Aug 21 17:40:11 2024 +0800
HTTP/3 protocol negotiation on the consumer side (#14489)
* HTTP/3 protocol negotiation on the consumer side
* Fix missing return in ClassUtils
---
.../dubbo/common/extension/ExtensionAccessor.java | 6 +
.../org/apache/dubbo/common/utils/ClassUtils.java | 44 ++++
.../java/org/apache/dubbo/common/utils/Pair.java | 4 +-
.../apache/dubbo/config/nested/Http3Config.java | 15 ++
.../api/connection/AbstractConnectionClient.java | 21 +-
.../dubbo/remoting/transport/AbstractClient.java | 5 +
.../dubbo/remoting/transport/AbstractEndpoint.java | 2 +
.../dubbo/remoting/transport/AbstractPeer.java | 4 +
.../http12/AbstractServerHttpChannelObserver.java | 2 +-
.../remoting/http12/CompositeInputStream.java | 4 +-
.../remoting/http12/h2/Http2ChannelDelegate.java | 5 +
.../netty4/AbstractNettyConnectionClient.java | 102 ++++++---
.../remoting/transport/netty4/AddressUtils.java | 8 +-
.../transport/netty4/NettyConnectionClient.java | 2 +-
.../netty4/NettyPortUnificationServerHandler.java | 3 +-
.../main/java/org/apache/dubbo/rpc/Constants.java | 1 +
.../dubbo/rpc/protocol/tri}/Http3Exchanger.java | 66 ++++--
.../dubbo/rpc/protocol/tri/ServletExchanger.java | 12 +
.../dubbo/rpc/protocol/tri/TripleConstant.java | 1 -
.../dubbo/rpc/protocol/tri/TripleProtocol.java | 36 +--
.../tri/h12/AbstractServerTransportListener.java | 5 +
.../DefaultHttp11ServerTransportListener.java | 4 +-
.../http2/GenericHttp2ServerTransportListener.java | 4 +-
.../tri/h12/http2/Http2ClientStreamFactory.java | 5 +-
.../protocol/tri/h3/Http3ClientStreamFactory.java | 4 +-
.../AdaptiveClientStreamFactory.java} | 18 +-
.../h3/negotiation/AutoSwitchConnectionClient.java | 250 +++++++++++++++++++++
.../rpc/protocol/tri/h3/negotiation/Helper.java | 43 ++++
.../tri/h3/negotiation/NegotiateClientCall.java | 134 +++++++++++
.../tri/rest/argument/GeneralTypeConverter.java | 3 +-
.../rest/filter/RestExtensionExecutionFilter.java | 4 +-
.../rpc/protocol/tri/rest/mapping/RadixTree.java | 2 +-
.../rest/mapping/RestRequestHandlerMapping.java | 8 +-
.../tri/rest/mapping/meta/AnnotationEnum.java | 4 +-
.../rpc/protocol/tri/rest/util/TypeUtils.java | 21 +-
...bbo.rpc.protocol.tri.stream.ClientStreamFactory | 1 +
.../dubbo-seata-spring-boot-starter/pom.xml | 6 +
37 files changed, 716 insertions(+), 143 deletions(-)
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionAccessor.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionAccessor.java
index c9ed110845..5abade6f70 100644
---
a/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionAccessor.java
+++
b/dubbo-common/src/main/java/org/apache/dubbo/common/extension/ExtensionAccessor.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.common.extension;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
/**
* Uniform accessor for extension
@@ -61,4 +62,9 @@ public interface ExtensionAccessor {
}
return extensions.get(0);
}
+
+ default Set<String> getSupportedExtensions(Class<?> type) {
+ ExtensionLoader<?> extensionLoader = getExtensionLoader(type);
+ return extensionLoader != null ?
extensionLoader.getSupportedExtensions() : Collections.emptySet();
+ }
}
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ClassUtils.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ClassUtils.java
index 6c0656dae6..52cb2782fd 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ClassUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/ClassUtils.java
@@ -492,6 +492,50 @@ public class ClassUtils {
return true;
}
+ /**
+ * Tests whether the class with the specified name is present; array classes are not supported.
+ */
+ public static boolean isPresent(String className) {
+ try {
+ loadClass(className);
+ return true;
+ } catch (Throwable ignored) {
+ return false;
+ }
+ }
+
+ /**
+ * Loads the {@link Class} with the specified name; array classes are not
supported.
+ */
+ public static Class<?> loadClass(String className) throws
ClassNotFoundException {
+ ClassLoader cl = null;
+ if (!className.startsWith("org.apache.dubbo")) {
+ try {
+ cl = Thread.currentThread().getContextClassLoader();
+ } catch (Throwable ignored) {
+ }
+ }
+ if (cl == null) {
+ cl = ClassUtils.class.getClassLoader();
+ }
+ return cl.loadClass(className);
+ }
+
+ public static void runWith(ClassLoader classLoader, Runnable runnable) {
+ Thread thread = Thread.currentThread();
+ ClassLoader tccl = thread.getContextClassLoader();
+ if (classLoader == null || classLoader.equals(tccl)) {
+ runnable.run();
+ return;
+ }
+ thread.setContextClassLoader(classLoader);
+ try {
+ runnable.run();
+ } finally {
+ thread.setContextClassLoader(tccl);
+ }
+ }
+
/**
* Resolve the {@link Class} by the specified name and {@link ClassLoader}
*
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/Pair.java
b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/Pair.java
index d719ff3061..af1177bffa 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/Pair.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/Pair.java
@@ -32,8 +32,8 @@ public final class Pair<L, R> implements Map.Entry<L, R>,
Comparable<Pair<L, R>>
@SuppressWarnings("rawtypes")
private static final Pair NULL = new Pair<>(null, null);
- public final L left;
- public final R right;
+ private final L left;
+ private final R right;
public static <L, R> Pair<L, R> of(L left, R right) {
return left == null && right == null ? nullPair() : new Pair<>(left,
right);
diff --git
a/dubbo-common/src/main/java/org/apache/dubbo/config/nested/Http3Config.java
b/dubbo-common/src/main/java/org/apache/dubbo/config/nested/Http3Config.java
index 1d6a87767f..03a05012d1 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/nested/Http3Config.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/nested/Http3Config.java
@@ -28,6 +28,13 @@ public class Http3Config implements Serializable {
*/
private Boolean enabled;
+ /**
+ * Whether to enable HTTP/3 negotiation.
+ * If set to false, HTTP/2 alt-svc negotiation will be skipped, enabling
HTTP/3 but disabling HTTP/2 on the consumer side.
+ * <p>The default value is true.
+ */
+ private Boolean negotiation;
+
/**
* See <a
href="https://docs.rs/quiche/0.6.0/quiche/struct.Config.html#method.set_initial_max_data">set_initial_max_data</a>.
* <p>The default value is 8MiB.
@@ -123,6 +130,14 @@ public class Http3Config implements Serializable {
this.enabled = enabled;
}
+ public Boolean getNegotiation() {
+ return negotiation;
+ }
+
+ public void setNegotiation(Boolean negotiation) {
+ this.negotiation = negotiation;
+ }
+
public Integer getInitialMaxData() {
return initialMaxData;
}
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
index 052b50d0ec..80aa710a28 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/api/connection/AbstractConnectionClient.java
@@ -52,6 +52,8 @@ public abstract class AbstractConnectionClient extends
AbstractClient {
super(url, handler);
}
+ protected AbstractConnectionClient() {}
+
public final void increase() {
COUNTER_UPDATER.set(this, 1L);
}
@@ -71,7 +73,7 @@ public abstract class AbstractConnectionClient extends
AbstractClient {
/**
* Decreases the reference count by 1 and calls {@link #destroy} if
the reference count reaches 0.
*/
- public final boolean release() {
+ public boolean release() {
long remainingCount = COUNTER_UPDATER.decrementAndGet(this);
if (remainingCount == 0) {
@@ -98,12 +100,21 @@ public abstract class AbstractConnectionClient extends
AbstractClient {
public abstract boolean isAvailable();
/**
- * create connecting promise.
+ * Add a listener that will be executed when the connection is established.
+ *
+ * @param func execute function
+ */
+ public abstract void addConnectedListener(Runnable func);
+
+ /**
+ * Add a listener that will be executed when the connection is
disconnected.
+ *
+ * @param func execute function
*/
- public abstract void createConnectingPromise();
+ public abstract void addDisconnectedListener(Runnable func);
/**
- * add the listener of close connection event.
+ * Add a listener that will be executed when the connection is closed.
*
* @param func execute function
*/
@@ -135,7 +146,7 @@ public abstract class AbstractConnectionClient extends
AbstractClient {
* @param generalizable generalizable
* @return Dubbo Channel or NIOChannel such as NettyChannel
*/
- public abstract Object getChannel(Boolean generalizable);
+ public abstract <T> T getChannel(Boolean generalizable);
/**
* Get counter
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
index 14a5653cca..c26d9352f4 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractClient.java
@@ -140,6 +140,11 @@ public abstract class AbstractClient extends
AbstractEndpoint implements Client
}
}
+ protected AbstractClient() {
+ needReconnect = false;
+ frameworkModel = null;
+ }
+
private void initExecutor(URL url) {
ExecutorRepository executorRepository =
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel());
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
index f8ab69fdcd..cc199265d6 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
@@ -49,6 +49,8 @@ public abstract class AbstractEndpoint extends AbstractPeer
implements Resetable
url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY,
Constants.DEFAULT_CONNECT_TIMEOUT);
}
+ protected AbstractEndpoint() {}
+
protected static Codec2 getChannelCodec(URL url) {
String codecName = url.getParameter(Constants.CODEC_KEY);
if (StringUtils.isEmpty(codecName)) {
diff --git
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractPeer.java
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractPeer.java
index 9ded5052ad..a7a4edb7cc 100644
---
a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractPeer.java
+++
b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractPeer.java
@@ -48,6 +48,10 @@ public abstract class AbstractPeer implements Endpoint,
ChannelHandler {
this.handler = handler;
}
+ protected AbstractPeer() {
+ handler = null;
+ }
+
@Override
public void send(Object message) throws RemotingException {
send(message, url.getParameter(Constants.SENT_KEY, false));
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index d9d563e3af..8341732672 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -225,7 +225,7 @@ public abstract class AbstractServerHttpChannelObserver
implements CustomizableH
}
if (LOGGER.isDebugEnabled()) {
try {
- LOGGER.debug("Http response body is: '{}'",
JsonUtils.toJson(data));
+ LOGGER.debug("Http response body sent: '{}' by [{}]",
JsonUtils.toJson(data), httpChannel);
} catch (Throwable ignored) {
}
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/CompositeInputStream.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/CompositeInputStream.java
index 58c069e52e..7832d0e19f 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/CompositeInputStream.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/CompositeInputStream.java
@@ -32,9 +32,9 @@ public class CompositeInputStream extends InputStream {
private int readIndex = 0;
public void addInputStream(InputStream inputStream) {
- this.inputStreams.offer(inputStream);
+ inputStreams.offer(inputStream);
try {
- this.totalAvailable += inputStream.available();
+ totalAvailable += inputStream.available();
} catch (IOException e) {
throw new DecodeException(e);
}
diff --git
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ChannelDelegate.java
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ChannelDelegate.java
index 994ceaee96..e3c489ed5e 100644
---
a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ChannelDelegate.java
+++
b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h2/Http2ChannelDelegate.java
@@ -68,4 +68,9 @@ public class Http2ChannelDelegate implements H2StreamChannel {
public Http2OutputMessage newOutputMessage(boolean endStream) {
return h2StreamChannel.newOutputMessage(endStream);
}
+
+ @Override
+ public String toString() {
+ return "Http2ChannelDelegate{" + "h2StreamChannel=" + h2StreamChannel
+ '}';
+ }
}
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java
index 4446df5606..a9f983f56d 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AbstractNettyConnectionClient.java
@@ -48,11 +48,15 @@ public abstract class AbstractNettyConnectionClient extends
AbstractConnectionCl
private static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(AbstractNettyConnectionClient.class);
- private AtomicReference<Promise<Object>> connectingPromise;
+ private AtomicReference<Promise<Object>> connectingPromiseRef;
- private Promise<Void> closePromise;
+ private AtomicReference<io.netty.channel.Channel> channelRef;
+
+ private Promise<Void> connectedPromise;
+
+ private Promise<Void> disconnectedPromise;
- private AtomicReference<io.netty.channel.Channel> channel;
+ private Promise<Void> closePromise;
private AtomicBoolean isReconnecting;
@@ -72,14 +76,16 @@ public abstract class AbstractNettyConnectionClient extends
AbstractConnectionCl
@Override
protected void initConnectionClient() {
- this.remote = getConnectAddress();
- this.connectingPromise = new AtomicReference<>();
- this.connectionListener = new ConnectionListener();
- this.channel = new AtomicReference<>();
- this.isReconnecting = new AtomicBoolean(false);
- this.closePromise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
- this.init = new AtomicBoolean(false);
- this.increase();
+ remote = getConnectAddress();
+ init = new AtomicBoolean(false);
+ connectingPromiseRef = new AtomicReference<>();
+ channelRef = new AtomicReference<>();
+ connectedPromise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+ disconnectedPromise = new
DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+ closePromise = new DefaultPromise<>(GlobalEventExecutor.INSTANCE);
+ isReconnecting = new AtomicBoolean(false);
+ connectionListener = new ConnectionListener();
+ increase();
}
protected abstract void initBootstrap() throws Exception;
@@ -115,21 +121,22 @@ public abstract class AbstractNettyConnectionClient
extends AbstractConnectionCl
LOGGER.debug(String.format("%s aborted to reconnect cause
connection closed. ", this));
}
}
+
init.compareAndSet(false, true);
long start = System.currentTimeMillis();
- createConnectingPromise();
- Future<Void> promise = performConnect();
-
- promise.addListener(connectionListener);
+ Promise<Object> connectingPromise = getOrCreateConnectingPromise();
+ Future<Void> connectPromise = performConnect();
+ connectPromise.addListener(connectionListener);
- boolean ret =
connectingPromise.get().awaitUninterruptibly(getConnectTimeout(),
TimeUnit.MILLISECONDS);
+ boolean ret =
connectingPromise.awaitUninterruptibly(getConnectTimeout(),
TimeUnit.MILLISECONDS);
// destroy connectingPromise after used
synchronized (this) {
- connectingPromise.set(null);
+ connectingPromiseRef.set(null);
}
- if (promise.cause() != null) {
- Throwable cause = promise.cause();
+
+ if (connectPromise.cause() != null) {
+ Throwable cause = connectPromise.cause();
// 6-1 Failed to connect to provider server by other reason.
RemotingException remotingException = new RemotingException(
@@ -146,7 +153,7 @@ public abstract class AbstractNettyConnectionClient extends
AbstractConnectionCl
cause);
throw remotingException;
- } else if (!ret || !promise.isSuccess()) {
+ } else if (!ret || !connectPromise.isSuccess()) {
// 6-2 Client-side timeout
RemotingException remotingException = new RemotingException(
this,
@@ -175,6 +182,7 @@ public abstract class AbstractNettyConnectionClient extends
AbstractConnectionCl
if (!(channel instanceof io.netty.channel.Channel)) {
return;
}
+
io.netty.channel.Channel nettyChannel = ((io.netty.channel.Channel)
channel);
if (isClosed()) {
nettyChannel.close();
@@ -190,12 +198,19 @@ public abstract class AbstractNettyConnectionClient
extends AbstractConnectionCl
current.close();
}
- this.channel.set(nettyChannel);
+ channelRef.set(nettyChannel);
+
// This indicates that the connection is available.
- if (connectingPromise.get() != null) {
- connectingPromise.get().trySuccess(CONNECTED_OBJECT);
+ Promise<Object> connectingPromise = connectingPromiseRef.get();
+ if (connectingPromise != null) {
+ connectingPromise.trySuccess(CONNECTED_OBJECT);
}
+
nettyChannel.attr(CONNECTION).set(this);
+
+ // Notify the connection is available.
+ connectedPromise.trySuccess(null);
+
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(String.format("%s connected ", this));
}
@@ -206,8 +221,9 @@ public abstract class AbstractNettyConnectionClient extends
AbstractConnectionCl
if (!(channel instanceof io.netty.channel.Channel)) {
return;
}
+
io.netty.channel.Channel nettyChannel = (io.netty.channel.Channel)
channel;
- if (this.channel.compareAndSet(nettyChannel, null)) {
+ if (channelRef.compareAndSet(nettyChannel, null)) {
// Ensure the channel is closed
if (nettyChannel.isOpen()) {
nettyChannel.close();
@@ -229,16 +245,17 @@ public abstract class AbstractNettyConnectionClient
extends AbstractConnectionCl
}
private io.netty.channel.Channel getNettyChannel() {
- return channel.get();
+ return channelRef.get();
}
protected void clearNettyChannel() {
- channel.set(null);
+ channelRef.set(null);
}
@Override
- public Object getChannel(Boolean generalizable) {
- return Boolean.TRUE.equals(generalizable) ? getNettyChannel() :
getChannel();
+ @SuppressWarnings("unchecked")
+ public <T> T getChannel(Boolean generalizable) {
+ return Boolean.TRUE.equals(generalizable) ? (T) getNettyChannel() :
(T) getChannel();
}
@Override
@@ -259,20 +276,19 @@ public abstract class AbstractNettyConnectionClient
extends AbstractConnectionCl
}
}
- createConnectingPromise();
- connectingPromise.get().awaitUninterruptibly(getConnectTimeout(),
TimeUnit.MILLISECONDS);
+
getOrCreateConnectingPromise().awaitUninterruptibly(getConnectTimeout(),
TimeUnit.MILLISECONDS);
// destroy connectingPromise after used
synchronized (this) {
- connectingPromise.set(null);
+ connectingPromiseRef.set(null);
}
nettyChannel = getNettyChannel();
return nettyChannel != null && nettyChannel.isActive();
}
- @Override
- public void createConnectingPromise() {
- connectingPromise.compareAndSet(null, new
DefaultPromise<>(GlobalEventExecutor.INSTANCE));
+ private Promise<Object> getOrCreateConnectingPromise() {
+ connectingPromiseRef.compareAndSet(null, new
DefaultPromise<>(GlobalEventExecutor.INSTANCE));
+ return connectingPromiseRef.get();
}
public Promise<Void> getClosePromise() {
@@ -290,12 +306,22 @@ public abstract class AbstractNettyConnectionClient
extends AbstractConnectionCl
null,
"Failed to send request " + request + ", cause: The
channel to " + remote + " is closed!");
}
- return ((io.netty.channel.Channel)
getChannel()).writeAndFlush(request);
+ return getNettyChannel().writeAndFlush(request);
+ }
+
+ @Override
+ public void addConnectedListener(Runnable func) {
+ connectedPromise.addListener(future -> func.run());
+ }
+
+ @Override
+ public void addDisconnectedListener(Runnable func) {
+ disconnectedPromise.addListener(future -> func.run());
}
@Override
public void addCloseListener(Runnable func) {
- getClosePromise().addListener(future -> func.run());
+ closePromise.addListener(future -> func.run());
}
@Override
@@ -335,6 +361,10 @@ public abstract class AbstractNettyConnectionClient
extends AbstractConnectionCl
"%s is reconnecting, attempt=%d cause=%s",
connectionClient, 0, future.cause().getMessage()));
}
+
+ // Notify the connection is unavailable.
+ disconnectedPromise.trySuccess(null);
+
connectivityExecutor.schedule(
() -> {
try {
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java
index 438cf2c63c..85ffb29aa3 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/AddressUtils.java
@@ -34,7 +34,7 @@ public final class AddressUtils {
public static InetSocketAddress getRemoteAddress(Channel channel) {
InetSocketAddress address;
- for (int i = 0, len = ACCESSORS.size(); i < len; i++) {
+ for (int i = 0, size = ACCESSORS.size(); i < size; i++) {
address = ACCESSORS.get(i).getRemoteAddress(channel);
if (address != null) {
return address;
@@ -45,7 +45,7 @@ public final class AddressUtils {
public static InetSocketAddress getLocalAddress(Channel channel) {
InetSocketAddress address;
- for (int i = 0, len = ACCESSORS.size(); i < len; i++) {
+ for (int i = 0, size = ACCESSORS.size(); i < size; i++) {
address = ACCESSORS.get(i).getLocalAddress(channel);
if (address != null) {
return address;
@@ -56,7 +56,7 @@ public final class AddressUtils {
public static String getRemoteAddressKey(Channel channel) {
InetSocketAddress address;
- for (int i = 0, len = ACCESSORS.size(); i < len; i++) {
+ for (int i = 0, size = ACCESSORS.size(); i < size; i++) {
ChannelAddressAccessor accessor = ACCESSORS.get(i);
address = accessor.getRemoteAddress(channel);
if (address != null) {
@@ -72,7 +72,7 @@ public final class AddressUtils {
public static String getLocalAddressKey(Channel channel) {
InetSocketAddress address;
- for (int i = 0, len = ACCESSORS.size(); i < len; i++) {
+ for (int i = 0, size = ACCESSORS.size(); i < size; i++) {
ChannelAddressAccessor accessor = ACCESSORS.get(i);
address = accessor.getLocalAddress(channel);
if (address != null) {
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index fead015b13..5838709943 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -71,7 +71,7 @@ public final class NettyConnectionClient extends
AbstractNettyConnectionClient {
@Override
protected void initChannel(SocketChannel ch) {
NettyChannel nettyChannel = NettyChannel.getOrAddChannel(ch,
getUrl(), getChannelHandler());
- final ChannelPipeline pipeline = ch.pipeline();
+ ChannelPipeline pipeline = ch.pipeline();
NettySslContextOperator nettySslContextOperator = new
NettySslContextOperator();
if (sslContext != null) {
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
index f3c490376a..69c6738c0d 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyPortUnificationServerHandler.java
@@ -183,8 +183,7 @@ public class NettyPortUnificationServerHandler extends
ByteToMessageDecoder {
}
byte[] preface = new byte[in.readableBytes()];
in.readBytes(preface);
- Set<String> supported =
-
url.getApplicationModel().getExtensionLoader(WireProtocol.class).getSupportedExtensions();
+ Set<String> supported =
url.getApplicationModel().getSupportedExtensions(WireProtocol.class);
LOGGER.error(
INTERNAL_ERROR,
"unknown error in remoting module",
diff --git
a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
index 9d7ffefa50..654545dc52 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/Constants.java
@@ -108,6 +108,7 @@ public interface Constants {
String H2_SETTINGS_VERBOSE_ENABLED = "dubbo.protocol.triple.verbose";
String H2_SETTINGS_SERVLET_ENABLED =
"dubbo.protocol.triple.servlet.enabled";
String H3_SETTINGS_HTTP3_ENABLED = "dubbo.protocol.triple.http3.enabled";
+ String H3_SETTINGS_HTTP3_NEGOTIATION =
"dubbo.protocol.triple.http3.negotiation";
String ADAPTIVE_LOADBALANCE_ATTACHMENT_KEY = "lb_adaptive";
String ADAPTIVE_LOADBALANCE_START_TIME = "adaptive_startTime";
diff --git
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/exchange/Http3Exchanger.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java
similarity index 56%
rename from
dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/exchange/Http3Exchanger.java
rename to
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java
index 1a265abcfc..d620ecf491 100644
---
a/dubbo-remoting/dubbo-remoting-http3/src/main/java/org/apache/dubbo/remoting/exchange/Http3Exchanger.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/Http3Exchanger.java
@@ -14,57 +14,83 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.remoting.exchange;
+package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.Configuration;
import org.apache.dubbo.common.constants.LoggerCodeConstants;
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.transport.ChannelHandlerAdapter;
-import org.apache.dubbo.remoting.transport.netty4.NettyHttp3ConnectionClient;
import org.apache.dubbo.remoting.transport.netty4.NettyHttp3Server;
+import org.apache.dubbo.rpc.Constants;
+import org.apache.dubbo.rpc.protocol.tri.h3.negotiation.Helper;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_UNSUPPORTED;
+
public final class Http3Exchanger {
private static final ErrorTypeAwareLogger LOGGER =
LoggerFactory.getErrorTypeAwareLogger(Http3Exchanger.class);
+ private static final boolean HAS_NETTY_HTTP3 =
ClassUtils.isPresent("io.netty.incubator.codec.http3.Http3");
private static final Map<String, RemotingServer> SERVERS = new
ConcurrentHashMap<>();
private static final Map<String, AbstractConnectionClient> CLIENTS = new
ConcurrentHashMap<>(16);
private static final ChannelHandler HANDLER = new ChannelHandlerAdapter();
+ private static boolean ENABLED = false;
+ private static boolean NEGOTIATION_ENABLED = true;
+
private Http3Exchanger() {}
- public static RemotingServer bind(URL url) {
- return SERVERS.computeIfAbsent(url.getAddress(), addr -> {
- try {
- return new NettyHttp3Server(url, HANDLER);
- } catch (RemotingException e) {
- throw new RuntimeException(e);
+ public static void init(Configuration configuration) {
+ boolean enabled =
configuration.getBoolean(Constants.H3_SETTINGS_HTTP3_ENABLED, false);
+ if (enabled) {
+ if (HAS_NETTY_HTTP3) {
+ ENABLED = true;
+ NEGOTIATION_ENABLED =
configuration.getBoolean(Constants.H3_SETTINGS_HTTP3_NEGOTIATION, true);
+ return;
}
- });
+ LOGGER.warn(PROTOCOL_UNSUPPORTED, "", "", "Class for netty http3
not found, HTTP/3 support is not enabled");
+ }
+ }
+
+ public static boolean isEnabled(URL url) {
+ return ENABLED || HAS_NETTY_HTTP3 &&
url.getParameter(Constants.HTTP3_KEY, false);
+ }
+
+ public static RemotingServer bind(URL url) {
+ if (isEnabled(url)) {
+ return SERVERS.computeIfAbsent(url.getAddress(), addr -> {
+ try {
+ return new NettyHttp3Server(url, HANDLER);
+ } catch (RemotingException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+ return null;
}
public static AbstractConnectionClient connect(URL url) {
return CLIENTS.compute(url.getAddress(), (address, client) -> {
- try {
- if (client == null) {
- AbstractConnectionClient connectionClient = new
NettyHttp3ConnectionClient(url, HANDLER);
- connectionClient.addCloseListener(() ->
CLIENTS.remove(address, connectionClient));
- client = connectionClient;
- } else {
- client.retain();
- }
- return client;
- } catch (RemotingException e) {
- throw new RuntimeException(e);
+ if (client == null) {
+ AbstractConnectionClient connectionClient = NEGOTIATION_ENABLED
+ ? Helper.createAutoSwitchClient(url, HANDLER)
+ : Helper.createHttp3Client(url, HANDLER);
+ connectionClient.addCloseListener(() ->
CLIENTS.remove(address, connectionClient));
+ client = connectionClient;
+ } else {
+ client.retain();
}
+ return client;
});
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServletExchanger.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServletExchanger.java
index 9e899b66d8..ffc28fc894 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServletExchanger.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/ServletExchanger.java
@@ -17,6 +17,8 @@
package org.apache.dubbo.rpc.protocol.tri;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.Configuration;
+import org.apache.dubbo.rpc.Constants;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
@@ -26,8 +28,18 @@ public final class ServletExchanger {
private static final AtomicReference<URL> url = new AtomicReference<>();
private static final AtomicReference<Integer> serverPort = new
AtomicReference<>();
+ private static boolean ENABLED = false;
+
+ public static void init(Configuration configuration) {
+ ENABLED =
configuration.getBoolean(Constants.H2_SETTINGS_SERVLET_ENABLED, false);
+ }
+
private ServletExchanger() {}
+ public static boolean isEnabled() {
+ return ENABLED;
+ }
+
public static void bind(URL url) {
ServletExchanger.url.compareAndSet(null, url);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
index 0289e08f78..4d99980227 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleConstant.java
@@ -23,7 +23,6 @@ public class TripleConstant {
public static final String DEFAULT_VERSION = "1.0.0";
public static final String SERIALIZATION_KEY = "serialization";
- public static final String TE_KEY = "te";
public static final String HESSIAN4 = "hessian4";
public static final String HESSIAN2 = "hessian2";
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
index 3b7061dd2a..7b7dcbe7b8 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleProtocol.java
@@ -24,7 +24,6 @@ import org.apache.dubbo.common.utils.ExecutorUtil;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
import org.apache.dubbo.remoting.api.pu.DefaultPuHandler;
-import org.apache.dubbo.remoting.exchange.Http3Exchanger;
import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
@@ -40,7 +39,6 @@ import
org.apache.dubbo.rpc.protocol.tri.rest.mapping.RequestMappingRegistry;
import org.apache.dubbo.rpc.protocol.tri.service.TriBuiltinService;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import io.grpc.health.v1.HealthCheckResponse.ServingStatus;
@@ -54,11 +52,8 @@ import static
org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_IGNORE_1_0_0_KEY;
import static
org.apache.dubbo.rpc.Constants.H2_SETTINGS_PASS_THROUGH_STANDARD_HTTP_HEADERS;
import static
org.apache.dubbo.rpc.Constants.H2_SETTINGS_RESOLVE_FALLBACK_TO_DEFAULT_KEY;
-import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_SERVLET_ENABLED;
import static
org.apache.dubbo.rpc.Constants.H2_SETTINGS_SUPPORT_NO_LOWER_HEADER_KEY;
import static org.apache.dubbo.rpc.Constants.H2_SETTINGS_VERBOSE_ENABLED;
-import static org.apache.dubbo.rpc.Constants.H3_SETTINGS_HTTP3_ENABLED;
-import static org.apache.dubbo.rpc.Constants.HTTP3_KEY;
public class TripleProtocol extends AbstractProtocol {
@@ -66,34 +61,32 @@ public class TripleProtocol extends AbstractProtocol {
private final RequestMappingRegistry mappingRegistry;
private final TriBuiltinService triBuiltinService;
private final String acceptEncodings;
- private boolean http3Bound;
public static boolean CONVERT_NO_LOWER_HEADER = false;
public static boolean IGNORE_1_0_0_VERSION = false;
public static boolean RESOLVE_FALLBACK_TO_DEFAULT = true;
public static boolean PASS_THROUGH_STANDARD_HTTP_HEADERS = false;
public static boolean VERBOSE_ENABLED = false;
- public static boolean HTTP3_ENABLED = false;
- public static boolean SERVLET_ENABLED = false;
public TripleProtocol(FrameworkModel frameworkModel) {
this.frameworkModel = frameworkModel;
triBuiltinService = new TriBuiltinService(frameworkModel);
pathResolver = frameworkModel.getDefaultExtension(PathResolver.class);
mappingRegistry =
frameworkModel.getBeanFactory().getOrRegisterBean(DefaultRequestMappingRegistry.class);
- Set<String> supported =
-
frameworkModel.getExtensionLoader(DeCompressor.class).getSupportedExtensions();
- acceptEncodings = String.join(",", supported);
+ acceptEncodings = String.join(",",
frameworkModel.getSupportedExtensions(DeCompressor.class));
+
+ // init env settings
Configuration conf =
ConfigurationUtils.getEnvConfiguration(ApplicationModel.defaultModel());
CONVERT_NO_LOWER_HEADER =
conf.getBoolean(H2_SETTINGS_SUPPORT_NO_LOWER_HEADER_KEY, true);
IGNORE_1_0_0_VERSION = conf.getBoolean(H2_SETTINGS_IGNORE_1_0_0_KEY,
false);
RESOLVE_FALLBACK_TO_DEFAULT =
conf.getBoolean(H2_SETTINGS_RESOLVE_FALLBACK_TO_DEFAULT_KEY, true);
PASS_THROUGH_STANDARD_HTTP_HEADERS =
conf.getBoolean(H2_SETTINGS_PASS_THROUGH_STANDARD_HTTP_HEADERS, false);
+ // init global settings
Configuration globalConf =
ConfigurationUtils.getGlobalConfiguration(frameworkModel.defaultApplication());
VERBOSE_ENABLED = globalConf.getBoolean(H2_SETTINGS_VERBOSE_ENABLED,
false);
- SERVLET_ENABLED = globalConf.getBoolean(H2_SETTINGS_SERVLET_ENABLED,
false);
- HTTP3_ENABLED = globalConf.getBoolean(H3_SETTINGS_HTTP3_ENABLED,
false);
+ ServletExchanger.init(globalConf);
+ Http3Exchanger.init(globalConf);
}
@Override
@@ -160,7 +153,7 @@ public class TripleProtocol extends AbstractProtocol {
private void bindServerPort(URL url) {
boolean bindPort = true;
- if (SERVLET_ENABLED) {
+ if (ServletExchanger.isEnabled()) {
int port = url.getParameter(BIND_PORT_KEY, url.getPort());
Integer serverPort = ServletExchanger.getServerPort();
if (serverPort == null) {
@@ -177,17 +170,14 @@ public class TripleProtocol extends AbstractProtocol {
PortUnificationExchanger.bind(url, new DefaultPuHandler());
}
- if (isHttp3Enabled(url)) {
- Http3Exchanger.bind(url);
- http3Bound = true;
- }
+ Http3Exchanger.bind(url);
}
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
optimizeSerialization(url);
ExecutorService streamExecutor =
getOrCreateStreamExecutor(url.getOrDefaultApplicationModel(), url);
- AbstractConnectionClient connectionClient = isHttp3Enabled(url)
+ AbstractConnectionClient connectionClient =
Http3Exchanger.isEnabled(url)
? Http3Exchanger.connect(url)
: PortUnificationExchanger.connect(url, new
DefaultPuHandler());
TripleInvoker<T> invoker =
@@ -216,15 +206,9 @@ public class TripleProtocol extends AbstractProtocol {
logger.info("Destroying protocol [{}] ...",
getClass().getSimpleName());
}
PortUnificationExchanger.close();
- if (http3Bound) {
- Http3Exchanger.close();
- }
+ Http3Exchanger.close();
pathResolver.destroy();
mappingRegistry.destroy();
super.destroy();
}
-
- public static boolean isHttp3Enabled(URL url) {
- return HTTP3_ENABLED || url.getParameter(HTTP3_KEY, false);
- }
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
index 16e52d990f..23200fbb40 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/AbstractServerTransportListener.java
@@ -97,6 +97,7 @@ public abstract class AbstractServerTransportListener<HEADER
extends RequestMeta
doOnMetadata(metadata);
} catch (Throwable t) {
logError(t);
+ onMetadataError(metadata, t);
onError(t);
}
});
@@ -154,6 +155,10 @@ public abstract class
AbstractServerTransportListener<HEADER extends RequestMeta
// default no op
}
+ protected void onMetadataError(HEADER metadata, Throwable throwable) {
+ initializeAltSvc(url);
+ }
+
protected void onPrepareData(MESSAGE message) {
// default no op
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
index 677816bb1d..88f7dcb51c 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
@@ -37,8 +37,8 @@ import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.executor.ExecutorSupport;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.protocol.tri.Http3Exchanger;
import org.apache.dubbo.rpc.protocol.tri.RpcInvocationBuildContext;
-import org.apache.dubbo.rpc.protocol.tri.TripleProtocol;
import org.apache.dubbo.rpc.protocol.tri.h12.AbstractServerTransportListener;
import org.apache.dubbo.rpc.protocol.tri.h12.DefaultHttpMessageListener;
import org.apache.dubbo.rpc.protocol.tri.h12.HttpMessageListener;
@@ -113,7 +113,7 @@ public class DefaultHttp11ServerTransportListener
@Override
protected void initializeAltSvc(URL url) {
- String protocolId = TripleProtocol.isHttp3Enabled(url) ? "h3" : "h2";
+ String protocolId = Http3Exchanger.isEnabled(url) ? "h3" : "h2";
int bindPort = url.getParameter(Constants.BIND_PORT_KEY,
url.getPort());
serverChannelObserver.setAltSvc(protocolId + "=\":" + bindPort + "\"");
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
index 1e039f66a0..698778e415 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/GenericHttp2ServerTransportListener.java
@@ -38,8 +38,8 @@ import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.executor.ExecutorSupport;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.model.MethodDescriptor;
+import org.apache.dubbo.rpc.protocol.tri.Http3Exchanger;
import org.apache.dubbo.rpc.protocol.tri.RpcInvocationBuildContext;
-import org.apache.dubbo.rpc.protocol.tri.TripleProtocol;
import org.apache.dubbo.rpc.protocol.tri.h12.AbstractServerTransportListener;
import org.apache.dubbo.rpc.protocol.tri.h12.BiStreamServerCallListener;
import org.apache.dubbo.rpc.protocol.tri.h12.HttpMessageListener;
@@ -150,7 +150,7 @@ public class GenericHttp2ServerTransportListener extends
AbstractServerTransport
@Override
protected void initializeAltSvc(URL url) {
- if (TripleProtocol.isHttp3Enabled(url)) {
+ if (Http3Exchanger.isEnabled(url)) {
int bindPort = url.getParameter(Constants.BIND_PORT_KEY,
url.getPort());
serverChannelObserver.setAltSvc("h3=\":" + bindPort + "\"");
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ClientStreamFactory.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ClientStreamFactory.java
index 56a6828f99..f0db0e5805 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ClientStreamFactory.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http2/Http2ClientStreamFactory.java
@@ -26,8 +26,6 @@ import
org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
import java.util.concurrent.Executor;
-import io.netty.channel.Channel;
-
@Activate
public class Http2ClientStreamFactory implements ClientStreamFactory {
@@ -38,7 +36,6 @@ public class Http2ClientStreamFactory implements
ClientStreamFactory {
Executor executor,
TripleClientCall clientCall,
TripleWriteQueue writeQueue) {
- return new Http2TripleClientStream(
- frameworkModel, executor, (Channel) client.getChannel(true),
clientCall, writeQueue);
+ return new Http2TripleClientStream(frameworkModel, executor,
client.getChannel(true), clientCall, writeQueue);
}
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientStreamFactory.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientStreamFactory.java
index b6e2a5dc59..44b48c2d7d 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientStreamFactory.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientStreamFactory.java
@@ -27,8 +27,6 @@ import
org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
import java.util.concurrent.Executor;
-import io.netty.channel.Channel;
-
@Activate(order = -100, onClass = "io.netty.incubator.codec.quic.QuicChannel")
public class Http3ClientStreamFactory implements ClientStreamFactory {
@@ -41,7 +39,7 @@ public class Http3ClientStreamFactory implements
ClientStreamFactory {
TripleWriteQueue writeQueue) {
if (client instanceof NettyHttp3ConnectionClient) {
return new Http3TripleClientStream(
- frameworkModel, executor, (Channel)
client.getChannel(true), clientCall, writeQueue);
+ frameworkModel, executor, client.getChannel(true),
clientCall, writeQueue);
}
return null;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientStreamFactory.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AdaptiveClientStreamFactory.java
similarity index 67%
copy from
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientStreamFactory.java
copy to
dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AdaptiveClientStreamFactory.java
index b6e2a5dc59..dee38d963b 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/Http3ClientStreamFactory.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AdaptiveClientStreamFactory.java
@@ -14,13 +14,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.rpc.protocol.tri.h3;
+package org.apache.dubbo.rpc.protocol.tri.h3.negotiation;
import org.apache.dubbo.common.extension.Activate;
import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
-import org.apache.dubbo.remoting.transport.netty4.NettyHttp3ConnectionClient;
import org.apache.dubbo.rpc.model.FrameworkModel;
import org.apache.dubbo.rpc.protocol.tri.call.TripleClientCall;
+import org.apache.dubbo.rpc.protocol.tri.h12.http2.Http2TripleClientStream;
+import org.apache.dubbo.rpc.protocol.tri.h3.Http3TripleClientStream;
import org.apache.dubbo.rpc.protocol.tri.stream.ClientStream;
import org.apache.dubbo.rpc.protocol.tri.stream.ClientStreamFactory;
import org.apache.dubbo.rpc.protocol.tri.transport.TripleWriteQueue;
@@ -29,8 +30,8 @@ import java.util.concurrent.Executor;
import io.netty.channel.Channel;
-@Activate(order = -100, onClass = "io.netty.incubator.codec.quic.QuicChannel")
-public class Http3ClientStreamFactory implements ClientStreamFactory {
+@Activate(order = -90, onClass = "io.netty.incubator.codec.quic.QuicChannel")
+public class AdaptiveClientStreamFactory implements ClientStreamFactory {
@Override
public ClientStream createClientStream(
@@ -39,9 +40,12 @@ public class Http3ClientStreamFactory implements
ClientStreamFactory {
Executor executor,
TripleClientCall clientCall,
TripleWriteQueue writeQueue) {
- if (client instanceof NettyHttp3ConnectionClient) {
- return new Http3TripleClientStream(
- frameworkModel, executor, (Channel)
client.getChannel(true), clientCall, writeQueue);
+ if (client instanceof AutoSwitchConnectionClient) {
+ Channel channel = client.getChannel(true);
+ if (((AutoSwitchConnectionClient) client).isHttp3Connected()) {
+ return new Http3TripleClientStream(frameworkModel, executor,
channel, clientCall, writeQueue);
+ }
+ return new Http2TripleClientStream(frameworkModel, executor,
channel, clientCall, writeQueue);
}
return null;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AutoSwitchConnectionClient.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AutoSwitchConnectionClient.java
new file mode 100644
index 0000000000..635cf999a0
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/AutoSwitchConnectionClient.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.h3.negotiation;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
+import org.apache.dubbo.common.utils.ClassUtils;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.remoting.Channel;
+import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
+import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static
org.apache.dubbo.common.constants.LoggerCodeConstants.PROTOCOL_ERROR_CLOSE_CLIENT;
+import static
org.apache.dubbo.common.logger.LoggerFactory.getErrorTypeAwareLogger;
+
+/**
+ * Connection client that serves calls over the wrapped client and switches to HTTP/3 once the
+ * provider advertises {@code h3=} in the {@code alt-svc} value returned by a negotiation call.
+ *
+ * <p>A negotiation is submitted to a private single-thread scheduler every time the wrapped client
+ * reports a connection; failed negotiations are retried with exponential backoff, at most
+ * {@link #MAX_RETRIES} times. The state flags are written by listener/negotiation threads and read by
+ * caller threads, therefore they are {@code volatile}. The scheduler and the pending call are guarded
+ * by {@code negotiationLock} so that a late listener callback never touches a released scheduler.
+ *
+ * <p>NOTE(review): the scheduler is released by {@link #negotiateEnd()} and {@link #destroy()} only;
+ * confirm that the last {@link #release()} eventually leads to {@code destroy()} on this wrapper.
+ */
+public class AutoSwitchConnectionClient extends AbstractConnectionClient {
+
+    private static final ErrorTypeAwareLogger LOGGER = getErrorTypeAwareLogger(AutoSwitchConnectionClient.class);
+    private static final int MAX_RETRIES = 8;
+
+    private final URL url;
+    private final AbstractConnectionClient connectionClient;
+    private final AtomicBoolean scheduling = new AtomicBoolean();
+    /** Guards {@link #executor} and {@link #clientCall}. */
+    private final Object negotiationLock = new Object();
+
+    private volatile AbstractConnectionClient http3ConnectionClient;
+    /** {@code null} once negotiation has ended or the client was destroyed. */
+    private ScheduledExecutorService executor;
+    private NegotiateClientCall clientCall;
+    private volatile boolean negotiated = false;
+    private volatile boolean http3Connected = false;
+    private int attempt = 0;
+
+    /**
+     * Wraps the given client and registers a connected listener that triggers HTTP/3 negotiation.
+     *
+     * @param url              consumer url of the target provider
+     * @param connectionClient client used until (and unless) HTTP/3 becomes available
+     */
+    public AutoSwitchConnectionClient(URL url, AbstractConnectionClient connectionClient) {
+        this.url = url;
+        this.connectionClient = connectionClient;
+        executor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-http3-negotiation"));
+        ClassLoader tccl = Thread.currentThread().getContextClassLoader();
+        // The listener outlives the scheduler (negotiateEnd() releases it), so it must go through the
+        // null-safe submit helper instead of dereferencing the field directly.
+        connectionClient.addConnectedListener(() -> ClassUtils.runWith(tccl, () -> submitNegotiation(0)));
+        increase();
+        if (LOGGER.isInfoEnabled()) {
+            LOGGER.info(
+                    "Start HTTP/3 AutoSwitchConnectionClient {} connect to the server {}",
+                    NetUtils.getLocalAddress(),
+                    url.toInetSocketAddress());
+        }
+    }
+
+    /** Builds {@code scheme://host:port/} for log messages. */
+    private String getBaseUrl() {
+        boolean ssl = url.getParameter(CommonConstants.SSL_ENABLED_KEY, false);
+        CharSequence scheme = ssl ? TripleConstant.HTTPS_SCHEME : TripleConstant.HTTP_SCHEME;
+        return scheme + "://" + url.getHost() + ':' + url.getPort() + '/';
+    }
+
+    /**
+     * Queues {@link #negotiate()} on the scheduler, immediately or after the given delay.
+     * Does nothing when the scheduler has already been released.
+     *
+     * @param delaySeconds delay before the negotiation runs; {@code 0} or less runs it as soon as possible
+     */
+    private void submitNegotiation(long delaySeconds) {
+        synchronized (negotiationLock) {
+            // executor is shut down and cleared under the same lock, so non-null means it accepts tasks
+            if (executor == null) {
+                return;
+            }
+            if (delaySeconds > 0) {
+                executor.schedule(this::negotiate, delaySeconds, TimeUnit.SECONDS);
+            } else {
+                executor.execute(this::negotiate);
+            }
+        }
+    }
+
+    /** Runs one negotiation call and handles its outcome. */
+    private void negotiate() {
+        if (negotiated) {
+            return;
+        }
+        scheduling.set(false);
+        NegotiateClientCall call;
+        synchronized (negotiationLock) {
+            if (executor == null) {
+                // released while this task was queued
+                return;
+            }
+            if (clientCall == null) {
+                clientCall = new NegotiateClientCall(connectionClient, executor);
+            }
+            call = clientCall;
+        }
+        LOGGER.info("Start HTTP/3 negotiation for [{}]", getBaseUrl());
+        call.start(url).whenComplete((altSvc, t) -> {
+            if (t == null) {
+                if (altSvc.contains("h3=")) {
+                    negotiateSuccess();
+                    return;
+                }
+                LOGGER.info(
+                        "HTTP/3 negotiation succeed, but provider reply alt-svc='{}' not support HTTP/3 for [{}]",
+                        altSvc,
+                        getBaseUrl());
+                return;
+            }
+            // only one retry may be pending at a time
+            if (scheduling.compareAndSet(false, true)) {
+                reScheduleNegotiate(t);
+            }
+        });
+    }
+
+    /** Creates the HTTP/3 client and wires its connection state into {@link #http3Connected}. */
+    private void negotiateSuccess() {
+        negotiated = true;
+        LOGGER.info("HTTP/3 negotiation succeed for [{}], create http3 client", getBaseUrl());
+        try {
+            // publish the client before registering listeners: a listener may flip http3Connected
+            // right away and readers then dereference http3ConnectionClient
+            http3ConnectionClient = Helper.createHttp3Client(url, connectionClient.getDelegateHandler());
+            http3ConnectionClient.addConnectedListener(() -> setHttp3Connected(true));
+            http3ConnectionClient.addDisconnectedListener(() -> setHttp3Connected(false));
+        } catch (Throwable t) {
+            // this runs inside a CompletableFuture callback whose result nobody observes,
+            // so the failure would otherwise vanish silently
+            LOGGER.warn(
+                    PROTOCOL_ERROR_CLOSE_CLIENT,
+                    "",
+                    "",
+                    "Failed to create HTTP/3 client for " + getBaseUrl() + ", keep using current connection",
+                    t);
+        } finally {
+            negotiateEnd();
+        }
+    }
+
+    /**
+     * Schedules another negotiation with exponential backoff, or gives up after {@link #MAX_RETRIES}.
+     *
+     * @param t cause of the failed negotiation
+     */
+    private void reScheduleNegotiate(Throwable t) {
+        if (attempt++ < MAX_RETRIES) {
+            // attempt is already incremented here: delays are 8, 16, ... seconds
+            int delay = 1 << (attempt + 2);
+            LOGGER.info("HTTP/3 negotiation failed, retry after {} seconds for [{}]", delay, getBaseUrl(), t);
+            submitNegotiation(delay);
+            return;
+        }
+        LOGGER.warn(
+                PROTOCOL_ERROR_CLOSE_CLIENT,
+                "",
+                "",
+                "Max retries " + MAX_RETRIES + " reached, HTTP/3 negotiation failed for " + getBaseUrl(),
+                t);
+        negotiateEnd();
+    }
+
+    /** Releases the scheduler and the pending call; safe to call more than once. */
+    private void negotiateEnd() {
+        scheduling.set(false);
+        synchronized (negotiationLock) {
+            if (executor != null) {
+                executor.shutdown();
+                executor = null;
+            }
+            clientCall = null;
+        }
+    }
+
+    private void setHttp3Connected(boolean http3Connected) {
+        this.http3Connected = http3Connected;
+        LOGGER.info("Switch protocol to {} for [{}]", http3Connected ? "HTTP/3" : "HTTP/2", url.toString(""));
+    }
+
+    /** @return {@code true} while the HTTP/3 client reports itself connected */
+    public boolean isHttp3Connected() {
+        return http3Connected;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return http3Connected ? http3ConnectionClient.isConnected() : connectionClient.isConnected();
+    }
+
+    @Override
+    public InetSocketAddress getLocalAddress() {
+        return http3Connected ? http3ConnectionClient.getLocalAddress() : connectionClient.getLocalAddress();
+    }
+
+    /** Releases both underlying clients; failures are logged and never propagated. */
+    @Override
+    public boolean release() {
+        try {
+            connectionClient.release();
+        } catch (Throwable t) {
+            LOGGER.warn(PROTOCOL_ERROR_CLOSE_CLIENT, "", "", t.getMessage(), t);
+        }
+        if (http3ConnectionClient != null) {
+            try {
+                http3ConnectionClient.release();
+            } catch (Throwable t) {
+                LOGGER.warn(PROTOCOL_ERROR_CLOSE_CLIENT, "", "", t.getMessage(), t);
+            }
+        }
+        return true;
+    }
+
+    @Override
+    protected void initConnectionClient() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean isAvailable() {
+        return http3Connected ? http3ConnectionClient.isAvailable() : connectionClient.isAvailable();
+    }
+
+    @Override
+    public void addCloseListener(Runnable func) {
+        connectionClient.addCloseListener(func);
+    }
+
+    @Override
+    public void addConnectedListener(Runnable func) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void addDisconnectedListener(Runnable func) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void onConnected(Object channel) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void onGoaway(Object channel) {
+        throw new UnsupportedOperationException();
+    }
+
+    /** Stops any pending negotiation, then destroys both underlying clients. */
+    @Override
+    public void destroy() {
+        // release the scheduler first so its worker thread does not outlive the client,
+        // even if destroying an underlying client throws
+        negotiateEnd();
+        connectionClient.destroy();
+        if (http3ConnectionClient != null) {
+            http3ConnectionClient.destroy();
+        }
+    }
+
+    @Override
+    public <T> T getChannel(Boolean generalizable) {
+        return http3Connected
+                ? http3ConnectionClient.getChannel(generalizable)
+                : connectionClient.getChannel(generalizable);
+    }
+
+    @Override
+    protected void doOpen() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void doClose() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void doConnect() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected void doDisConnect() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    protected Channel getChannel() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+        return "AutoSwitchConnectionClient{" + "http3Enabled=" + http3Connected + ", http3=" + http3ConnectionClient
+                + ", http2=" + connectionClient + '}';
+    }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/Helper.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/Helper.java
new file mode 100644
index 0000000000..0cc705272c
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/Helper.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.h3.negotiation;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.remoting.ChannelHandler;
+import org.apache.dubbo.remoting.RemotingException;
+import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
+import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
+import
org.apache.dubbo.remoting.transport.netty4.AbstractNettyConnectionClient;
+import org.apache.dubbo.remoting.transport.netty4.NettyHttp3ConnectionClient;
+import org.apache.dubbo.rpc.protocol.tri.ExceptionUtils;
+
+/**
+ * Static factories for the connection clients used by HTTP/3 negotiation.
+ */
+public class Helper {
+
+    private Helper() {}
+
+    /**
+     * Opens a port-unification connection and wraps it in an {@link AutoSwitchConnectionClient}.
+     *
+     * @param url     consumer url of the target provider
+     * @param handler channel handler passed to the underlying connection
+     * @return the wrapping auto-switch client
+     */
+    public static AbstractConnectionClient createAutoSwitchClient(URL url, ChannelHandler handler) {
+        AbstractConnectionClient baseClient = PortUnificationExchanger.connect(url, handler);
+        return new AutoSwitchConnectionClient(url, baseClient);
+    }
+
+    /**
+     * Creates a {@link NettyHttp3ConnectionClient}; a {@link RemotingException} raised by its
+     * constructor is rethrown through {@code ExceptionUtils.wrap}.
+     *
+     * @param url     consumer url of the target provider
+     * @param handler channel handler passed to the client
+     * @return the new HTTP/3 client
+     */
+    public static AbstractNettyConnectionClient createHttp3Client(URL url, ChannelHandler handler) {
+        AbstractNettyConnectionClient http3Client;
+        try {
+            http3Client = new NettyHttp3ConnectionClient(url, handler);
+        } catch (RemotingException cause) {
+            throw ExceptionUtils.wrap(cause);
+        }
+        return http3Client;
+    }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java
new file mode 100644
index 0000000000..59d33b7f55
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h3/negotiation/NegotiateClientCall.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri.h3.negotiation;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.remoting.api.connection.AbstractConnectionClient;
+import org.apache.dubbo.rpc.protocol.tri.TripleConstant;
+import org.apache.dubbo.rpc.protocol.tri.TripleHeaderEnum;
+import org.apache.dubbo.rpc.protocol.tri.transport.H2TransportListener;
+import
org.apache.dubbo.rpc.protocol.tri.transport.TripleHttp2ClientResponseHandler;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http2.DefaultHttp2Headers;
+import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
+import io.netty.handler.codec.http2.Http2Headers;
+import io.netty.handler.codec.http2.Http2HeadersFrame;
+import io.netty.handler.codec.http2.Http2StreamChannel;
+import io.netty.handler.codec.http2.Http2StreamChannelBootstrap;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.concurrent.Future;
+
+/**
+ * Issues a single {@code OPTIONS /} request on a new HTTP/2 stream of the given connection and
+ * reports the {@code alt-svc} response header through a {@link CompletableFuture}.
+ */
+public class NegotiateClientCall {
+
+    private final AbstractConnectionClient connectionClient;
+    private final Executor executor;
+
+    /**
+     * @param connectionClient client whose channel carries the negotiation stream
+     * @param executor         executor on which response-driven completions of the future run
+     */
+    public NegotiateClientCall(AbstractConnectionClient connectionClient, Executor executor) {
+        this.connectionClient = connectionClient;
+        this.executor = executor;
+    }
+
+    /**
+     * Opens a stream, sends the negotiation headers and returns a future for the outcome.
+     *
+     * @param url consumer url used to build the request headers
+     * @return future completed with {@code String.valueOf(alt-svc header)} (the text {@code "null"} when
+     *         the header is absent) for a status below 500, or completed exceptionally when the stream
+     *         cannot be opened or written, the status is missing or 5xx, or the stream is cancelled
+     */
+    public CompletableFuture<String> start(URL url) {
+        CompletableFuture<String> result = new CompletableFuture<>();
+        try {
+            Channel parent = connectionClient.getChannel(true);
+            Http2StreamChannelBootstrap streamBootstrap = new Http2StreamChannelBootstrap(parent);
+            streamBootstrap.handler(new ChannelInboundHandlerAdapter() {
+                @Override
+                public void handlerAdded(ChannelHandlerContext ctx) {
+                    // fail the stream if no response arrives within 12 seconds
+                    ctx.channel()
+                            .pipeline()
+                            .addLast(new ReadTimeoutHandler(12, TimeUnit.SECONDS))
+                            .addLast(new TripleHttp2ClientResponseHandler(new Listener(executor, result)));
+                }
+            });
+            Future<Http2StreamChannel> opening = streamBootstrap.open();
+            opening.addListener(opened -> {
+                if (!opened.isSuccess()) {
+                    result.completeExceptionally(opened.cause());
+                    return;
+                }
+                opening.getNow().writeAndFlush(buildHeaders(url)).addListener(written -> {
+                    if (!written.isSuccess()) {
+                        result.completeExceptionally(written.cause());
+                    }
+                });
+            });
+        } catch (Throwable t) {
+            result.completeExceptionally(t);
+        }
+        return result;
+    }
+
+    /** Builds the end-of-stream headers frame for the {@code OPTIONS /} negotiation request. */
+    private Http2HeadersFrame buildHeaders(URL url) {
+        CharSequence scheme = url.getParameter(CommonConstants.SSL_ENABLED_KEY, false)
+                ? TripleConstant.HTTPS_SCHEME
+                : TripleConstant.HTTP_SCHEME;
+        Http2Headers requestHeaders = new DefaultHttp2Headers(false);
+        requestHeaders.scheme(scheme);
+        requestHeaders.authority(url.getAddress());
+        requestHeaders.method(HttpMethod.OPTIONS.asciiName());
+        requestHeaders.path("/");
+        // NOTE(review): if TripleHeaderEnum is a Java enum, name() yields the constant identifier
+        // ("SERVICE_TIMEOUT") rather than a wire header name - confirm this key is intended.
+        requestHeaders.set(TripleHeaderEnum.SERVICE_TIMEOUT.name(), "10000");
+        return new DefaultHttp2HeadersFrame(requestHeaders, true);
+    }
+
+    /** Translates transport callbacks of the negotiation stream into completion of the future. */
+    private static final class Listener implements H2TransportListener {
+
+        private final Executor executor;
+        private final CompletableFuture<String> future;
+
+        Listener(Executor executor, CompletableFuture<String> future) {
+            this.executor = executor;
+            this.future = future;
+        }
+
+        @Override
+        public void onHeader(Http2Headers headers, boolean endStream) {
+            CharSequence statusLine = headers.status();
+            boolean accepted =
+                    statusLine != null && HttpResponseStatus.parseLine(statusLine).code() < 500;
+            if (accepted) {
+                CharSequence altSvc = headers.get(HttpHeaderNames.ALT_SVC);
+                executor.execute(() -> future.complete(String.valueOf(altSvc)));
+            } else {
+                executor.execute(
+                        () -> future.completeExceptionally(new RuntimeException("Status: " + statusLine)));
+            }
+        }
+
+        @Override
+        public void onData(ByteBuf data, boolean endStream) {}
+
+        @Override
+        public void cancelByRemote(long errorCode) {
+            executor.execute(() -> future.completeExceptionally(new RuntimeException("Canceled by remote")));
+        }
+    }
+}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/argument/GeneralTypeConverter.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/argument/GeneralTypeConverter.java
index 9622fb2fad..10f2738336 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/argument/GeneralTypeConverter.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/argument/GeneralTypeConverter.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.rpc.protocol.tri.rest.argument;
import org.apache.dubbo.common.io.StreamUtils;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.DateUtils;
import org.apache.dubbo.common.utils.JsonUtils;
@@ -260,7 +261,7 @@ public class GeneralTypeConverter implements TypeConverter {
case "java.util.regex.Pattern":
return Pattern.compile(str);
case "java.lang.Class":
- return TypeUtils.loadClass(str);
+ return ClassUtils.loadClass(str);
case "[B":
return str.getBytes(UTF_8);
case "[C":
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/filter/RestExtensionExecutionFilter.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/filter/RestExtensionExecutionFilter.java
index 139acb716f..eed8ac01f4 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/filter/RestExtensionExecutionFilter.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/filter/RestExtensionExecutionFilter.java
@@ -24,6 +24,7 @@ import
org.apache.dubbo.common.extension.ExtensionAccessorAware;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.ArrayUtils;
+import org.apache.dubbo.common.utils.ClassUtils;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.http12.HttpRequest;
@@ -42,7 +43,6 @@ import
org.apache.dubbo.rpc.protocol.tri.rest.RestInitializeException;
import org.apache.dubbo.rpc.protocol.tri.rest.mapping.RadixTree;
import org.apache.dubbo.rpc.protocol.tri.rest.mapping.RadixTree.Match;
import org.apache.dubbo.rpc.protocol.tri.rest.util.RestUtils;
-import org.apache.dubbo.rpc.protocol.tri.rest.util.TypeUtils;
import java.util.ArrayList;
import java.util.Arrays;
@@ -207,7 +207,7 @@ public class RestExtensionExecutionFilter extends
RestFilterAdapter {
InstantiationStrategy strategy = new InstantiationStrategy(() ->
applicationModel);
for (String className : StringUtils.tokenize(extensionConfig)) {
try {
- Object extension =
strategy.instantiate(TypeUtils.loadClass(className));
+ Object extension =
strategy.instantiate(ClassUtils.loadClass(className));
if (extension instanceof ExtensionAccessorAware) {
((ExtensionAccessorAware)
extension).setExtensionAccessor(applicationModel);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/RadixTree.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/RadixTree.java
index 047908cbe2..5bb1289da0 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/RadixTree.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/RadixTree.java
@@ -56,7 +56,7 @@ public final class RadixTree<T> {
if (path.isDirect()) {
KeyString key = new KeyString(path.getPath(), caseSensitive);
List<Match<T>> matches = directPathMap.computeIfAbsent(key, k ->
new ArrayList<>());
- for (int i = 0, len = matches.size(); i < len; i++) {
+ for (int i = 0, size = matches.size(); i < size; i++) {
Match<T> match = matches.get(i);
if (match.getValue().equals(value)) {
return match.getValue();
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/RestRequestHandlerMapping.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/RestRequestHandlerMapping.java
index 1fe540af6d..e75e6767a2 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/RestRequestHandlerMapping.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/RestRequestHandlerMapping.java
@@ -74,6 +74,12 @@ public final class RestRequestHandlerMapping implements
RequestHandlerMapping {
HandlerMeta meta = requestMappingRegistry.lookup(request);
if (meta == null) {
+ String path = request.attribute(RestConstants.PATH_ATTRIBUTE);
+ if (RestConstants.SLASH.equals(path) &&
HttpMethods.OPTIONS.name().equals(request.method())) {
+ handleOptionsRequest(request);
+ }
+
+ LOGGER.debug("No handler found for http request: {}", request);
return null;
}
@@ -124,7 +130,7 @@ public final class RestRequestHandlerMapping implements
RequestHandlerMapping {
private static void handleOptionsRequest(HttpRequest request) {
RequestMapping mapping =
request.attribute(RestConstants.MAPPING_ATTRIBUTE);
- MethodsCondition condition = mapping.getMethodsCondition();
+ MethodsCondition condition = mapping == null ? null :
mapping.getMethodsCondition();
if (condition == null) {
throw new HttpResultPayloadException(HttpResult.builder()
.status(HttpStatus.NO_CONTENT)
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/AnnotationEnum.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/AnnotationEnum.java
index f6d52406cc..c4365853e6 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/AnnotationEnum.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/mapping/meta/AnnotationEnum.java
@@ -16,7 +16,7 @@
*/
package org.apache.dubbo.rpc.protocol.tri.rest.mapping.meta;
-import org.apache.dubbo.rpc.protocol.tri.rest.util.TypeUtils;
+import org.apache.dubbo.common.utils.ClassUtils;
import java.lang.annotation.Annotation;
@@ -29,7 +29,7 @@ public interface AnnotationEnum {
default Class<Annotation> loadType() {
try {
- return (Class) TypeUtils.loadClass(className());
+ return (Class) ClassUtils.loadClass(className());
} catch (Throwable t) {
return (Class) NotFound.class;
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/util/TypeUtils.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/util/TypeUtils.java
index fc77b1f10b..60b551eed8 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/util/TypeUtils.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/rest/util/TypeUtils.java
@@ -84,24 +84,6 @@ public final class TypeUtils {
private TypeUtils() {}
- public static ClassLoader getDefaultClassLoader() {
- ClassLoader cl = Thread.currentThread().getContextClassLoader();
- return cl == null ? RestUtils.class.getClassLoader() : cl;
- }
-
- public static Class<?> loadClass(String className) throws
ClassNotFoundException {
- return getDefaultClassLoader().loadClass(className);
- }
-
- public static boolean isPresent(String className) {
- try {
- loadClass(className);
- return true;
- } catch (Throwable ignored) {
- return false;
- }
- }
-
public static boolean isSimpleProperty(Class<?> type) {
return type == null || isSimpleValueType(type) || type.isArray() &&
isSimpleValueType(type.getComponentType());
}
@@ -370,6 +352,9 @@ public final class TypeUtils {
}
public static String buildSig(Method method) {
+ if (method.getParameterCount() == 0) {
+ return null;
+ }
StringBuilder sb = new StringBuilder(8);
for (Class<?> type : method.getParameterTypes()) {
String name = type.getName();
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.protocol.tri.stream.ClientStreamFactory
b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.protocol.tri.stream.ClientStreamFactory
index e0c95ee8f6..daccad80f1 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.protocol.tri.stream.ClientStreamFactory
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.protocol.tri.stream.ClientStreamFactory
@@ -1,2 +1,3 @@
http2=org.apache.dubbo.rpc.protocol.tri.h12.http2.Http2ClientStreamFactory
http3=org.apache.dubbo.rpc.protocol.tri.h3.Http3ClientStreamFactory
+adaptive=org.apache.dubbo.rpc.protocol.tri.h3.negotiation.AdaptiveClientStreamFactory
diff --git
a/dubbo-spring-boot/dubbo-spring-boot-starters/dubbo-seata-spring-boot-starter/pom.xml
b/dubbo-spring-boot/dubbo-spring-boot-starters/dubbo-seata-spring-boot-starter/pom.xml
index c1c7645329..6a61948120 100644
---
a/dubbo-spring-boot/dubbo-spring-boot-starters/dubbo-seata-spring-boot-starter/pom.xml
+++
b/dubbo-spring-boot/dubbo-spring-boot-starters/dubbo-seata-spring-boot-starter/pom.xml
@@ -32,6 +32,12 @@
<properties />
<dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<!-- seata dependency -->
<dependency>
<groupId>io.seata</groupId>