This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new b84c1386bb add ping pong for tri protocol (#12955)
b84c1386bb is described below

commit b84c1386bba2ac306a002b1a283cdb952b456047
Author: icodening <[email protected]>
AuthorDate: Wed Oct 11 11:17:41 2023 +0800

    add ping pong for tri protocol (#12955)
    
    * add ping pong for tri protocol
    
    * add ping pong for tri protocol
    
    * add ping pong for tri protocol
    
    * add ping pong for tri protocol
---
 .../transport/netty4/NettyConnectionClient.java    | 14 ++--
 .../rpc/protocol/tri/TripleHttp2Protocol.java      |  2 +
 .../rpc/protocol/tri/TriplePingPongHandler.java    | 76 ++++++++++++++++++++++
 3 files changed, 88 insertions(+), 4 deletions(-)

diff --git 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
index b1da49c8a5..e585bff61b 100644
--- 
a/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
+++ 
b/dubbo-remoting/dubbo-remoting-netty4/src/main/java/org/apache/dubbo/remoting/transport/netty4/NettyConnectionClient.java
@@ -40,15 +40,18 @@ import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoop;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.handler.ssl.SslContext;
+import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.AttributeKey;
 import io.netty.util.concurrent.DefaultPromise;
 import io.netty.util.concurrent.GlobalEventExecutor;
 import io.netty.util.concurrent.Promise;
+import org.apache.dubbo.remoting.utils.UrlUtils;
 
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static 
org.apache.dubbo.common.constants.CommonConstants.DEFAULT_CLIENT_THREADPOOL;
 import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
 import static 
org.apache.dubbo.common.constants.LoggerCodeConstants.TRANSPORT_CLIENT_CONNECT_TIMEOUT;
@@ -122,12 +125,15 @@ public class NettyConnectionClient extends 
AbstractConnectionClient {
                 }
 
 //                pipeline.addLast("logging", new 
LoggingHandler(LogLevel.INFO)); //for debug
-                // TODO support IDLE
-//                int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());
+
+                int heartbeat = UrlUtils.getHeartbeat(getUrl());
+                pipeline.addLast("client-idle-handler", new 
IdleStateHandler(heartbeat, 0, 0, MILLISECONDS));
+
                 pipeline.addLast("connectionHandler", connectionHandler);
 
                 NettyConfigOperator operator = new 
NettyConfigOperator(nettyChannel, getChannelHandler());
                 protocol.configClientPipeline(getUrl(), operator, 
nettySslContextOperator);
+                ch.closeFuture().addListener(channelFuture -> doClose());
                 // TODO support Socks5
             }
         });
@@ -271,7 +277,7 @@ public class NettyConnectionClient extends 
AbstractConnectionClient {
             try {
                 doConnect();
             } catch (RemotingException e) {
-                LOGGER.error(TRANSPORT_FAILED_RECONNECT, "", "",  "Failed to 
connect to server: " + getConnectAddress());
+                LOGGER.error(TRANSPORT_FAILED_RECONNECT, "", "", "Failed to 
connect to server: " + getConnectAddress());
             }
         }
 
@@ -349,7 +355,7 @@ public class NettyConnectionClient extends 
AbstractConnectionClient {
                 try {
                     connectionClient.doConnect();
                 } catch (RemotingException e) {
-                    LOGGER.error(TRANSPORT_FAILED_RECONNECT, "", "",  "Failed 
to connect to server: " + getConnectAddress());
+                    LOGGER.error(TRANSPORT_FAILED_RECONNECT, "", "", "Failed 
to connect to server: " + getConnectAddress());
                 }
             }, 1L, TimeUnit.SECONDS);
         }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
index 8daa1f600f..4a07c30706 100644
--- 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TripleHttp2Protocol.java
@@ -37,6 +37,7 @@ import org.apache.dubbo.remoting.api.AbstractWireProtocol;
 import org.apache.dubbo.remoting.api.pu.ChannelHandlerPretender;
 import org.apache.dubbo.remoting.api.pu.ChannelOperator;
 import org.apache.dubbo.remoting.api.ssl.ContextOperator;
+import org.apache.dubbo.remoting.utils.UrlUtils;
 import org.apache.dubbo.rpc.HeaderFilter;
 import org.apache.dubbo.rpc.executor.ExecutorSupport;
 import org.apache.dubbo.rpc.model.FrameworkModel;
@@ -165,6 +166,7 @@ public class TripleHttp2Protocol extends 
AbstractWireProtocol implements ScopeMo
         List<ChannelHandler> handlers = new ArrayList<>();
         handlers.add(new ChannelHandlerPretender(codec));
         handlers.add(new ChannelHandlerPretender(handler));
+        handlers.add(new ChannelHandlerPretender(new 
TriplePingPongHandler(UrlUtils.getCloseTimeout(url))));
         handlers.add(new ChannelHandlerPretender(new TripleTailHandler()));
         operator.configChannelHandler(handlers);
     }
diff --git 
a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriplePingPongHandler.java
 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriplePingPongHandler.java
new file mode 100644
index 0000000000..cffb6ba10e
--- /dev/null
+++ 
b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/TriplePingPongHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.rpc.protocol.tri;
+
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.http2.DefaultHttp2PingFrame;
+import io.netty.handler.codec.http2.Http2PingFrame;
+import io.netty.handler.timeout.IdleStateEvent;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+public class TriplePingPongHandler extends ChannelDuplexHandler {
+    // On an IdleStateEvent this handler writes an HTTP/2 ping frame and schedules a channel close; reading a ping frame cancels that close.
+    private final long pingAckTimeout;
+    // Handle of the scheduled close task; reset to null when a ping frame cancels it. pingAckTimeout above is its delay in milliseconds.
+    private ScheduledFuture<?> pingAckTimeoutFuture;
+    // Stores the delay (milliseconds) that is later passed to schedule(...) for the close task.
+    public TriplePingPongHandler(long pingAckTimeout) {
+        this.pingAckTimeout = pingAckTimeout;
+    }
+    // Consumes a ping frame while a close task is pending; every other message is handed to super.channelRead.
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
+Exception {
+        if (!(msg instanceof Http2PingFrame) || pingAckTimeoutFuture == null) {
+            super.channelRead(ctx, msg);
+            return;
+        }
+        //a ping frame arrived while a close task was pending (its ack flag is not inspected): cancel the task; the frame is not forwarded
+        pingAckTimeoutFuture.cancel(true);
+        pingAckTimeoutFuture = null;
+    }
+    // Forwards non-idle events unchanged; on an IdleStateEvent writes a ping frame and schedules the close task unless one is already pending.
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) 
+throws Exception {
+        if (!(evt instanceof IdleStateEvent)) {
+            ctx.fireUserEventTriggered(evt);
+            return;
+        }
+        ctx.writeAndFlush(new DefaultHttp2PingFrame(0));
+        if (pingAckTimeoutFuture == null) {
+            pingAckTimeoutFuture = ctx.executor().schedule(new 
+CloseChannelTask(ctx), pingAckTimeout, TimeUnit.MILLISECONDS);
+        }
+        //a non-null future means the close task from an earlier ping is still registered, so no second task is scheduled
+    }
+    // Runnable that calls close() on the ChannelHandlerContext captured at construction.
+    private static class CloseChannelTask implements Runnable {
+
+        private final ChannelHandlerContext ctx;
+
+        public CloseChannelTask(ChannelHandlerContext ctx) {
+            this.ctx = ctx;
+        }
+
+        @Override
+        public void run() {
+            ctx.close();
+        }
+    }
+
+}

Reply via email to