This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/dev by this push:
     new fd5a182cd6 Add RPC metrics (#16121)
fd5a182cd6 is described below

commit fd5a182cd6da4c1d5f90323fb44af4162be987ef
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Jun 11 10:57:23 2024 +0800

    Add RPC metrics (#16121)
---
 docs/docs/en/guide/metrics/metrics.md              |  5 ++
 docs/docs/zh/guide/metrics/metrics.md              |  5 ++
 .../dolphinscheduler-extract-base/pom.xml          |  7 ++
 .../dolphinscheduler/extract/base/RpcMethod.java   |  2 +-
 .../base/{RpcMethod.java => SyncRequestDto.java}   | 25 +++---
 .../extract/base/client/NettyClientHandler.java    |  2 +-
 .../extract/base/client/NettyRemotingClient.java   | 83 ++++++++++++-------
 .../base/client/SyncClientMethodInvoker.java       |  9 ++-
 .../extract/base/config/NettyClientConfig.java     | 12 +++
 .../extract/base/config/NettyServerConfig.java     |  8 ++
 .../ClientSyncDurationMetrics.java}                | 58 +++++--------
 .../ClientSyncExceptionMetrics.java}               | 51 +++++-------
 .../extract/base/metrics/RpcMetrics.java           | 94 ++++++++++++++++++++++
 .../base/server/JdkDynamicServerHandler.java       |  7 +-
 .../extract/base/server/NettyRemotingServer.java   |  3 +-
 .../extract/base/utils/Constants.java              |  4 -
 .../extract/base/metrics/RpcMetricsTest.java       | 70 ++++++++++++++++
 17 files changed, 326 insertions(+), 119 deletions(-)

diff --git a/docs/docs/en/guide/metrics/metrics.md 
b/docs/docs/en/guide/metrics/metrics.md
index 3470340465..9ba7cc73e7 100644
--- a/docs/docs/en/guide/metrics/metrics.md
+++ b/docs/docs/en/guide/metrics/metrics.md
@@ -91,6 +91,11 @@ For example, you can get the master metrics by `curl 
http://localhost:5679/actua
   - stop: the number of stopped workflow instances
   - failover: the number of workflow instance fail-overs
 
+### RPC Related Metrics
+
+- ds.rpc.client.sync.request.exception.count: (counter) the number of 
exceptions that occurred in sync rpc requests
+- ds.rpc.client.sync.request.duration.time: (histogram) the time cost of sync 
rpc requests
+
 ### Master Server Metrics
 
 - ds.master.overload.count: (counter) the number of times the master overloaded
diff --git a/docs/docs/zh/guide/metrics/metrics.md 
b/docs/docs/zh/guide/metrics/metrics.md
index aa620b873e..4865a14222 100644
--- a/docs/docs/zh/guide/metrics/metrics.md
+++ b/docs/docs/zh/guide/metrics/metrics.md
@@ -91,6 +91,11 @@ metrics exporter端口`server.port`是在application.yaml里定义的: 
master: `
   - stop:停止的工作流实例数量
   - failover:容错的工作流实例数量
 
+### RPC相关指标
+
+- ds.rpc.client.sync.request.exception.count: (counter) 同步rpc请求异常数
+- ds.rpc.client.sync.request.duration.time: (histogram) 同步rpc请求耗时
+
 ### Master Server指标
 
 - ds.master.overload.count: (counter) master过载次数
diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/pom.xml 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/pom.xml
index 9501d55706..0554ea3c3c 100644
--- a/dolphinscheduler-extract/dolphinscheduler-extract-base/pom.xml
+++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/pom.xml
@@ -47,6 +47,13 @@
             <artifactId>dolphinscheduler-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.dolphinscheduler</groupId>
