This is an automated email from the ASF dual-hosted git repository.
zrlw pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push:
new b85111e487 [3.3] Await channel initialization to ensure http2 client
connection preface mechanism could work properly (#15460)
b85111e487 is described below
commit b85111e487ad77b4c363fab9ee7156c53c5acdda
Author: zrlw <[email protected]>
AuthorDate: Mon Jun 23 21:08:52 2025 +0800
[3.3] Await channel initialization to ensure http2 client connection
preface mechanism could work properly (#15460)
* Await channel initialization to ensure
connectionPrefaceReceivedPromiseRef had been created when necessary
* Await channel initialization by promise
* Set connection preface received promise to null if pipeline does not
contain Http2FrameCodec
---
.../transport/netty4/NettyConnectionClient.java | 105 ++++++++++++++-------
1 file changed, 72 insertions(+), 33 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index 528bf5b226..8fcec6ddc1 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -54,6 +54,8 @@ public final class NettyConnectionClient extends
AbstractNettyConnectionClient {
private Bootstrap bootstrap;
+ private AtomicReference<Promise<Void>> channelInitializedPromiseRef;
+
private AtomicReference<Promise<Void>> connectionPrefaceReceivedPromiseRef;
public NettyConnectionClient(URL url, ChannelHandler handler) throws
RemotingException {
@@ -69,6 +71,7 @@ public final class NettyConnectionClient extends
AbstractNettyConnectionClient {
}
protected void initBootstrap() {
+ channelInitializedPromiseRef = new AtomicReference<>();
Bootstrap bootstrap = new Bootstrap();
bootstrap
.group(NettyEventLoopFactory.NIO_EVENT_LOOP_GROUP.get())
@@ -103,8 +106,16 @@ public final class NettyConnectionClient extends
AbstractNettyConnectionClient {
protocol.configClientPipeline(getUrl(), operator,
nettySslContextOperator);
ChannelHandlerContext http2FrameCodecHandlerCtx =
pipeline.context(Http2FrameCodec.class);
- if (http2FrameCodecHandlerCtx != null) {
- connectionPrefaceReceivedPromiseRef = new
AtomicReference<>();
+ if (http2FrameCodecHandlerCtx == null) {
+ // set connection preface received promise to null.
+ connectionPrefaceReceivedPromiseRef = null;
+ } else {
+ // create connection preface received promise if necessary.
+ if (connectionPrefaceReceivedPromiseRef == null) {
+ connectionPrefaceReceivedPromiseRef = new
AtomicReference<>();
+ }
+ connectionPrefaceReceivedPromiseRef.compareAndSet(
+ null, new
DefaultPromise<>(GlobalEventExecutor.INSTANCE));
pipeline.addAfter(
http2FrameCodecHandlerCtx.name(),
"client-connection-preface-handler",
@@ -114,6 +125,12 @@ public final class NettyConnectionClient extends
AbstractNettyConnectionClient {
// set null but do not close this client, it will be
reconnecting in the future
ch.closeFuture().addListener(channelFuture ->
clearNettyChannel());
// TODO support Socks5
+
+ // set channel initialized promise to success if necessary.
+ Promise<Void> channelInitializedPromise =
channelInitializedPromiseRef.get();
+ if (channelInitializedPromise != null) {
+ channelInitializedPromise.trySuccess(null);
+ }
}
});
this.bootstrap = bootstrap;
@@ -121,15 +138,15 @@ public final class NettyConnectionClient extends
AbstractNettyConnectionClient {
@Override
protected ChannelFuture performConnect() {
- if (connectionPrefaceReceivedPromiseRef != null) {
- connectionPrefaceReceivedPromiseRef.compareAndSet(null, new
DefaultPromise<>(GlobalEventExecutor.INSTANCE));
- }
+ // ChannelInitializer#initChannel will be invoked by Netty client work
thread.
return bootstrap.connect();
}
@Override
protected void doConnect() throws RemotingException {
long start = System.currentTimeMillis();
+ // re-create channel initialized promise if necessary.
+ channelInitializedPromiseRef.compareAndSet(null, new
DefaultPromise<>(GlobalEventExecutor.INSTANCE));
super.doConnect();
waitConnectionPreface(start);
}
@@ -154,38 +171,60 @@ public final class NettyConnectionClient extends
AbstractNettyConnectionClient {
* @param start start time of doConnect in milliseconds.
*/
private void waitConnectionPreface(long start) throws RemotingException {
+ // await channel initialization to ensure connection preface received
promise had been created when necessary.
+ Promise<Void> channelInitializedPromise =
channelInitializedPromiseRef.get();
+ long retainedTimeout = getConnectTimeout() -
System.currentTimeMillis() + start;
+ boolean ret =
channelInitializedPromise.awaitUninterruptibly(retainedTimeout,
TimeUnit.MILLISECONDS);
+ // destroy channel initialized promise after used.
+ channelInitializedPromiseRef.set(null);
+ if (!ret || !channelInitializedPromise.isSuccess()) {
+ // 6-2 Client-side channel initialization timeout
+ RemotingException remotingException = new RemotingException(
+ this,
+ "client(url: " + getUrl() + ") failed to connect to server
" + getConnectAddress()
+ + " client-side channel initialization timeout " +
getConnectTimeout() + "ms (elapsed: "
+ + (System.currentTimeMillis() - start) + "ms) from
netty client "
+ + NetUtils.getLocalHost() + " using dubbo version
" + Version.getVersion());
+
+ logger.error(
+ TRANSPORT_CLIENT_CONNECT_TIMEOUT,
+ "provider crash",
+ "",
+ "Client-side channel initialization timeout",
+ remotingException);
+
+ throw remotingException;
+ }
+
+ // await if connection preface received promise is not null.
if (connectionPrefaceReceivedPromiseRef == null) {
return;
}
Promise<Void> connectionPrefaceReceivedPromise =
connectionPrefaceReceivedPromiseRef.get();
- if (connectionPrefaceReceivedPromise != null) {
- long retainedTimeout = getConnectTimeout() -
System.currentTimeMillis() + start;
- boolean ret =
connectionPrefaceReceivedPromise.awaitUninterruptibly(retainedTimeout,
TimeUnit.MILLISECONDS);
- // Only process once: destroy connectionPrefaceReceivedPromise
after used
- synchronized (this) {
- connectionPrefaceReceivedPromiseRef.set(null);
- }
- if (!ret || !connectionPrefaceReceivedPromise.isSuccess()) {
- // 6-2 Client-side connection preface timeout
- RemotingException remotingException = new RemotingException(
- this,
- "client(url: " + getUrl() + ") failed to connect to
server " + getConnectAddress()
- + " client-side connection preface timeout " +
getConnectTimeout()
- + "ms (elapsed: "
- + (System.currentTimeMillis() - start) + "ms)
from netty client "
- + NetUtils.getLocalHost()
- + " using dubbo version "
- + Version.getVersion());
-
- logger.error(
- TRANSPORT_CLIENT_CONNECT_TIMEOUT,
- "provider crash",
- "",
- "Client-side connection preface timeout",
- remotingException);
-
- throw remotingException;
- }
+ retainedTimeout = getConnectTimeout() - System.currentTimeMillis() +
start;
+ ret =
connectionPrefaceReceivedPromise.awaitUninterruptibly(retainedTimeout,
TimeUnit.MILLISECONDS);
+ // destroy connection preface received promise after used.
+ connectionPrefaceReceivedPromiseRef.set(null);
+ if (!ret || !connectionPrefaceReceivedPromise.isSuccess()) {
+ // 6-2 Client-side connection preface timeout
+ RemotingException remotingException = new RemotingException(
+ this,
+ "client(url: " + getUrl() + ") failed to connect to server
" + getConnectAddress()
+ + " client-side connection preface timeout " +
getConnectTimeout()
+ + "ms (elapsed: "
+ + (System.currentTimeMillis() - start) + "ms) from
netty client "
+ + NetUtils.getLocalHost()
+ + " using dubbo version "
+ + Version.getVersion());
+
+ logger.error(
+ TRANSPORT_CLIENT_CONNECT_TIMEOUT,
+ "provider crash",
+ "",
+ "Client-side connection preface timeout",
+ remotingException);
+
+ throw remotingException;
}
}
}