This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 2616e6d83 [Feature][Zeta] Support get cluster metrics from seatunnel
zeta client (#4139)
2616e6d83 is described below
commit 2616e6d83809c86c81cc399ccbb63ce89f72653b
Author: Eric <[email protected]>
AuthorDate: Thu Feb 16 15:58:56 2023 +0800
[Feature][Zeta] Support get cluster metrics from seatunnel zeta client
(#4139)
* add get cluster metrics to seatunnel zeta client
* add some test output
* format code
* fix ci error
---
.../org/apache/seatunnel/engine/e2e/ClusterIT.java | 92 +++++
.../seatunnel/engine/client/SeaTunnelClient.java | 111 +++---
.../seatunnel/engine/client/job/JobClient.java | 93 +++++
.../SeaTunnelGetClusterHealthMetricsCodec.java | 81 ++++
.../SeaTunnelEngine.yaml | 18 +-
.../engine/server/SeaTunnelHealthMonitor.java | 413 +++++++++++++++++++++
.../seatunnel/engine/server/SeaTunnelServer.java | 5 +
.../GetClusterHealthMetricsOperation.java | 54 +++
.../protocol/task/GetClusterHealthMetricsTask.java | 53 +++
.../task/SeaTunnelMessageTaskFactoryProvider.java | 5 +
.../ClientToServerOperationDataSerializerHook.java | 5 +
11 files changed, 870 insertions(+), 60 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
new file mode 100644
index 000000000..acb2fab9b
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class ClusterIT {
+
+ /**
+ * Starts two engine nodes under one cluster name, connects a client to that cluster, prints the
+ * health metrics returned for each member and expects exactly two entries.
+ */
+ @Test
+ public void getClusterHealthMetrics() {
+ HazelcastInstanceImpl node1 = null;
+ HazelcastInstanceImpl node2 = null;
+ SeaTunnelClient engineClient = null;
+
+ String testClusterName = "Test_getClusterHealthMetrics";
+
+ // The server nodes and the client below are all configured with the same cluster name.
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig
+ .getHazelcastConfig()
+ .setClusterName(TestUtils.getClusterName(testClusterName));
+
+ try {
+ node1 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+ node2 =
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+ // Effectively-final copy so the lambda below can capture node1.
+ HazelcastInstanceImpl finalNode = node1;
+ // Wait up to 10 seconds until node1 reports a two-member cluster.
+ Awaitility.await()
+ .atMost(10000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+ 2,
finalNode.getCluster().getMembers().size()));
+
+ ClientConfig clientConfig =
ConfigProvider.locateAndGetClientConfig();
+
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+ engineClient = new SeaTunnelClient(clientConfig);
+
+ Map<String, String> clusterHealthMetrics =
engineClient.getClusterHealthMetrics();
+ // Dump every returned entry to stdout for manual inspection of the test run.
+ System.out.println(
+ "=====================================cluster
metrics==================================================");
+ for (Map.Entry<String, String> entry :
clusterHealthMetrics.entrySet()) {
+ System.out.println(entry.getKey());
+ System.out.println(entry.getValue());
+ System.out.println(
+
"======================================================================================================");
+ }
+ // Two nodes were started, so two metrics entries are expected.
+ Assertions.assertEquals(2, clusterHealthMetrics.size());
+
+ } finally {
+ // Release the client and both nodes even when an assertion above failed.
+ if (engineClient != null) {
+ engineClient.shutdown();
+ }
+
+ if (node1 != null) {
+ node1.shutdown();
+ }
+
+ if (node2 != null) {
+ node2.shutdown();
+ }
+ }
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index f4526ea26..7b38112b6 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -17,36 +17,37 @@
package org.apache.seatunnel.engine.client;
-import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
import
org.apache.seatunnel.engine.client.job.JobMetricsRunner.JobMetricsSummary;
import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
-import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStatusCodec;
-import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
-import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
-import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
-import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetClusterHealthMetricsCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
-import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.cluster.Member;
import com.hazelcast.logging.ILogger;
+import lombok.Getter;
import lombok.NonNull;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
public class SeaTunnelClient implements SeaTunnelClientInstance {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final SeaTunnelHazelcastClient hazelcastClient;
+ @Getter private final JobClient jobClient;
public SeaTunnelClient(@NonNull ClientConfig clientConfig) {
this.hazelcastClient = new SeaTunnelHazelcastClient(clientConfig);
+ this.jobClient = new JobClient(this.hazelcastClient);
}
@Override
@@ -90,17 +91,15 @@ public class SeaTunnelClient implements
SeaTunnelClientInstance {
*
* @param jobId jobId
*/
+ @Deprecated
public String getJobDetailStatus(Long jobId) {
- return hazelcastClient.requestOnMasterAndDecodeResponse(
- SeaTunnelGetJobDetailStatusCodec.encodeRequest(jobId),
- SeaTunnelGetJobDetailStatusCodec::decodeResponse);
+ return jobClient.getJobDetailStatus(jobId);
}
/** list all jobId and job status */
+ @Deprecated
public String listJobStatus() {
- return hazelcastClient.requestOnMasterAndDecodeResponse(
- SeaTunnelListJobStatusCodec.encodeRequest(),
- SeaTunnelListJobStatusCodec::decodeResponse);
+ return jobClient.listJobStatus();
}
/**
@@ -108,63 +107,57 @@ public class SeaTunnelClient implements
SeaTunnelClientInstance {
*
* @param jobId jobId
*/
+ @Deprecated
public String getJobStatus(Long jobId) {
- int jobStatusOrdinal =
- hazelcastClient.requestOnMasterAndDecodeResponse(
- SeaTunnelGetJobStatusCodec.encodeRequest(jobId),
- SeaTunnelGetJobStatusCodec::decodeResponse);
- return JobStatus.values()[jobStatusOrdinal].toString();
+ return jobClient.getJobStatus(jobId);
}
+ @Deprecated
public String getJobMetrics(Long jobId) {
- return hazelcastClient.requestOnMasterAndDecodeResponse(
- SeaTunnelGetJobMetricsCodec.encodeRequest(jobId),
- SeaTunnelGetJobMetricsCodec::decodeResponse);
+ return jobClient.getJobMetrics(jobId);
}
+ @Deprecated
public void savePointJob(Long jobId) {
- PassiveCompletableFuture<Void> cancelFuture =
- hazelcastClient.requestOnMasterAndGetCompletableFuture(
- SeaTunnelSavePointJobCodec.encodeRequest(jobId));
-
- cancelFuture.join();
+ jobClient.savePointJob(jobId);
}
+ @Deprecated
public void cancelJob(Long jobId) {
- PassiveCompletableFuture<Void> cancelFuture =
- hazelcastClient.requestOnMasterAndGetCompletableFuture(
- SeaTunnelCancelJobCodec.encodeRequest(jobId));
-
- cancelFuture.join();
+ jobClient.cancelJob(jobId);
}
public JobDAGInfo getJobInfo(Long jobId) {
- return hazelcastClient
- .getSerializationService()
- .toObject(
- hazelcastClient.requestOnMasterAndDecodeResponse(
- SeaTunnelGetJobInfoCodec.encodeRequest(jobId),
- SeaTunnelGetJobInfoCodec::decodeResponse));
+ return jobClient.getJobInfo(jobId);
}
public JobMetricsSummary getJobMetricsSummary(Long jobId) {
- long sourceReadCount = 0L;
- long sinkWriteCount = 0L;
- String jobMetrics = getJobMetrics(jobId);
- try {
- JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics);
- JsonNode sourceReaders = jsonNode.get("SourceReceivedCount");
- JsonNode sinkWriters = jsonNode.get("SinkWriteCount");
- for (int i = 0; i < sourceReaders.size(); i++) {
- JsonNode sourceReader = sourceReaders.get(i);
- JsonNode sinkWriter = sinkWriters.get(i);
- sourceReadCount += sourceReader.get("value").asLong();
- sinkWriteCount += sinkWriter.get("value").asLong();
- }
- return new JobMetricsSummary(sourceReadCount, sinkWriteCount);
- // Add NullPointerException because of metrics information can be
empty like {}
- } catch (JsonProcessingException | NullPointerException e) {
- return new JobMetricsSummary(sourceReadCount, sinkWriteCount);
- }
+ return jobClient.getJobMetricsSummary(jobId);
+ }
+
+    /**
+     * Collects the health metrics of every cluster member.
+     *
+     * <p>Each member is asked for its metrics string (comma-separated {@code key=value} pairs),
+     * which is converted into a JSON object that keeps the order of the pairs.
+     *
+     * @return a map from the member address (as a string) to that member's metrics as JSON
+     */
+    public Map<String, String> getClusterHealthMetrics() {
+        Set<Member> members = hazelcastClient.getHazelcastInstance().getCluster().getMembers();
+        Map<String, String> healthMetricsMap = new HashMap<>();
+        for (Member member : members) {
+            String metrics =
+                    hazelcastClient.requestAndDecodeResponse(
+                            member.getUuid(),
+                            SeaTunnelGetClusterHealthMetricsCodec.encodeRequest(),
+                            SeaTunnelGetClusterHealthMetricsCodec::decodeResponse);
+            healthMetricsMap.put(
+                    member.getAddress().toString(),
+                    JsonUtils.toJsonString(parseHealthMetrics(metrics)));
+        }
+        return healthMetricsMap;
+    }
+
+    /**
+     * Parses a comma-separated list of {@code key=value} pairs into an insertion-ordered map.
+     *
+     * <p>Keys and values are trimmed, so the blank that follows each comma in the rendered string
+     * does not end up inside the key. Only the first {@code '='} separates key from value, so a
+     * value may itself contain {@code '='}. Tokens without a key or without {@code '='} are
+     * skipped instead of failing the whole request.
+     *
+     * @param metrics the raw metrics string, may be {@code null}
+     * @return the parsed pairs, empty when {@code metrics} is {@code null}
+     */
+    private static Map<String, String> parseHealthMetrics(String metrics) {
+        Map<String, String> kvMap = new LinkedHashMap<>();
+        if (metrics == null) {
+            return kvMap;
+        }
+        Arrays.stream(metrics.split(","))
+                .map(String::trim)
+                .filter(kv -> kv.indexOf('=') > 0)
+                .forEach(
+                        kv -> {
+                            int separator = kv.indexOf('=');
+                            kvMap.put(
+                                    kv.substring(0, separator).trim(),
+                                    kv.substring(separator + 1).trim());
+                        });
+        return kvMap;
+    }
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 498702544..476aa71ea 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -17,13 +17,28 @@
package org.apache.seatunnel.engine.client.job;
+import
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStatusCodec;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
import lombok.NonNull;
public class JobClient {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final SeaTunnelHazelcastClient hazelcastClient;
public JobClient(@NonNull SeaTunnelHazelcastClient hazelcastClient) {
@@ -40,4 +55,82 @@ public class JobClient {
public ClientJobProxy createJobProxy(@NonNull JobImmutableInformation
jobImmutableInformation) {
return new ClientJobProxy(hazelcastClient, jobImmutableInformation);
}
+
+    /**
+     * Requests the detail status of one job and returns the decoded response.
+     *
+     * @param jobId id of the job to query
+     * @return the response decoded by {@code SeaTunnelGetJobDetailStatusCodec}
+     */
+    public String getJobDetailStatus(Long jobId) {
+        final String detailStatus =
+                hazelcastClient.requestOnMasterAndDecodeResponse(
+                        SeaTunnelGetJobDetailStatusCodec.encodeRequest(jobId),
+                        SeaTunnelGetJobDetailStatusCodec::decodeResponse);
+        return detailStatus;
+    }
+
+    /**
+     * Lists all job ids together with their job status.
+     *
+     * @return the response decoded by {@code SeaTunnelListJobStatusCodec}
+     */
+    public String listJobStatus() {
+        final String jobStatusList =
+                hazelcastClient.requestOnMasterAndDecodeResponse(
+                        SeaTunnelListJobStatusCodec.encodeRequest(),
+                        SeaTunnelListJobStatusCodec::decodeResponse);
+        return jobStatusList;
+    }
+
+    /**
+     * Returns the status of one job.
+     *
+     * <p>The response carries the ordinal of a {@link JobStatus} constant; it is mapped back to
+     * the constant and rendered with {@code toString()}.
+     *
+     * @param jobId jobId
+     * @return the string form of the job's {@link JobStatus}
+     */
+    public String getJobStatus(Long jobId) {
+        final int ordinal =
+                hazelcastClient.requestOnMasterAndDecodeResponse(
+                        SeaTunnelGetJobStatusCodec.encodeRequest(jobId),
+                        SeaTunnelGetJobStatusCodec::decodeResponse);
+        final JobStatus status = JobStatus.values()[ordinal];
+        return status.toString();
+    }
+
+    /**
+     * Requests the metrics of one job and returns the decoded response.
+     *
+     * @param jobId id of the job to query
+     * @return the response decoded by {@code SeaTunnelGetJobMetricsCodec}
+     */
+    public String getJobMetrics(Long jobId) {
+        final String metrics =
+                hazelcastClient.requestOnMasterAndDecodeResponse(
+                        SeaTunnelGetJobMetricsCodec.encodeRequest(jobId),
+                        SeaTunnelGetJobMetricsCodec::decodeResponse);
+        return metrics;
+    }
+
+    /**
+     * Sends a save-point request for the job and blocks until the returned future completes.
+     *
+     * @param jobId id of the job to save-point
+     */
+    public void savePointJob(Long jobId) {
+        hazelcastClient
+                .requestOnMasterAndGetCompletableFuture(
+                        SeaTunnelSavePointJobCodec.encodeRequest(jobId))
+                .join();
+    }
+
+    /**
+     * Sends a cancel request for the job and blocks until the returned future completes.
+     *
+     * @param jobId id of the job to cancel
+     */
+    public void cancelJob(Long jobId) {
+        hazelcastClient
+                .requestOnMasterAndGetCompletableFuture(
+                        SeaTunnelCancelJobCodec.encodeRequest(jobId))
+                .join();
+    }
+
+    /**
+     * Requests the DAG information of one job and deserializes the response with the client's
+     * serialization service.
+     *
+     * @param jobId id of the job to query
+     * @return the deserialized {@link JobDAGInfo}
+     */
+    public JobDAGInfo getJobInfo(Long jobId) {
+        final JobDAGInfo jobInfo =
+                hazelcastClient
+                        .getSerializationService()
+                        .toObject(
+                                hazelcastClient.requestOnMasterAndDecodeResponse(
+                                        SeaTunnelGetJobInfoCodec.encodeRequest(jobId),
+                                        SeaTunnelGetJobInfoCodec::decodeResponse));
+        return jobInfo;
+    }
+
+    /**
+     * Builds a summary of a job's metrics by summing the {@code value} field of every entry in
+     * the {@code SourceReceivedCount} and {@code SinkWriteCount} arrays of the metrics JSON.
+     *
+     * <p>The two arrays are summed independently, so a missing or shorter array no longer cuts
+     * the other total short. Missing arrays, entries without a {@code value}, empty metrics such
+     * as {@code {}} and unparseable JSON all contribute zero.
+     *
+     * @param jobId id of the job to summarize
+     * @return the summed source read count and sink write count
+     */
+    public JobMetricsRunner.JobMetricsSummary getJobMetricsSummary(Long jobId) {
+        long sourceReadCount = 0L;
+        long sinkWriteCount = 0L;
+        String jobMetrics = getJobMetrics(jobId);
+        if (jobMetrics != null) {
+            try {
+                JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics);
+                if (jsonNode != null) {
+                    sourceReadCount = sumMetricValues(jsonNode.get("SourceReceivedCount"));
+                    sinkWriteCount = sumMetricValues(jsonNode.get("SinkWriteCount"));
+                }
+            } catch (JsonProcessingException ignored) {
+                // Unparseable metrics are reported as an all-zero summary, as before.
+            }
+        }
+        return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, sinkWriteCount);
+    }
+
+    /**
+     * Sums the {@code value} field of every element of a JSON array.
+     *
+     * @param metricEntries the array node, may be {@code null} or a non-array node
+     * @return the sum, or 0 when there is no array; elements without {@code value} are skipped
+     */
+    private static long sumMetricValues(JsonNode metricEntries) {
+        if (metricEntries == null || !metricEntries.isArray()) {
+            return 0L;
+        }
+        long total = 0L;
+        for (JsonNode entry : metricEntries) {
+            JsonNode value = entry.get("value");
+            if (value != null) {
+                total += value.asLong();
+            }
+        }
+        return total;
+    }
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetClusterHealthMetricsCodec.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetClusterHealthMetricsCodec.java
new file mode 100644
index 000000000..14fe321f7
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetClusterHealthMetricsCodec.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
+
+import static
com.hazelcast.client.impl.protocol.ClientMessage.ForwardFrameIterator;
+import static com.hazelcast.client.impl.protocol.ClientMessage.Frame;
+import static
com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET;
+import static
com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
+import static
com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
+import static
com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
+import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+
+/*
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+@Generated("96c8a873ec6eee0bda3a16b1f849a137")
+public final class SeaTunnelGetClusterHealthMetricsCodec {
+ // hex: 0xDE0B00
+ public static final int REQUEST_MESSAGE_TYPE = 14551808;
+ // hex: 0xDE0B01
+ public static final int RESPONSE_MESSAGE_TYPE = 14551809;
+ // The request's initial frame ends with the int-sized partition id field.
+ private static final int REQUEST_INITIAL_FRAME_SIZE =
+ PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+ // The response's initial frame ends with the byte-sized backup-acks field.
+ private static final int RESPONSE_INITIAL_FRAME_SIZE =
+ RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+ // Static-only codec; not instantiable.
+ private SeaTunnelGetClusterHealthMetricsCodec() {}
+
+ /**
+ * Builds the parameterless request: a retryable message named
+ * "SeaTunnel.GetClusterHealthMetrics" whose single frame carries the request message type and
+ * a partition id of -1.
+ */
+ public static ClientMessage encodeRequest() {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ clientMessage.setRetryable(true);
+ clientMessage.setOperationName("SeaTunnel.GetClusterHealthMetrics");
+ Frame initialFrame = new Frame(new byte[REQUEST_INITIAL_FRAME_SIZE],
UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET,
REQUEST_MESSAGE_TYPE);
+ encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+ clientMessage.add(initialFrame);
+ return clientMessage;
+ }
+
+ /**
+ * Builds the response: an initial frame carrying the response message type, followed by the
+ * string payload encoded with StringCodec.
+ */
+ public static ClientMessage encodeResponse(String response) {
+ ClientMessage clientMessage = ClientMessage.createForEncode();
+ Frame initialFrame = new Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE],
UNFRAGMENTED_MESSAGE);
+ encodeInt(initialFrame.content, TYPE_FIELD_OFFSET,
RESPONSE_MESSAGE_TYPE);
+ clientMessage.add(initialFrame);
+
+ StringCodec.encode(clientMessage, response);
+ return clientMessage;
+ }
+
+ /** Skips the initial frame of a response and decodes the string payload that follows it. */
+ public static String decodeResponse(ClientMessage clientMessage) {
+ ForwardFrameIterator iterator = clientMessage.frameIterator();
+ // empty initial frame
+ iterator.next();
+ return StringCodec.decode(iterator);
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 1c70091c7..78317d825 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -207,4 +207,20 @@ methods:
nullable: false
since: 2.0
doc: ''
- response: {}
\ No newline at end of file
+ response: {}
+
+ # Method 11: parameterless, retryable request answered with one non-null String.
+ # The matching codec uses message types 0xDE0B00 (request) and 0xDE0B01 (response).
+ - id: 11
+ name: getClusterHealthMetrics
+ since: 2.0
+ doc: ''
+ request:
+ retryable: true
+ partitionIdentifier: -1
+ params: []
+ response:
+ params:
+ - name: response
+ type: String
+ nullable: false
+ since: 2.0
+ doc: ''
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
new file mode 100644
index 000000000..b7900871e
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.diagnostics.HealthMonitorLevel;
+import com.hazelcast.internal.memory.MemoryStats;
+import com.hazelcast.internal.metrics.DoubleGauge;
+import com.hazelcast.internal.metrics.LongGauge;
+import com.hazelcast.internal.metrics.MetricsRegistry;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.spi.properties.ClusterProperty;
+import lombok.Getter;
+
+import static com.hazelcast.internal.diagnostics.HealthMonitorLevel.valueOf;
+import static
com.hazelcast.spi.properties.ClusterProperty.HEALTH_MONITORING_THRESHOLD_CPU_PERCENTAGE;
+import static
com.hazelcast.spi.properties.ClusterProperty.HEALTH_MONITORING_THRESHOLD_MEMORY_PERCENTAGE;
+import static java.lang.String.format;
+
+public class SeaTunnelHealthMonitor {
+ private static final String[] UNITS = new String[] {"", "K", "M", "G",
"T", "P", "E"};
+ private static final double PERCENTAGE_MULTIPLIER = 100d;
+ private static final double THRESHOLD_PERCENTAGE_INVOCATIONS = 70;
+ private static final double THRESHOLD_INVOCATIONS = 1000;
+
+ private final ILogger logger;
+ private final Node node;
+ private final HealthMonitorLevel monitorLevel;
+ private final int thresholdMemoryPercentage;
+ private final int thresholdCPUPercentage;
+ private final MetricsRegistry metricRegistry;
+
+ @Getter private final SeaTunnelHealthMetrics healthMetrics;
+
+ /**
+ * Creates a monitor for the given node, reading the health-monitoring level and the memory/CPU
+ * threshold percentages from the node's properties.
+ *
+ * @param node the node whose logger, metrics registry and properties are used
+ */
+ public SeaTunnelHealthMonitor(Node node) {
+ this.node = node;
+ this.logger =
node.getLogger(com.hazelcast.internal.diagnostics.HealthMonitor.class);
+ this.metricRegistry = node.nodeEngine.getMetricsRegistry();
+ // getHealthMonitorLevel() reads this.node, so it must run after the assignment above.
+ this.monitorLevel = getHealthMonitorLevel();
+ this.thresholdMemoryPercentage =
+
node.getProperties().getInteger(HEALTH_MONITORING_THRESHOLD_MEMORY_PERCENTAGE);
+ this.thresholdCPUPercentage =
+
node.getProperties().getInteger(HEALTH_MONITORING_THRESHOLD_CPU_PERCENTAGE);
+ // Must stay last: the inner class creates its gauges from metricRegistry in its field
+ // initializers.
+ this.healthMetrics = new SeaTunnelHealthMetrics();
+ }
+
+    /**
+     * Reads the {@code HEALTH_MONITORING_LEVEL} cluster property of the node and converts it to
+     * the matching {@link HealthMonitorLevel} constant.
+     */
+    private HealthMonitorLevel getHealthMonitorLevel() {
+        return HealthMonitorLevel.valueOf(
+                node.getProperties().getString(ClusterProperty.HEALTH_MONITORING_LEVEL));
+    }
+
+ /**
+ * Formats a number as a percentage string: the value with two decimal places followed by a
+ * percent sign. The value is not scaled.
+ *
+ * <p>NOTE(review): {@code format} uses the default locale, so the decimal separator may be a
+ * comma on some systems - confirm whether the result ends up in the comma-separated metrics
+ * string.
+ *
+ * @param p the given number
+ * @return the formatted percentage string
+ */
+ private static String percentageString(double p) {
+ return format("%.2f%%", p);
+ }
+
+    /**
+     * Renders a number with a binary unit suffix taken from {@code UNITS}.
+     *
+     * <p>The largest power of 1024 that the number strictly exceeds is chosen and the number is
+     * printed as a multiple of it with one decimal place. Numbers not greater than 1024 are
+     * printed as plain integers.
+     *
+     * <p>Formatting uses {@code Locale.ROOT} so the decimal separator is always a period. The
+     * rendered metrics are joined with commas and split on commas by the client, so a
+     * locale-specific decimal comma would break the value apart.
+     *
+     * @param number the number to render
+     * @return the number with a unit suffix, or the plain number when no unit applies
+     */
+    @SuppressWarnings("checkstyle:magicnumber")
+    private static String numberToUnit(long number) {
+        for (int i = 6; i > 0; i--) {
+            // 1024 is for 1024 kb is 1 MB etc
+            double step = Math.pow(1024, i);
+            if (number > step) {
+                return String.format(
+                        java.util.Locale.ROOT, "%3.1f%s", number / step, UNITS[i]);
+            }
+        }
+        return Long.toString(number);
+    }
+
+ public class SeaTunnelHealthMetrics {
+ final LongGauge clientEndpointCount =
metricRegistry.newLongGauge("client.endpoint.count");
+ final LongGauge clusterTimeDiff =
+ metricRegistry.newLongGauge("cluster.clock.clusterTimeDiff");
+
+ final LongGauge executorAsyncQueueSize =
+ metricRegistry.newLongGauge("executor.hz:async.queueSize");
+ final LongGauge executorClientQueueSize =
+ metricRegistry.newLongGauge("executor.hz:client.queueSize");
+ final LongGauge executorQueryClientQueueSize =
+
metricRegistry.newLongGauge("executor.hz:client.query.queueSize");
+ final LongGauge executorBlockingClientQueueSize =
+
metricRegistry.newLongGauge("executor.hz:client.blocking.queueSize");
+ final LongGauge executorClusterQueueSize =
+ metricRegistry.newLongGauge("executor.hz:cluster.queueSize");
+ final LongGauge executorScheduledQueueSize =
+ metricRegistry.newLongGauge("executor.hz:scheduled.queueSize");
+ final LongGauge executorSystemQueueSize =
+ metricRegistry.newLongGauge("executor.hz:system.queueSize");
+ final LongGauge executorIoQueueSize =
+ metricRegistry.newLongGauge("executor.hz:io.queueSize");
+ final LongGauge executorQueryQueueSize =
+ metricRegistry.newLongGauge("executor.hz:query.queueSize");
+ final LongGauge executorMapLoadQueueSize =
+ metricRegistry.newLongGauge("executor.hz:map-load.queueSize");
+ final LongGauge executorMapLoadAllKeysQueueSize =
+
metricRegistry.newLongGauge("executor.hz:map-loadAllKeys.queueSize");
+
+ final LongGauge eventQueueSize =
metricRegistry.newLongGauge("event.eventQueueSize");
+
+ final LongGauge gcMinorCount =
metricRegistry.newLongGauge("gc.minorCount");
+ final LongGauge gcMinorTime =
metricRegistry.newLongGauge("gc.minorTime");
+ final LongGauge gcMajorCount =
metricRegistry.newLongGauge("gc.majorCount");
+ final LongGauge gcMajorTime =
metricRegistry.newLongGauge("gc.majorTime");
+ final LongGauge gcUnknownCount =
metricRegistry.newLongGauge("gc.unknownCount");
+ final LongGauge gcUnknownTime =
metricRegistry.newLongGauge("gc.unknownTime");
+
+ final LongGauge runtimeAvailableProcessors =
+ metricRegistry.newLongGauge("runtime.availableProcessors");
+ final LongGauge runtimeMaxMemory =
metricRegistry.newLongGauge("runtime.maxMemory");
+ final LongGauge runtimeFreeMemory =
metricRegistry.newLongGauge("runtime.freeMemory");
+ final LongGauge runtimeTotalMemory =
metricRegistry.newLongGauge("runtime.totalMemory");
+ final LongGauge runtimeUsedMemory =
metricRegistry.newLongGauge("runtime.usedMemory");
+
+ final LongGauge threadPeakThreadCount =
+ metricRegistry.newLongGauge("thread.peakThreadCount");
+ final LongGauge threadThreadCount =
metricRegistry.newLongGauge("thread.threadCount");
+
+ final DoubleGauge osProcessCpuLoad =
metricRegistry.newDoubleGauge("os.processCpuLoad");
+ final DoubleGauge osSystemLoadAverage =
+ metricRegistry.newDoubleGauge("os.systemLoadAverage");
+ final DoubleGauge osSystemCpuLoad =
metricRegistry.newDoubleGauge("os.systemCpuLoad");
+ final LongGauge osTotalPhysicalMemorySize =
+ metricRegistry.newLongGauge("os.totalPhysicalMemorySize");
+ final LongGauge osFreePhysicalMemorySize =
+ metricRegistry.newLongGauge("os.freePhysicalMemorySize");
+ final LongGauge osTotalSwapSpaceSize =
metricRegistry.newLongGauge("os.totalSwapSpaceSize");
+ final LongGauge osFreeSwapSpaceSize =
metricRegistry.newLongGauge("os.freeSwapSpaceSize");
+
+ final LongGauge operationServiceExecutorQueueSize =
+ metricRegistry.newLongGauge("operation.queueSize");
+ final LongGauge operationServiceExecutorPriorityQueueSize =
+ metricRegistry.newLongGauge("operation.priorityQueueSize");
+ final LongGauge operationServiceResponseQueueSize =
+ metricRegistry.newLongGauge("operation.responseQueueSize");
+ final LongGauge operationServiceRunningOperationsCount =
+ metricRegistry.newLongGauge("operation.runningCount");
+ final LongGauge operationServiceCompletedOperationsCount =
+ metricRegistry.newLongGauge("operation.completedCount");
+ final LongGauge operationServicePendingInvocationsCount =
+ metricRegistry.newLongGauge("operation.invocations.pending");
+ final DoubleGauge operationServicePendingInvocationsPercentage =
+ metricRegistry.newDoubleGauge("operation.invocations.used");
+
+ final LongGauge proxyCount =
metricRegistry.newLongGauge("proxy.proxyCount");
+
+ final LongGauge tcpConnectionActiveCount =
+ metricRegistry.newLongGauge("tcp.connection.activeCount");
+ final LongGauge tcpConnectionCount =
metricRegistry.newLongGauge("tcp.connection.count");
+ final LongGauge tcpConnectionClientCount =
+ metricRegistry.newLongGauge("tcp.connection.clientCount");
+
+ private final StringBuilder sb = new StringBuilder();
+ private double memoryUsedOfTotalPercentage;
+ private double memoryUsedOfMaxPercentage;
+
+        /**
+         * Recomputes the used-heap percentages, relative to the total memory and to the maximum
+         * memory, from the current gauge readings.
+         */
+        public void update() {
+            final double scaledUsedForTotal = PERCENTAGE_MULTIPLIER * runtimeUsedMemory.read();
+            memoryUsedOfTotalPercentage = scaledUsedForTotal / runtimeTotalMemory.read();
+
+            final double scaledUsedForMax = PERCENTAGE_MULTIPLIER * runtimeUsedMemory.read();
+            memoryUsedOfMaxPercentage = scaledUsedForMax / runtimeMaxMemory.read();
+        }
+
+        /**
+         * Tells whether any monitored value is above its threshold: used-of-max memory, process
+         * CPU load, system CPU load, pending invocation percentage or pending invocation count.
+         * Checks are evaluated in that order and stop at the first one that is exceeded.
+         *
+         * @return {@code true} when at least one threshold is exceeded
+         */
+        boolean exceedsThreshold() {
+            return memoryUsedOfMaxPercentage > thresholdMemoryPercentage
+                    || osProcessCpuLoad.read() > thresholdCPUPercentage
+                    || osSystemCpuLoad.read() > thresholdCPUPercentage
+                    || operationServicePendingInvocationsPercentage.read()
+                            > THRESHOLD_PERCENTAGE_INVOCATIONS
+                    || operationServicePendingInvocationsCount.read() > THRESHOLD_INVOCATIONS;
+        }
+
+ /**
+ * Refreshes the memory percentages, clears the shared buffer and lets each render step
+ * append its section in a fixed order.
+ *
+ * <p>NOTE(review): every call reuses the same StringBuilder field and nothing here
+ * synchronizes access - confirm that callers never render concurrently.
+ *
+ * @return the text accumulated by all render steps
+ */
+ public String render() {
+ update();
+ // Reset the reused buffer so output from a previous call is dropped.
+ sb.setLength(0);
+ renderProcessors();
+ renderPhysicalMemory();
+ renderSwap();
+ renderHeap();
+ renderNativeMemory();
+ renderGc();
+ renderLoad();
+ renderThread();
+ renderCluster();
+ renderEvents();
+ renderExecutors();
+ renderOperationService();
+ renderProxy();
+ renderClient();
+ // Last step: renderConnection() ends without a trailing ", " separator.
+ renderConnection();
+ return sb.toString();
+ }
+
+        /**
+         * Appends the active, client and total TCP connection counts. The last pair is written
+         * without a trailing separator.
+         */
+        private void renderConnection() {
+            sb.append("connection.active.count=");
+            sb.append(tcpConnectionActiveCount.read());
+            sb.append(", ");
+
+            sb.append("client.connection.count=");
+            sb.append(tcpConnectionClientCount.read());
+            sb.append(", ");
+
+            sb.append("connection.count=");
+            sb.append(tcpConnectionCount.read());
+        }
+
+ /** Appends the client endpoint count entry. */
+ private void renderClient() {
+     sb.append("clientEndpoint.count=");
+     sb.append(clientEndpointCount.read());
+     sb.append(", ");
+ }
+
+ /** Appends the proxy count entry. */
+ private void renderProxy() {
+     sb.append("proxy.count=");
+     sb.append(proxyCount.read());
+     sb.append(", ");
+ }
+
+ private void renderLoad() {
+ sb.append("load.process")
+ .append('=')
+ .append(format("%.2f", osProcessCpuLoad.read()))
+ .append("%, ");
+ sb.append("load.system")
+ .append('=')
+ .append(format("%.2f", osSystemCpuLoad.read()))
+ .append("%, ");
+
+ double value = osSystemLoadAverage.read();
+ if (value < 0) {
+ sb.append("load.systemAverage").append("=n/a ");
+ } else {
+ sb.append("load.systemAverage")
+ .append('=')
+ .append(format("%.2f", osSystemLoadAverage.read()))
+ .append(", ");
+ }
+ }
+
+ /** Appends the available processor count entry. */
+ private void renderProcessors() {
+     sb.append("processors=");
+     sb.append(runtimeAvailableProcessors.read());
+     sb.append(", ");
+ }
+
+ /** Appends total and free physical memory, formatted by {@code numberToUnit}. */
+ private void renderPhysicalMemory() {
+     sb.append("physical.memory.total=")
+             .append(numberToUnit(osTotalPhysicalMemorySize.read()))
+             .append(", ")
+             .append("physical.memory.free=")
+             .append(numberToUnit(osFreePhysicalMemorySize.read()))
+             .append(", ");
+ }
+
+ /** Appends total and free swap space, formatted by {@code numberToUnit}. */
+ private void renderSwap() {
+     sb.append("swap.space.total=")
+             .append(numberToUnit(osTotalSwapSpaceSize.read()))
+             .append(", ")
+             .append("swap.space.free=")
+             .append(numberToUnit(osFreeSwapSpaceSize.read()))
+             .append(", ");
+ }
+
+ /**
+  * Appends heap usage: used, free, total and max sizes (formatted by {@code numberToUnit})
+  * followed by the used/total and used/max percentages cached by {@code update()}.
+  */
+ private void renderHeap() {
+     sb.append("heap.memory.used=")
+             .append(numberToUnit(runtimeUsedMemory.read()))
+             .append(", ")
+             .append("heap.memory.free=")
+             .append(numberToUnit(runtimeFreeMemory.read()))
+             .append(", ")
+             .append("heap.memory.total=")
+             .append(numberToUnit(runtimeTotalMemory.read()))
+             .append(", ")
+             .append("heap.memory.max=")
+             .append(numberToUnit(runtimeMaxMemory.read()))
+             .append(", ")
+             .append("heap.memory.used/total=")
+             .append(percentageString(memoryUsedOfTotalPercentage))
+             .append(", ")
+             .append("heap.memory.used/max=")
+             .append(percentageString(memoryUsedOfMaxPercentage))
+             .append(", ");
+ }
+
+ /** Appends the event queue size entry. */
+ private void renderEvents() {
+     sb.append("event.q.size=");
+     sb.append(eventQueueSize.read());
+     sb.append(", ");
+ }
+
+ /** Appends the cluster time difference entry. */
+ private void renderCluster() {
+     sb.append("cluster.timeDiff=");
+     sb.append(clusterTimeDiff.read());
+     sb.append(", ");
+ }
+
+ /** Appends the current and peak thread counts. */
+ private void renderThread() {
+     sb.append("thread.count=")
+             .append(threadThreadCount.read())
+             .append(", ")
+             .append("thread.peakCount=")
+             .append(threadPeakThreadCount.read())
+             .append(", ");
+ }
+
+ /**
+  * Appends minor and major GC counts and times. The "unknown" GC entries are added only
+  * when the unknown-GC count gauge reads above zero.
+  */
+ private void renderGc() {
+     sb.append("minor.gc.count=")
+             .append(gcMinorCount.read())
+             .append(", ")
+             .append("minor.gc.time=")
+             .append(gcMinorTime.read())
+             .append("ms, ")
+             .append("major.gc.count=")
+             .append(gcMajorCount.read())
+             .append(", ")
+             .append("major.gc.time=")
+             .append(gcMajorTime.read())
+             .append("ms, ");
+
+     // Nothing more to print unless some collections could not be classified.
+     if (!(gcUnknownCount.read() > 0)) {
+         return;
+     }
+     sb.append("unknown.gc.count=")
+             .append(gcUnknownCount.read())
+             .append(", ")
+             .append("unknown.gc.time=")
+             .append(gcUnknownTime.read())
+             .append("ms, ");
+ }
+
+ /**
+  * Appends native-memory statistics taken from the node extension's {@code MemoryStats}.
+  * Nothing is appended when the reported max native memory is zero or negative.
+  */
+ private void renderNativeMemory() {
+     MemoryStats stats = node.getNodeExtension().getMemoryStats();
+     if (stats.getMaxNative() <= 0L) {
+         return;
+     }
+
+     final long nativeMax = stats.getMaxNative();
+     final long nativeUsed = stats.getUsedNative();
+     final long metadataUsed = stats.getUsedMetadata();
+
+     sb.append("native.memory.used=")
+             .append(numberToUnit(nativeUsed))
+             .append(", ")
+             .append("native.memory.free=")
+             .append(numberToUnit(stats.getFreeNative()))
+             .append(", ")
+             .append("native.memory.total=")
+             .append(numberToUnit(stats.getCommittedNative()))
+             .append(", ")
+             .append("native.memory.max=")
+             .append(numberToUnit(nativeMax))
+             .append(", ")
+             .append("native.meta.memory.used=")
+             .append(numberToUnit(metadataUsed))
+             .append(", ")
+             .append("native.meta.memory.free=")
+             .append(numberToUnit(nativeMax - metadataUsed))
+             .append(", ")
+             .append("native.meta.memory.percentage=")
+             .append(percentageString(PERCENTAGE_MULTIPLIER * metadataUsed / nativeMax))
+             .append(", ");
+ }
+
+ /**
+  * Appends the executor queue sizes and the completed-operations counter, one
+  * {@code key=value} entry per gauge, each followed by {@code ", "}.
+  */
+ private void renderExecutors() {
+     sb.append("executor.q.async.size=")
+             .append(executorAsyncQueueSize.read())
+             .append(", ")
+             .append("executor.q.client.size=")
+             .append(executorClientQueueSize.read())
+             .append(", ")
+             .append("executor.q.client.query.size=")
+             .append(executorQueryClientQueueSize.read())
+             .append(", ")
+             .append("executor.q.client.blocking.size=")
+             .append(executorBlockingClientQueueSize.read())
+             .append(", ")
+             .append("executor.q.query.size=")
+             .append(executorQueryQueueSize.read())
+             .append(", ")
+             .append("executor.q.scheduled.size=")
+             .append(executorScheduledQueueSize.read())
+             .append(", ")
+             .append("executor.q.io.size=")
+             .append(executorIoQueueSize.read())
+             .append(", ")
+             .append("executor.q.system.size=")
+             .append(executorSystemQueueSize.read())
+             .append(", ")
+             .append("executor.q.operations.size=")
+             .append(operationServiceExecutorQueueSize.read())
+             .append(", ")
+             .append("executor.q.priorityOperation.size=")
+             .append(operationServiceExecutorPriorityQueueSize.read())
+             .append(", ")
+             .append("operations.completed.count=")
+             .append(operationServiceCompletedOperationsCount.read())
+             .append(", ")
+             .append("executor.q.mapLoad.size=")
+             .append(executorMapLoadQueueSize.read())
+             .append(", ")
+             .append("executor.q.mapLoadAllKeys.size=")
+             .append(executorMapLoadAllKeysQueueSize.read())
+             .append(", ")
+             .append("executor.q.cluster.size=")
+             .append(executorClusterQueueSize.read())
+             .append(", ");
+ }
+
+ /**
+  * Appends operation-service figures: response queue size, running operation count, and
+  * the pending-invocation percentage (two decimals) and count.
+  */
+ private void renderOperationService() {
+     sb.append("executor.q.response.size=")
+             .append(operationServiceResponseQueueSize.read())
+             .append(", ")
+             .append("operations.running.count=")
+             .append(operationServiceRunningOperationsCount.read())
+             .append(", ")
+             .append("operations.pending.invocations.percentage=")
+             .append(format("%.2f", operationServicePendingInvocationsPercentage.read()))
+             .append("%, ")
+             .append("operations.pending.invocations.count=")
+             .append(operationServicePendingInvocationsCount.read())
+             .append(", ");
+ }
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 861b51270..07cb88870 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -37,6 +37,7 @@ import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.operationservice.LiveOperations;
import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
+import lombok.Getter;
import lombok.NonNull;
import java.util.Properties;
@@ -59,6 +60,8 @@ public class SeaTunnelServer
private CoordinatorService coordinatorService;
private ScheduledExecutorService monitorService;
+ @Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor;
+
private final SeaTunnelConfig seaTunnelConfig;
private volatile boolean isRunning = true;
@@ -105,6 +108,8 @@ public class SeaTunnelServer
0,
seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
TimeUnit.SECONDS);
+
+ seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl)
engine).getNode());
}
@Override
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetClusterHealthMetricsOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetClusterHealthMetricsOperation.java
new file mode 100644
index 000000000..20e56a7be
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetClusterHealthMetricsOperation.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
+
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Operation that asks the {@link SeaTunnelServer} service for its health monitor, renders
+ * the monitor's health metrics to text and exposes that text as the operation response.
+ */
+public class GetClusterHealthMetricsOperation extends Operation
+        implements IdentifiedDataSerializable, AllowedDuringPassiveState {
+
+    /** Rendered metrics text; assigned by {@link #run()}. */
+    private String response;
+
+    public GetClusterHealthMetricsOperation() {}
+
+    /** Renders the health metrics and stores the text for {@link #getResponse()}. */
+    @Override
+    public void run() {
+        SeaTunnelServer server = getService();
+        response = server.getSeaTunnelHealthMonitor().getHealthMetrics().render();
+    }
+
+    /** Returns the text stored by {@link #run()}. */
+    @Override
+    public Object getResponse() {
+        return response;
+    }
+
+    @Override
+    public int getFactoryId() {
+        return ClientToServerOperationDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ClientToServerOperationDataSerializerHook.GET_CLUSTER_HEALTH_METRICS;
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetClusterHealthMetricsTask.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetClusterHealthMetricsTask.java
new file mode 100644
index 000000000..45d25d08e
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetClusterHealthMetricsTask.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetClusterHealthMetricsCodec;
+import
org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Message task for the cluster-health-metrics client request.
+ *
+ * <p>The request decoder passed to the superclass always yields {@code null}, the response
+ * is encoded with {@link SeaTunnelGetClusterHealthMetricsCodec#encodeResponse}, and
+ * {@link #prepareOperation()} supplies a new {@link GetClusterHealthMetricsOperation}.
+ */
+public class GetClusterHealthMetricsTask extends AbstractSeaTunnelMessageTask<Void, String> {
+
+    protected GetClusterHealthMetricsTask(
+            ClientMessage clientMessage, Node node, Connection connection) {
+        super(
+                clientMessage,
+                node,
+                connection,
+                ignoredRequest -> null,
+                SeaTunnelGetClusterHealthMetricsCodec::encodeResponse);
+    }
+
+    @Override
+    public String getMethodName() {
+        return "getClusterHealthMetrics";
+    }
+
+    /** The request carries no parameters, so an empty array is returned. */
+    @Override
+    public Object[] getParameters() {
+        return new Object[] {};
+    }
+
+    @Override
+    protected Operation prepareOperation() {
+        return new GetClusterHealthMetricsOperation();
+    }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index ea5c53eb0..6d19ada03 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.engine.server.protocol.task;
import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetClusterHealthMetricsCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStatusCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
@@ -87,5 +88,9 @@ public class SeaTunnelMessageTaskFactoryProvider implements
MessageTaskFactoryPr
SeaTunnelSavePointJobCodec.REQUEST_MESSAGE_TYPE,
(clientMessage, connection) ->
new SavePointJobTask(clientMessage, node, connection));
+ factories.put(
+ SeaTunnelGetClusterHealthMetricsCodec.REQUEST_MESSAGE_TYPE,
+ (clientMessage, connection) ->
+ new GetClusterHealthMetricsTask(clientMessage, node,
connection));
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
index 68a468b24..4cfef1cd5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.serializable;
import
org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
+import
org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
import
org.apache.seatunnel.engine.server.operation.GetJobDetailStatusOperation;
import org.apache.seatunnel.engine.server.operation.GetJobInfoOperation;
import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
@@ -57,6 +58,8 @@ public final class ClientToServerOperationDataSerializerHook
implements DataSeri
public static final int SAVEPOINT_JOB_OPERATOR = 8;
+ public static final int GET_CLUSTER_HEALTH_METRICS = 9;
+
public static final int FACTORY_ID =
FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -95,6 +98,8 @@ public final class ClientToServerOperationDataSerializerHook
implements DataSeri
return new GetJobInfoOperation();
case SAVEPOINT_JOB_OPERATOR:
return new SavePointJobOperation();
+ case GET_CLUSTER_HEALTH_METRICS:
+ return new GetClusterHealthMetricsOperation();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}