+            <artifactId>dolphinscheduler-meter</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>io.netty</groupId>
             <artifactId>netty-all</artifactId>
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
index cd1b778c9a..2dcf689ad8 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
@@ -28,6 +28,6 @@ import java.lang.annotation.Target;
 @Documented
 public @interface RpcMethod {
 
-    long timeout() default 3000L;
+    long timeout() default -1;
 
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
similarity index 66%
copy from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
copy to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
index cd1b778c9a..7df649cd57 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
@@ -17,17 +17,22 @@
 
 package org.apache.dolphinscheduler.extract.base;
 
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
+import org.apache.dolphinscheduler.extract.base.utils.Host;
 
-@Target(ElementType.METHOD)
-@Retention(RetentionPolicy.RUNTIME)
-@Documented
-public @interface RpcMethod {
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
 
-    long timeout() default 3000L;
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SyncRequestDto {
+
+    private Host serverHost;
+    private Transporter transporter;
+    private long timeoutMillis;
 
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
index be570eb577..5f50e2441c 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
@@ -77,7 +77,7 @@ public class NettyClientHandler extends 
ChannelInboundHandlerAdapter {
                     
.writeAndFlush(HeartBeatTransporter.getHeartBeatTransporter())
                     .addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
             if (log.isDebugEnabled()) {
-                log.debug("Client send heart beat to: {}", 
ChannelUtils.getRemoteAddress(ctx.channel()));
+                log.info("Client send heartbeat to: {}", 
ctx.channel().remoteAddress());
             }
         } else {
             super.userEventTriggered(ctx, evt);
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
index 8ded68669d..4aea4d6dfe 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
@@ -19,14 +19,17 @@ package org.apache.dolphinscheduler.extract.base.client;
 
 import org.apache.dolphinscheduler.common.thread.ThreadUtils;
 import org.apache.dolphinscheduler.extract.base.IRpcResponse;
+import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
 import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
 import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
 import 
org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
 import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
+import 
org.apache.dolphinscheduler.extract.base.metrics.ClientSyncDurationMetrics;
+import 
org.apache.dolphinscheduler.extract.base.metrics.ClientSyncExceptionMetrics;
+import org.apache.dolphinscheduler.extract.base.metrics.RpcMetrics;
 import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
 import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
 import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
-import org.apache.dolphinscheduler.extract.base.utils.Constants;
 import org.apache.dolphinscheduler.extract.base.utils.Host;
 import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
 
@@ -97,8 +100,8 @@ public class NettyRemotingClient implements AutoCloseable {
                         ch.pipeline()
                                 .addLast("client-idle-handler",
                                         new IdleStateHandler(
-                                                
Constants.NETTY_CLIENT_HEART_BEAT_TIME,
                                                 0,
+                                                
clientConfig.getHeartBeatIntervalMillis(),
                                                 0,
                                                 TimeUnit.MILLISECONDS))
                                 .addLast(new TransporterDecoder(), 
clientHandler, new TransporterEncoder());
@@ -107,38 +110,60 @@ public class NettyRemotingClient implements AutoCloseable 
{
         isStarted.compareAndSet(false, true);
     }
 
-    public IRpcResponse sendSync(final Host host,
-                                 final Transporter transporter,
-                                 final long timeoutMillis) throws 
InterruptedException, RemotingException {
-        final Channel channel = getOrCreateChannel(host);
-        if (channel == null) {
-            throw new RemotingException(String.format("connect to : %s fail", 
host));
-        }
+    public IRpcResponse sendSync(SyncRequestDto syncRequestDto) throws 
RemotingException {
+        long start = System.currentTimeMillis();
+
+        final Host host = syncRequestDto.getServerHost();
+        final Transporter transporter = syncRequestDto.getTransporter();
+        final long timeoutMillis = syncRequestDto.getTimeoutMillis() < 0 ? 
clientConfig.getConnectTimeoutMillis()
+                : syncRequestDto.getTimeoutMillis();
         final long opaque = transporter.getHeader().getOpaque();
-        final ResponseFuture responseFuture = new ResponseFuture(opaque, 
timeoutMillis);
-        channel.writeAndFlush(transporter).addListener(future -> {
-            if (future.isSuccess()) {
-                responseFuture.setSendOk(true);
-                return;
-            } else {
-                responseFuture.setSendOk(false);
+
+        try {
+            final Channel channel = getOrCreateChannel(host);
+            if (channel == null) {
+                throw new RemotingException(String.format("connect to : %s 
fail", host));
             }
-            responseFuture.setCause(future.cause());
-            responseFuture.putResponse(null);
-            log.error("Send Sync request {} to host {} failed", transporter, 
host, responseFuture.getCause());
-        });
-        /*
-         * sync wait for result
-         */
-        IRpcResponse iRpcResponse = responseFuture.waitResponse();
-        if (iRpcResponse == null) {
-            if (responseFuture.isSendOK()) {
-                throw new RemotingTimeoutException(host.toString(), 
timeoutMillis, responseFuture.getCause());
+            final ResponseFuture responseFuture = new ResponseFuture(opaque, 
timeoutMillis);
+            channel.writeAndFlush(transporter).addListener(future -> {
+                if (future.isSuccess()) {
+                    responseFuture.setSendOk(true);
+                    return;
+                } else {
+                    responseFuture.setSendOk(false);
+                }
+                responseFuture.setCause(future.cause());
+                responseFuture.putResponse(null);
+                log.error("Send Sync request {} to host {} failed", 
transporter, host, responseFuture.getCause());
+            });
+            /*
+             * sync wait for result
+             */
+            IRpcResponse iRpcResponse = responseFuture.waitResponse();
+            if (iRpcResponse == null) {
+                if (responseFuture.isSendOK()) {
+                    throw new RemotingTimeoutException(host.toString(), 
timeoutMillis, responseFuture.getCause());
+                } else {
+                    throw new RemotingException(host.toString(), 
responseFuture.getCause());
+                }
+            }
+            return iRpcResponse;
+        } catch (Exception ex) {
+            ClientSyncExceptionMetrics clientSyncExceptionMetrics = 
ClientSyncExceptionMetrics
+                    .of(syncRequestDto)
+                    .withThrowable(ex);
+            
RpcMetrics.recordClientSyncRequestException(clientSyncExceptionMetrics);
+            if (ex instanceof RemotingException) {
+                throw (RemotingException) ex;
             } else {
-                throw new RemotingException(host.toString(), 
responseFuture.getCause());
+                throw new RemotingException(ex);
             }
+        } finally {
+            ClientSyncDurationMetrics clientSyncDurationMetrics = 
ClientSyncDurationMetrics
+                    .of(syncRequestDto)
+                    .withMilliseconds(System.currentTimeMillis() - start);
+            
RpcMetrics.recordClientSyncRequestDuration(clientSyncDurationMetrics);
         }
-        return iRpcResponse;
     }
 
     Channel getOrCreateChannel(Host host) {
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
index 4731a22d0a..ccbdad945b 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.extract.base.client;
 import org.apache.dolphinscheduler.extract.base.IRpcResponse;
 import org.apache.dolphinscheduler.extract.base.RpcMethod;
 import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
+import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
 import 
org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
 import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
 import org.apache.dolphinscheduler.extract.base.protocal.TransporterHeader;
@@ -41,8 +42,12 @@ class SyncClientMethodInvoker extends 
AbstractClientMethodInvoker {
         
transporter.setBody(JsonSerializer.serialize(StandardRpcRequest.of(args)));
         transporter.setHeader(TransporterHeader.of(methodIdentifier));
 
-        IRpcResponse iRpcResponse =
-                nettyRemotingClient.sendSync(serverHost, transporter, 
sync.timeout());
+        SyncRequestDto syncRequestDto = SyncRequestDto.builder()
+                .timeoutMillis(sync.timeout())
+                .transporter(transporter)
+                .serverHost(serverHost)
+                .build();
+        IRpcResponse iRpcResponse = 
nettyRemotingClient.sendSync(syncRequestDto);
         if (!iRpcResponse.isSuccess()) {
             throw MethodInvocationException.of(iRpcResponse.getMessage());
         }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
index a41a439b5c..a00ff540f4 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.extract.base.config;
 
+import java.time.Duration;
+
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -64,4 +66,14 @@ public class NettyClientConfig {
     @Builder.Default
     private int connectTimeoutMillis = 3000;
 
+    /**
+     * Will send {@link 
org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter} to 
netty server every
+     * heartBeatIntervalMillis, used to keep the {@link 
io.netty.channel.Channel} active.
+     */
+    @Builder.Default
+    private long heartBeatIntervalMillis = Duration.ofSeconds(10).toMillis();
+
+    @Builder.Default
+    private int defaultRpcTimeoutMillis = 10_000;
+
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
index 9d4a2ee3d2..6a38b08c15 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.extract.base.config;
 
+import java.time.Duration;
+
 import lombok.AllArgsConstructor;
 import lombok.Builder;
 import lombok.Data;
@@ -66,6 +68,12 @@ public class NettyServerConfig {
     @Builder.Default
     private int workerThread = Runtime.getRuntime().availableProcessors() * 2;
 
+    /**
+     * If the server doesn't receive any data from a {@link io.netty.channel.Channel} 
within this idle time (default 60s), the channel will be closed.
+     */
+    @Builder.Default
+    private long connectionIdleTime = Duration.ofSeconds(60).toMillis();
+
     /**
      * listen port
      */
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncDurationMetrics.java
similarity index 53%
copy from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
copy to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncDurationMetrics.java
index 9d4a2ee3d2..60124363b7 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncDurationMetrics.java
@@ -15,7 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.extract.base.config;
+package org.apache.dolphinscheduler.extract.base.metrics;
+
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
+import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
 
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,49 +30,27 @@ import lombok.NoArgsConstructor;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class NettyServerConfig {
-
-    private String serverName;
-
-    /**
-     * init the server connectable queue
-     */
-    @Builder.Default
-    private int soBacklog = 1024;
+public class ClientSyncDurationMetrics {
 
-    /**
-     * whether tpc delay
-     */
-    @Builder.Default
-    private boolean tcpNoDelay = true;
+    private Transporter transporter;
 
-    /**
-     * whether keep alive
-     */
-    @Builder.Default
-    private boolean soKeepalive = true;
+    private long milliseconds;
 
-    /**
-     * send buffer size
-     */
     @Builder.Default
-    private int sendBufferSize = 65535;
+    private String clientHost = NetUtils.getHost();
 
-    /**
-     * receive buffer size
-     */
-    @Builder.Default
-    private int receiveBufferSize = 65535;
+    private String serverHost;
 
-    /**
-     * worker threads,default get machine cpus
-     */
-    @Builder.Default
-    private int workerThread = Runtime.getRuntime().availableProcessors() * 2;
+    public static ClientSyncDurationMetrics of(SyncRequestDto syncRequestDto) {
+        return ClientSyncDurationMetrics.builder()
+                .transporter(syncRequestDto.getTransporter())
+                .serverHost(syncRequestDto.getServerHost().getIp())
+                .build();
+    }
 
-    /**
-     * listen port
-     */
-    private int listenPort;
+    public ClientSyncDurationMetrics withMilliseconds(long milliseconds) {
+        this.milliseconds = milliseconds;
+        return this;
+    }
 
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
similarity index 55%
copy from 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
copy to 
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
index a41a439b5c..6f91132547 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
@@ -15,7 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.dolphinscheduler.extract.base.config;
+package org.apache.dolphinscheduler.extract.base.metrics;
+
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
+import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
 
 import lombok.AllArgsConstructor;
 import lombok.Builder;
@@ -26,42 +30,27 @@ import lombok.NoArgsConstructor;
 @Builder
 @NoArgsConstructor
 @AllArgsConstructor
-public class NettyClientConfig {
+public class ClientSyncExceptionMetrics {
 
-    /**
-     * worker threads,default get machine cpus
-     */
-    @Builder.Default
-    private int workerThreads = Runtime.getRuntime().availableProcessors() * 2;
+    private Transporter transporter;
 
-    /**
-     * whether tpc delay
-     */
-    @Builder.Default
-    private boolean tcpNoDelay = true;
+    private String clientHost;
 
-    /**
-     * whether keep alive
-     */
     @Builder.Default
-    private boolean soKeepalive = true;
+    private String serverHost = NetUtils.getHost();
 
-    /**
-     * send buffer size
-     */
-    @Builder.Default
-    private int sendBufferSize = 65535;
+    private Throwable throwable;
 
-    /**
-     * receive buffer size
-     */
-    @Builder.Default
-    private int receiveBufferSize = 65535;
+    public static ClientSyncExceptionMetrics of(SyncRequestDto syncRequestDto) 
{
+        return ClientSyncExceptionMetrics.builder()
+                .transporter(syncRequestDto.getTransporter())
+                .build();
 
-    /**
-     * connect timeout millis
-     */
-    @Builder.Default
-    private int connectTimeoutMillis = 3000;
+    }
+
+    public ClientSyncExceptionMetrics withThrowable(Throwable throwable) {
+        this.throwable = throwable;
+        return this;
+    }
 
 }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
new file mode 100644
index 0000000000..c2dbbfefa6
--- /dev/null
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.extract.base.metrics;
+
+import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
+import org.apache.dolphinscheduler.extract.base.protocal.TransporterHeader;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+
+public class RpcMetrics {
+
+    private static final Map<String, Timer> rpcRequestDurationTimer = new 
ConcurrentHashMap<>();
+
+    private static final Map<String, Counter> rpcRequestExceptionCounter = new 
ConcurrentHashMap<>();
+
+    public static void 
recordClientSyncRequestException(ClientSyncExceptionMetrics 
clientSyncExceptionMetrics) {
+        recordClientSyncRequestException(
+                clientSyncExceptionMetrics.getThrowable(),
+                Optional.of(clientSyncExceptionMetrics)
+                        .map(ClientSyncExceptionMetrics::getTransporter)
+                        .map(Transporter::getHeader)
+                        .map(TransporterHeader::getMethodIdentifier)
+                        .orElseGet(() -> "unknown"),
+                clientSyncExceptionMetrics.getClientHost(),
+                clientSyncExceptionMetrics.getServerHost());
+    }
+
+    public static void recordClientSyncRequestException(final Throwable 
throwable,
+                                                        final String 
methodName,
+                                                        final String 
clientHost,
+                                                        final String 
serverHost) {
+        final String exceptionType = throwable == null ? "unknown" : 
throwable.getClass().getSimpleName();
+        final Counter counter = 
rpcRequestExceptionCounter.computeIfAbsent(exceptionType,
+                (et) -> 
Counter.builder("ds.rpc.client.sync.request.exception.count")
+                        .tag("method_name", methodName)
+                        .tag("client_host", clientHost)
+                        .tag("server_host", serverHost)
+                        .tag("exception_name", et)
+                        .description("rpc sync request exception counter for 
exception type: " + et)
+                        .register(Metrics.globalRegistry));
+        counter.increment();
+    }
+
+    public static void 
recordClientSyncRequestDuration(ClientSyncDurationMetrics 
clientSyncDurationMetrics) {
+        recordClientSyncRequestDuration(
+                Optional.of(clientSyncDurationMetrics)
+                        .map(ClientSyncDurationMetrics::getTransporter)
+                        .map(Transporter::getHeader)
+                        .map(TransporterHeader::getMethodIdentifier)
+                        .orElseGet(() -> "unknown"),
+                clientSyncDurationMetrics.getMilliseconds(),
+                clientSyncDurationMetrics.getClientHost(),
+                clientSyncDurationMetrics.getServerHost());
+    }
+
+    public static void recordClientSyncRequestDuration(final String methodName,
+                                                       final long milliseconds,
+                                                       final String clientHost,
+                                                       final String 
serverHost) {
+        rpcRequestDurationTimer.computeIfAbsent(methodName,
+                (method) -> 
Timer.builder("ds.rpc.client.sync.request.duration.time")
+                        .tag("method_name", method)
+                        .tag("client_host", clientHost)
+                        .tag("server_host", serverHost)
+                        .publishPercentiles(0.5, 0.75, 0.95, 0.99)
+                        .publishPercentileHistogram()
+                        .description("time cost of sync rpc request, unit ms")
+                        .register(Metrics.globalRegistry))
+                .record(milliseconds, TimeUnit.MILLISECONDS);
+    }
+
+}
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
index f57ff0b609..4f9a7034c8 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
@@ -38,6 +38,7 @@ import io.netty.channel.ChannelConfig;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 
 @Slf4j
@@ -160,7 +161,11 @@ class JdkDynamicServerHandler extends 
ChannelInboundHandlerAdapter {
     @Override
     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) 
throws Exception {
         if (evt instanceof IdleStateEvent) {
-            ctx.channel().close();
+            IdleStateEvent event = (IdleStateEvent) evt;
+            if (event.state() == IdleState.READER_IDLE) {
+                log.warn("Not receive heart beat from: {}, will close the 
channel", ctx.channel().remoteAddress());
+                ctx.close();
+            }
         } else {
             super.userEventTriggered(ctx, evt);
         }
diff --git 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
index 9beeaced3d..9ebf802b1e 100644
--- 
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
+++ 
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
@@ -22,7 +22,6 @@ import org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
 import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
 import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
 import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
-import org.apache.dolphinscheduler.extract.base.utils.Constants;
 import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
 
 import java.util.concurrent.ExecutorService;
@@ -135,7 +134,7 @@ class NettyRemotingServer {
                 .addLast("encoder", new TransporterEncoder())
                 .addLast("decoder", new TransporterDecoder())
                 .addLast("server-idle-handle",
-                        new IdleStateHandler(0, 0, Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
+                        new IdleStateHandler(serverConfig.getConnectionIdleTime(), 0, 0, TimeUnit.MILLISECONDS))
                 .addLast("handler", channelHandler);
     }
 
diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/Constants.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/Constants.java
index 76e3872d31..94b92b2539 100644
--- a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/Constants.java
+++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/Constants.java
@@ -35,10 +35,6 @@ public class Constants {
 
     public static final String SLASH = "/";
 
-    public static final int NETTY_SERVER_HEART_BEAT_TIME = 1000 * 60 * 3 + 1000;
-
-    public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 6;
-
     /**
      * charset
      */
diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetricsTest.java b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetricsTest.java
new file mode 100644
index 0000000000..352e218206
--- /dev/null
+++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetricsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.extract.base.metrics;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+
+class RpcMetricsTest {
+
+    @BeforeEach
+    public void setup() {
+        Metrics.globalRegistry.clear();
+        Metrics.addRegistry(new SimpleMeterRegistry());
+    }
+
+    @Test
+    void testRecordClientSyncRequestException() {
+        assertThat(Metrics.globalRegistry.find("ds.rpc.client.sync.request.exception.count").counter()).isNull();
+
+        String clientHost = NetUtils.getHost();
+        String serverHost = NetUtils.getHost();
+
+        RpcMetrics.recordClientSyncRequestException(
+                new IllegalArgumentException("id is null"), "getById", clientHost, serverHost);
+        RpcMetrics.recordClientSyncRequestException(
+                new IllegalArgumentException("name is null"), "getByName", clientHost, serverHost);
+        RpcMetrics.recordClientSyncRequestException(
+                new IllegalArgumentException("age is null"), "getByAge", clientHost, serverHost);
+        RpcMetrics.recordClientSyncRequestException(new UnsupportedOperationException("update id is not supported"),
+                "updateById", clientHost, serverHost);
+        assertThat(Metrics.globalRegistry.find("ds.rpc.client.sync.request.exception.count").counter()).isNotNull();
+    }
+
+    @Test
+    void testRecordRpcRequestDuration() {
+        assertThat(Metrics.globalRegistry.find("ds.rpc.client.sync.request.duration.time").timer()).isNull();
+
+        String clientHost = NetUtils.getHost();
+        String serverHost = NetUtils.getHost();
+
+        RpcMetrics.recordClientSyncRequestDuration("getById", 100, clientHost, serverHost);
+        RpcMetrics.recordClientSyncRequestDuration("getByName", 200, clientHost, serverHost);
+        RpcMetrics.recordClientSyncRequestDuration("getByAge", 300, clientHost, serverHost);
+        RpcMetrics.recordClientSyncRequestDuration("updateById", 400, clientHost, serverHost);
+        assertThat(Metrics.globalRegistry.find("ds.rpc.client.sync.request.duration.time").timer()).isNotNull();
+    }
+
+}


Reply via email to