This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/dev by this push:
new fd5a182cd6 Add RPC metrics (#16121)
fd5a182cd6 is described below
commit fd5a182cd6da4c1d5f90323fb44af4162be987ef
Author: Wenjun Ruan <[email protected]>
AuthorDate: Tue Jun 11 10:57:23 2024 +0800
Add RPC metrics (#16121)
---
docs/docs/en/guide/metrics/metrics.md | 5 ++
docs/docs/zh/guide/metrics/metrics.md | 5 ++
.../dolphinscheduler-extract-base/pom.xml | 7 ++
.../dolphinscheduler/extract/base/RpcMethod.java | 2 +-
.../base/{RpcMethod.java => SyncRequestDto.java} | 25 +++---
.../extract/base/client/NettyClientHandler.java | 2 +-
.../extract/base/client/NettyRemotingClient.java | 83 ++++++++++++-------
.../base/client/SyncClientMethodInvoker.java | 9 ++-
.../extract/base/config/NettyClientConfig.java | 12 +++
.../extract/base/config/NettyServerConfig.java | 8 ++
.../ClientSyncDurationMetrics.java} | 58 +++++--------
.../ClientSyncExceptionMetrics.java} | 51 +++++-------
.../extract/base/metrics/RpcMetrics.java | 94 ++++++++++++++++++++++
.../base/server/JdkDynamicServerHandler.java | 7 +-
.../extract/base/server/NettyRemotingServer.java | 3 +-
.../extract/base/utils/Constants.java | 4 -
.../extract/base/metrics/RpcMetricsTest.java | 70 ++++++++++++++++
17 files changed, 326 insertions(+), 119 deletions(-)
diff --git a/docs/docs/en/guide/metrics/metrics.md
b/docs/docs/en/guide/metrics/metrics.md
index 3470340465..9ba7cc73e7 100644
--- a/docs/docs/en/guide/metrics/metrics.md
+++ b/docs/docs/en/guide/metrics/metrics.md
@@ -91,6 +91,11 @@ For example, you can get the master metrics by `curl
http://localhost:5679/actua
- stop: the number of stopped workflow instances
- failover: the number of workflow instance fail-overs
+### RPC Related Metrics
+
+- ds.rpc.client.sync.request.exception.count: (counter) the number of
exceptions that occurred in sync RPC requests
+- ds.rpc.client.sync.request.duration.time: (histogram) the time cost of sync
RPC requests
+
### Master Server Metrics
- ds.master.overload.count: (counter) the number of times the master overloaded
diff --git a/docs/docs/zh/guide/metrics/metrics.md
b/docs/docs/zh/guide/metrics/metrics.md
index aa620b873e..4865a14222 100644
--- a/docs/docs/zh/guide/metrics/metrics.md
+++ b/docs/docs/zh/guide/metrics/metrics.md
@@ -91,6 +91,11 @@ metrics exporter端口`server.port`是在application.yaml里定义的:
master: `
- stop:停止的工作流实例数量
- failover:容错的工作流实例数量
+### RPC相关指标
+
+- ds.rpc.client.sync.request.exception.count: (counter) 同步rpc请求异常数
+- ds.rpc.client.sync.request.duration.time: (histogram) 同步rpc请求耗时
+
### Master Server指标
- ds.master.overload.count: (counter) master过载次数
diff --git a/dolphinscheduler-extract/dolphinscheduler-extract-base/pom.xml
b/dolphinscheduler-extract/dolphinscheduler-extract-base/pom.xml
index 9501d55706..0554ea3c3c 100644
--- a/dolphinscheduler-extract/dolphinscheduler-extract-base/pom.xml
+++ b/dolphinscheduler-extract/dolphinscheduler-extract-base/pom.xml
@@ -47,6 +47,13 @@
<artifactId>dolphinscheduler-common</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.dolphinscheduler</groupId>
+ <artifactId>dolphinscheduler-meter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
index cd1b778c9a..2dcf689ad8 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
@@ -28,6 +28,6 @@ import java.lang.annotation.Target;
@Documented
public @interface RpcMethod {
- long timeout() default 3000L;
+ long timeout() default -1;
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
similarity index 66%
copy from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
copy to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
index cd1b778c9a..7df649cd57 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/RpcMethod.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/SyncRequestDto.java
@@ -17,17 +17,22 @@
package org.apache.dolphinscheduler.extract.base;
-import java.lang.annotation.Documented;
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
+import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
+import org.apache.dolphinscheduler.extract.base.utils.Host;
-@Target(ElementType.METHOD)
-@Retention(RetentionPolicy.RUNTIME)
-@Documented
-public @interface RpcMethod {
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
- long timeout() default 3000L;
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class SyncRequestDto {
+
+ private Host serverHost;
+ private Transporter transporter;
+ private long timeoutMillis;
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
index be570eb577..5f50e2441c 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyClientHandler.java
@@ -77,7 +77,7 @@ public class NettyClientHandler extends
ChannelInboundHandlerAdapter {
.writeAndFlush(HeartBeatTransporter.getHeartBeatTransporter())
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
if (log.isDebugEnabled()) {
- log.debug("Client send heart beat to: {}",
ChannelUtils.getRemoteAddress(ctx.channel()));
+ log.info("Client send heartbeat to: {}",
ctx.channel().remoteAddress());
}
} else {
super.userEventTriggered(ctx, evt);
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
index 8ded68669d..4aea4d6dfe 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/NettyRemotingClient.java
@@ -19,14 +19,17 @@ package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
+import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
import org.apache.dolphinscheduler.extract.base.config.NettyClientConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemotingException;
import
org.apache.dolphinscheduler.extract.base.exception.RemotingTimeoutException;
import org.apache.dolphinscheduler.extract.base.future.ResponseFuture;
+import
org.apache.dolphinscheduler.extract.base.metrics.ClientSyncDurationMetrics;
+import
org.apache.dolphinscheduler.extract.base.metrics.ClientSyncExceptionMetrics;
+import org.apache.dolphinscheduler.extract.base.metrics.RpcMetrics;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
-import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.Host;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
@@ -97,8 +100,8 @@ public class NettyRemotingClient implements AutoCloseable {
ch.pipeline()
.addLast("client-idle-handler",
new IdleStateHandler(
-
Constants.NETTY_CLIENT_HEART_BEAT_TIME,
0,
+
clientConfig.getHeartBeatIntervalMillis(),
0,
TimeUnit.MILLISECONDS))
.addLast(new TransporterDecoder(),
clientHandler, new TransporterEncoder());
@@ -107,38 +110,60 @@ public class NettyRemotingClient implements AutoCloseable
{
isStarted.compareAndSet(false, true);
}
- public IRpcResponse sendSync(final Host host,
- final Transporter transporter,
- final long timeoutMillis) throws
InterruptedException, RemotingException {
- final Channel channel = getOrCreateChannel(host);
- if (channel == null) {
- throw new RemotingException(String.format("connect to : %s fail",
host));
- }
+ public IRpcResponse sendSync(SyncRequestDto syncRequestDto) throws
RemotingException {
+ long start = System.currentTimeMillis();
+
+ final Host host = syncRequestDto.getServerHost();
+ final Transporter transporter = syncRequestDto.getTransporter();
+ final long timeoutMillis = syncRequestDto.getTimeoutMillis() < 0 ?
clientConfig.getConnectTimeoutMillis()
+ : syncRequestDto.getTimeoutMillis();
final long opaque = transporter.getHeader().getOpaque();
- final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis);
- channel.writeAndFlush(transporter).addListener(future -> {
- if (future.isSuccess()) {
- responseFuture.setSendOk(true);
- return;
- } else {
- responseFuture.setSendOk(false);
+
+ try {
+ final Channel channel = getOrCreateChannel(host);
+ if (channel == null) {
+ throw new RemotingException(String.format("connect to : %s
fail", host));
}
- responseFuture.setCause(future.cause());
- responseFuture.putResponse(null);
- log.error("Send Sync request {} to host {} failed", transporter,
host, responseFuture.getCause());
- });
- /*
- * sync wait for result
- */
- IRpcResponse iRpcResponse = responseFuture.waitResponse();
- if (iRpcResponse == null) {
- if (responseFuture.isSendOK()) {
- throw new RemotingTimeoutException(host.toString(),
timeoutMillis, responseFuture.getCause());
+ final ResponseFuture responseFuture = new ResponseFuture(opaque,
timeoutMillis);
+ channel.writeAndFlush(transporter).addListener(future -> {
+ if (future.isSuccess()) {
+ responseFuture.setSendOk(true);
+ return;
+ } else {
+ responseFuture.setSendOk(false);
+ }
+ responseFuture.setCause(future.cause());
+ responseFuture.putResponse(null);
+ log.error("Send Sync request {} to host {} failed",
transporter, host, responseFuture.getCause());
+ });
+ /*
+ * sync wait for result
+ */
+ IRpcResponse iRpcResponse = responseFuture.waitResponse();
+ if (iRpcResponse == null) {
+ if (responseFuture.isSendOK()) {
+ throw new RemotingTimeoutException(host.toString(),
timeoutMillis, responseFuture.getCause());
+ } else {
+ throw new RemotingException(host.toString(),
responseFuture.getCause());
+ }
+ }
+ return iRpcResponse;
+ } catch (Exception ex) {
+ ClientSyncExceptionMetrics clientSyncExceptionMetrics =
ClientSyncExceptionMetrics
+ .of(syncRequestDto)
+ .withThrowable(ex);
+
RpcMetrics.recordClientSyncRequestException(clientSyncExceptionMetrics);
+ if (ex instanceof RemotingException) {
+ throw (RemotingException) ex;
} else {
- throw new RemotingException(host.toString(),
responseFuture.getCause());
+ throw new RemotingException(ex);
}
+ } finally {
+ ClientSyncDurationMetrics clientSyncDurationMetrics =
ClientSyncDurationMetrics
+ .of(syncRequestDto)
+ .withMilliseconds(System.currentTimeMillis() - start);
+
RpcMetrics.recordClientSyncRequestDuration(clientSyncDurationMetrics);
}
- return iRpcResponse;
}
Channel getOrCreateChannel(Host host) {
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
index 4731a22d0a..ccbdad945b 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/client/SyncClientMethodInvoker.java
@@ -20,6 +20,7 @@ package org.apache.dolphinscheduler.extract.base.client;
import org.apache.dolphinscheduler.extract.base.IRpcResponse;
import org.apache.dolphinscheduler.extract.base.RpcMethod;
import org.apache.dolphinscheduler.extract.base.StandardRpcRequest;
+import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
import
org.apache.dolphinscheduler.extract.base.exception.MethodInvocationException;
import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterHeader;
@@ -41,8 +42,12 @@ class SyncClientMethodInvoker extends
AbstractClientMethodInvoker {
transporter.setBody(JsonSerializer.serialize(StandardRpcRequest.of(args)));
transporter.setHeader(TransporterHeader.of(methodIdentifier));
- IRpcResponse iRpcResponse =
- nettyRemotingClient.sendSync(serverHost, transporter,
sync.timeout());
+ SyncRequestDto syncRequestDto = SyncRequestDto.builder()
+ .timeoutMillis(sync.timeout())
+ .transporter(transporter)
+ .serverHost(serverHost)
+ .build();
+ IRpcResponse iRpcResponse =
nettyRemotingClient.sendSync(syncRequestDto);
if (!iRpcResponse.isSuccess()) {
throw MethodInvocationException.of(iRpcResponse.getMessage());
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
index a41a439b5c..a00ff540f4 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.extract.base.config;
+import java.time.Duration;
+
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -64,4 +66,14 @@ public class NettyClientConfig {
@Builder.Default
private int connectTimeoutMillis = 3000;
+ /**
+ * Will send {@link
org.apache.dolphinscheduler.extract.base.protocal.HeartBeatTransporter} to
netty server every
+ * heartBeatIntervalMillis, used to keep the {@link
io.netty.channel.Channel} active.
+ */
+ @Builder.Default
+ private long heartBeatIntervalMillis = Duration.ofSeconds(10).toMillis();
+
+ @Builder.Default
+ private int defaultRpcTimeoutMillis = 10_000;
+
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
index 9d4a2ee3d2..6a38b08c15 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
@@ -17,6 +17,8 @@
package org.apache.dolphinscheduler.extract.base.config;
+import java.time.Duration;
+
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -66,6 +68,12 @@ public class NettyServerConfig {
@Builder.Default
private int workerThread = Runtime.getRuntime().availableProcessors() * 2;
+ /**
+ * If no data is received from a {@link io.netty.channel.Channel}
within this idle time (60s by default), the channel will be closed.
+ */
+ @Builder.Default
+ private long connectionIdleTime = Duration.ofSeconds(60).toMillis();
+
/**
* listen port
*/
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncDurationMetrics.java
similarity index 53%
copy from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
copy to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncDurationMetrics.java
index 9d4a2ee3d2..60124363b7 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyServerConfig.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncDurationMetrics.java
@@ -15,7 +15,11 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.base.config;
+package org.apache.dolphinscheduler.extract.base.metrics;
+
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
+import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -26,49 +30,27 @@ import lombok.NoArgsConstructor;
@Builder
@NoArgsConstructor
@AllArgsConstructor
-public class NettyServerConfig {
-
- private String serverName;
-
- /**
- * init the server connectable queue
- */
- @Builder.Default
- private int soBacklog = 1024;
+public class ClientSyncDurationMetrics {
- /**
- * whether tpc delay
- */
- @Builder.Default
- private boolean tcpNoDelay = true;
+ private Transporter transporter;
- /**
- * whether keep alive
- */
- @Builder.Default
- private boolean soKeepalive = true;
+ private long milliseconds;
- /**
- * send buffer size
- */
@Builder.Default
- private int sendBufferSize = 65535;
+ private String clientHost = NetUtils.getHost();
- /**
- * receive buffer size
- */
- @Builder.Default
- private int receiveBufferSize = 65535;
+ private String serverHost;
- /**
- * worker threads,default get machine cpus
- */
- @Builder.Default
- private int workerThread = Runtime.getRuntime().availableProcessors() * 2;
+ public static ClientSyncDurationMetrics of(SyncRequestDto syncRequestDto) {
+ return ClientSyncDurationMetrics.builder()
+ .transporter(syncRequestDto.getTransporter())
+ .serverHost(syncRequestDto.getServerHost().getIp())
+ .build();
+ }
- /**
- * listen port
- */
- private int listenPort;
+ public ClientSyncDurationMetrics withMilliseconds(long milliseconds) {
+ this.milliseconds = milliseconds;
+ return this;
+ }
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
similarity index 55%
copy from
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
copy to
dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
index a41a439b5c..6f91132547 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/config/NettyClientConfig.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/ClientSyncExceptionMetrics.java
@@ -15,7 +15,11 @@
* limitations under the License.
*/
-package org.apache.dolphinscheduler.extract.base.config;
+package org.apache.dolphinscheduler.extract.base.metrics;
+
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+import org.apache.dolphinscheduler.extract.base.SyncRequestDto;
+import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
import lombok.AllArgsConstructor;
import lombok.Builder;
@@ -26,42 +30,27 @@ import lombok.NoArgsConstructor;
@Builder
@NoArgsConstructor
@AllArgsConstructor
-public class NettyClientConfig {
+public class ClientSyncExceptionMetrics {
- /**
- * worker threads,default get machine cpus
- */
- @Builder.Default
- private int workerThreads = Runtime.getRuntime().availableProcessors() * 2;
+ private Transporter transporter;
- /**
- * whether tpc delay
- */
- @Builder.Default
- private boolean tcpNoDelay = true;
+ private String clientHost;
- /**
- * whether keep alive
- */
@Builder.Default
- private boolean soKeepalive = true;
+ private String serverHost = NetUtils.getHost();
- /**
- * send buffer size
- */
- @Builder.Default
- private int sendBufferSize = 65535;
+ private Throwable throwable;
- /**
- * receive buffer size
- */
- @Builder.Default
- private int receiveBufferSize = 65535;
+ public static ClientSyncExceptionMetrics of(SyncRequestDto syncRequestDto)
{
+ return ClientSyncExceptionMetrics.builder()
+ .transporter(syncRequestDto.getTransporter())
+ .build();
- /**
- * connect timeout millis
- */
- @Builder.Default
- private int connectTimeoutMillis = 3000;
+ }
+
+ public ClientSyncExceptionMetrics withThrowable(Throwable throwable) {
+ this.throwable = throwable;
+ return this;
+ }
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
new file mode 100644
index 0000000000..c2dbbfefa6
--- /dev/null
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetrics.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.extract.base.metrics;
+
+import org.apache.dolphinscheduler.extract.base.protocal.Transporter;
+import org.apache.dolphinscheduler.extract.base.protocal.TransporterHeader;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+
+public class RpcMetrics {
+
+ private static final Map<String, Timer> rpcRequestDurationTimer = new
ConcurrentHashMap<>();
+
+ private static final Map<String, Counter> rpcRequestExceptionCounter = new
ConcurrentHashMap<>();
+
+ public static void
recordClientSyncRequestException(ClientSyncExceptionMetrics
clientSyncExceptionMetrics) {
+ recordClientSyncRequestException(
+ clientSyncExceptionMetrics.getThrowable(),
+ Optional.of(clientSyncExceptionMetrics)
+ .map(ClientSyncExceptionMetrics::getTransporter)
+ .map(Transporter::getHeader)
+ .map(TransporterHeader::getMethodIdentifier)
+ .orElseGet(() -> "unknown"),
+ clientSyncExceptionMetrics.getClientHost(),
+ clientSyncExceptionMetrics.getServerHost());
+ }
+
+ public static void recordClientSyncRequestException(final Throwable
throwable,
+ final String
methodName,
+ final String
clientHost,
+ final String
serverHost) {
+ final String exceptionType = throwable == null ? "unknown" :
throwable.getClass().getSimpleName();
+ final Counter counter =
rpcRequestExceptionCounter.computeIfAbsent(exceptionType,
+ (et) ->
Counter.builder("ds.rpc.client.sync.request.exception.count")
+ .tag("method_name", methodName)
+ .tag("client_host", clientHost)
+ .tag("server_host", serverHost)
+ .tag("exception_name", et)
+ .description("rpc sync request exception counter for
exception type: " + et)
+ .register(Metrics.globalRegistry));
+ counter.increment();
+ }
+
+ public static void
recordClientSyncRequestDuration(ClientSyncDurationMetrics
clientSyncDurationMetrics) {
+ recordClientSyncRequestDuration(
+ Optional.of(clientSyncDurationMetrics)
+ .map(ClientSyncDurationMetrics::getTransporter)
+ .map(Transporter::getHeader)
+ .map(TransporterHeader::getMethodIdentifier)
+ .orElseGet(() -> "unknown"),
+ clientSyncDurationMetrics.getMilliseconds(),
+ clientSyncDurationMetrics.getClientHost(),
+ clientSyncDurationMetrics.getServerHost());
+ }
+
+ public static void recordClientSyncRequestDuration(final String methodName,
+ final long milliseconds,
+ final String clientHost,
+ final String
serverHost) {
+ rpcRequestDurationTimer.computeIfAbsent(methodName,
+ (method) ->
Timer.builder("ds.rpc.client.sync.request.duration.time")
+ .tag("method_name", method)
+ .tag("client_host", clientHost)
+ .tag("server_host", serverHost)
+ .publishPercentiles(0.5, 0.75, 0.95, 0.99)
+ .publishPercentileHistogram()
+ .description("time cost of sync rpc request, unit ms")
+ .register(Metrics.globalRegistry))
+ .record(milliseconds, TimeUnit.MILLISECONDS);
+ }
+
+}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
index f57ff0b609..4f9a7034c8 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/JdkDynamicServerHandler.java
@@ -38,6 +38,7 @@ import io.netty.channel.ChannelConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
@Slf4j
@@ -160,7 +161,11 @@ class JdkDynamicServerHandler extends
ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
throws Exception {
if (evt instanceof IdleStateEvent) {
- ctx.channel().close();
+ IdleStateEvent event = (IdleStateEvent) evt;
+ if (event.state() == IdleState.READER_IDLE) {
+ log.warn("Not receive heart beat from: {}, will close the
channel", ctx.channel().remoteAddress());
+ ctx.close();
+ }
} else {
super.userEventTriggered(ctx, evt);
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
index 9beeaced3d..9ebf802b1e 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/server/NettyRemotingServer.java
@@ -22,7 +22,6 @@ import
org.apache.dolphinscheduler.extract.base.config.NettyServerConfig;
import org.apache.dolphinscheduler.extract.base.exception.RemoteException;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterDecoder;
import org.apache.dolphinscheduler.extract.base.protocal.TransporterEncoder;
-import org.apache.dolphinscheduler.extract.base.utils.Constants;
import org.apache.dolphinscheduler.extract.base.utils.NettyUtils;
import java.util.concurrent.ExecutorService;
@@ -135,7 +134,7 @@ class NettyRemotingServer {
.addLast("encoder", new TransporterEncoder())
.addLast("decoder", new TransporterDecoder())
.addLast("server-idle-handle",
- new IdleStateHandler(0, 0,
Constants.NETTY_SERVER_HEART_BEAT_TIME, TimeUnit.MILLISECONDS))
+ new
IdleStateHandler(serverConfig.getConnectionIdleTime(), 0, 0,
TimeUnit.MILLISECONDS))
.addLast("handler", channelHandler);
}
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/Constants.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/Constants.java
index 76e3872d31..94b92b2539 100644
---
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/Constants.java
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/main/java/org/apache/dolphinscheduler/extract/base/utils/Constants.java
@@ -35,10 +35,6 @@ public class Constants {
public static final String SLASH = "/";
- public static final int NETTY_SERVER_HEART_BEAT_TIME = 1000 * 60 * 3 +
1000;
-
- public static final int NETTY_CLIENT_HEART_BEAT_TIME = 1000 * 6;
-
/**
* charset
*/
diff --git
a/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetricsTest.java
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetricsTest.java
new file mode 100644
index 0000000000..352e218206
--- /dev/null
+++
b/dolphinscheduler-extract/dolphinscheduler-extract-base/src/test/java/org/apache/dolphinscheduler/extract/base/metrics/RpcMetricsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.extract.base.metrics;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import org.apache.dolphinscheduler.common.utils.NetUtils;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.simple.SimpleMeterRegistry;
+
+class RpcMetricsTest {
+
+ @BeforeEach
+ public void setup() {
+ Metrics.globalRegistry.clear();
+ Metrics.addRegistry(new SimpleMeterRegistry());
+ }
+
+ @Test
+ void testRecordClientSyncRequestException() {
+
assertThat(Metrics.globalRegistry.find("ds.rpc.client.sync.request.exception.count").counter()).isNull();
+
+ String clientHost = NetUtils.getHost();
+ String serverHost = NetUtils.getHost();
+
+ RpcMetrics.recordClientSyncRequestException(
+ new IllegalArgumentException("id is null"), "getById",
clientHost, serverHost);
+ RpcMetrics.recordClientSyncRequestException(
+ new IllegalArgumentException("name is null"), "getByName",
clientHost, serverHost);
+ RpcMetrics.recordClientSyncRequestException(
+ new IllegalArgumentException("age is null"), "getByAge",
clientHost, serverHost);
+ RpcMetrics.recordClientSyncRequestException(new
UnsupportedOperationException("update id is not supported"),
+ "updateById", clientHost, serverHost);
+
assertThat(Metrics.globalRegistry.find("ds.rpc.client.sync.request.exception.count").counter()).isNotNull();
+ }
+
+ @Test
+ void testRecordRpcRequestDuration() {
+
assertThat(Metrics.globalRegistry.find("ds.rpc.client.sync.request.duration.time").timer()).isNull();
+
+ String clientHost = NetUtils.getHost();
+ String serverHost = NetUtils.getHost();
+
+ RpcMetrics.recordClientSyncRequestDuration("getById", 100, clientHost,
serverHost);
+ RpcMetrics.recordClientSyncRequestDuration("getByName", 200,
clientHost, serverHost);
+ RpcMetrics.recordClientSyncRequestDuration("getByAge", 300,
clientHost, serverHost);
+ RpcMetrics.recordClientSyncRequestDuration("updateById", 400,
clientHost, serverHost);
+
assertThat(Metrics.globalRegistry.find("ds.rpc.client.sync.request.duration.time").timer()).isNotNull();
+ }
+
+}