This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.2 by this push:
new b84c1386bb add ping pong for tri protocol (#12955)
b84c1386bb is described below
commit b84c1386bba2ac306a002b1a283cdb952b456047
Author: icodening <[email protected]>
AuthorDate: Wed Oct 11 11:17:41 2023 +0800
add ping pong for tri protocol (#12955)
* add ping pong for tri protocol
* add ping pong for tri protocol
* add ping pong for tri protocol
* add ping pong for tri protocol
---
.../transport/netty4/NettyConnectionClient.java | 14 ++--
.../rpc/protocol/tri/TripleHttp2Protocol.java | 2 +
.../rpc/protocol/tri/TriplePingPongHandler.java | 76 ++++++++++++++++++++++
3 files changed, 88 insertions(+), 4 deletions(-)
diff --git
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index b1da49c8a5..e585bff61b 100644
---
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -40,15 +40,18 @@ import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoop;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
+import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;
+import org.apache.dubbo.remoting.utils.UrlUtils;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_CLIENT_CONNECT_TIMEOUT;
@@ -122,12 +125,15 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
}
// pipeline.addLast("logging", new
LoggingHandler(LogLevel.INFO)); //for debug
- // TODO support IDLE
-// int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
+
+ int heartbeat = UrlUtils.getHeartbeat(getUrl());
+ pipeline.addLast("client-idle-handler", new
IdleStateHandler(heartbeat, 0, 0, MILLISECONDS));
+
pipeline.addLast("connectionHandler", connectionHandler);
NettyConfigOperator operator = new
NettyConfigOperator(nettyChannel, getChannelHandler());
protocol.configClientPipeline(getUrl(), operator,
nettySslContextOperator);
+ ch.closeFuture().addListener(channelFuture -> doClose());
// TODO support Socks5
}
});
@@ -271,7 +277,7 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
try {
doConnect();
} catch (RemotingException e) {
- LOGGER.error(TRANSPORT_FAILED_RECONNECT, "", "", "Failed to
connect to server: " + getConnectAddress());
+ LOGGER.error(TRANSPORT_FAILED_RECONNECT, "", "", "Failed to
connect to server: " + getConnectAddress());
}
}
@@ -349,7 +355,7 @@ public class NettyConnectionClient extends
AbstractConnectionClient {
try {
connectionClient.doConnect();
} catch (RemotingException e) {
- LOGGER.error(TRANSPORT_FAILED_RECONNECT, "", "", "Failed
to connect to server: " + getConnectAddress());
+ LOGGER.error(TRANSPORT_FAILED_RECONNECT, "", "", "Failed
to connect to server: " + getConnectAddress());
}
}, 1L, TimeUnit.SECONDS);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 8daa1f600f..4a07c30706 100644
---
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -37,6 +37,7 @@ import org.apache.dubbo.remoting.api.AbstractWireProtocol;
import org.apache.dubbo.remoting.api.pu.ChannelHandlerPretender;
import org.apache.dubbo.remoting.api.pu.ChannelOperator;
import org.apache.dubbo.remoting.api.ssl.ContextOperator;
+import org.apache.dubbo.remoting.utils.UrlUtils;
import org.apache.dubbo.rpc.HeaderFilter;
import org.apache.dubbo.rpc.executor.ExecutorSupport;
import org.apache.dubbo.rpc.model.FrameworkModel;
@@ -165,6 +166,7 @@ public class TripleHttp2Protocol extends
AbstractWireProtocol implements ScopeMo
List<ChannelHandler> handlers = new ArrayList<>();
handlers.add(new ChannelHandlerPretender(codec));
handlers.add(new ChannelHandlerPretender(handler));
+ handlers.add(new ChannelHandlerPretender(new
TriplePingPongHandler(UrlUtils.getCloseTimeout(url))));
handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
operator.configChannelHandler(handlers);
}
diff --git
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriplePingPongHandler.java
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriplePingPongHandler.java
new file mode 100644
index 0000000000..cffb6ba10e
--- /dev/null
+++
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriplePingPongHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.DefaultHttp2PingFrame;
+import io.netty.handler.codec.http2.Http2PingFrame;
+import io.netty.handler.timeout.IdleStateEvent;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+/**
+ * Netty duplex handler implementing a ping/pong liveness check for the Triple (HTTP/2) protocol.
+ *
+ * <p>When an {@link IdleStateEvent} reaches this handler it writes an HTTP/2 PING frame and, unless
+ * one is already pending, schedules a task that calls {@code ctx.close()} after
+ * {@code pingAckTimeout} milliseconds. The pending task is cancelled when an
+ * {@link Http2PingFrame} is read.
+ *
+ * <p>NOTE(review): this class does not override {@code channelInactive} or {@code handlerRemoved},
+ * so a pending close task is only ever cancelled by an inbound ping frame; confirm that leaving it
+ * scheduled after the channel has gone away is acceptable.
+ */
+public class TriplePingPongHandler extends ChannelDuplexHandler {
+ /** Delay in milliseconds between an idle-triggered ping and the scheduled channel close. */
+ private final long pingAckTimeout;
+ /**
+ * Handle of the scheduled close task; {@code null} until a task is scheduled and again after an
+ * inbound ping frame cancels it.
+ */
+ private ScheduledFuture<?> pingAckTimeoutFuture;
+ /**
+ * Creates the handler.
+ *
+ * @param pingAckTimeout time to wait, in milliseconds, after an idle-triggered ping before the
+ * channel is closed
+ */
+ public TriplePingPongHandler(long pingAckTimeout) {
+ this.pingAckTimeout = pingAckTimeout;
+ }
+ /**
+ * Cancels the pending close task when an {@link Http2PingFrame} is read while a task is pending;
+ * every other message (and any ping frame read while no task is pending) is passed to the next
+ * handler unchanged.
+ *
+ * <p>NOTE(review): the ack flag of the frame is not inspected, so any inbound ping frame cancels
+ * the task, and the frame that cancels it is not forwarded down the pipeline; confirm both are
+ * intended.
+ *
+ * @param ctx the handler context
+ * @param msg the inbound message
+ * @throws Exception if forwarding the message to the next handler fails
+ */
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
+ if (!(msg instanceof Http2PingFrame) || pingAckTimeoutFuture == null) {
+ super.channelRead(ctx, msg);
+ return;
+ }
+ // Reached only for a ping frame while a close task is pending: cancel it and clear the handle.
+ pingAckTimeoutFuture.cancel(true);
+ pingAckTimeoutFuture = null;
+ }
+ /**
+ * Reacts to an {@link IdleStateEvent} by writing and flushing an HTTP/2 PING frame and, if no
+ * close task is pending, scheduling one on the channel's executor. Any other user event is
+ * forwarded to the next handler; the idle event itself is not forwarded.
+ *
+ * <p>NOTE(review): the idle state (reader/writer/all) of the event is not checked, so every
+ * {@link IdleStateEvent} triggers a ping; confirm against the configured idle handler.
+ *
+ * @param ctx the handler context
+ * @param evt the user event
+ * @throws Exception declared by the overridden method
+ */
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
+ if (!(evt instanceof IdleStateEvent)) {
+ ctx.fireUserEventTriggered(evt);
+ return;
+ }
+ ctx.writeAndFlush(new DefaultHttp2PingFrame(0)); // PING frame with content 0
+ if (pingAckTimeoutFuture == null) {
+ pingAckTimeoutFuture = ctx.executor().schedule(new
CloseChannelTask(ctx), pingAckTimeout, TimeUnit.MILLISECONDS);
+ }
+ // A non-null future means a close task from an earlier ping is still pending, so the existing deadline is kept.
+ }
+ /** Scheduled task that closes the channel through the captured handler context. */
+ private static class CloseChannelTask implements Runnable {
+ /** Context on which {@code close()} is invoked when the task runs. */
+ private final ChannelHandlerContext ctx;
+
+ public CloseChannelTask(ChannelHandlerContext ctx) {
+ this.ctx = ctx;
+ }
+ /** Closes the channel via {@code ctx.close()}. */
+ @Override
+ public void run() {
+ ctx.close();
+ }
+ }
+
+}