This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2616e6d83 [Feature][Zeta] Support get cluster metrics from seatunnel zeta client (#4139)
2616e6d83 is described below

commit 2616e6d83809c86c81cc399ccbb63ce89f72653b
Author: Eric <[email protected]>
AuthorDate: Thu Feb 16 15:58:56 2023 +0800

    [Feature][Zeta] Support get cluster metrics from seatunnel zeta client (#4139)
    
    * add get cluster metrics to seatunnel zeta client
    
    * add some test output
    
    * format code
    
    * fix ci error
---
 .../org/apache/seatunnel/engine/e2e/ClusterIT.java |  92 +++++
 .../seatunnel/engine/client/SeaTunnelClient.java   | 111 +++---
 .../seatunnel/engine/client/job/JobClient.java     |  93 +++++
 .../SeaTunnelGetClusterHealthMetricsCodec.java     |  81 ++++
 .../SeaTunnelEngine.yaml                           |  18 +-
 .../engine/server/SeaTunnelHealthMonitor.java      | 413 +++++++++++++++++++++
 .../seatunnel/engine/server/SeaTunnelServer.java   |   5 +
 .../GetClusterHealthMetricsOperation.java          |  54 +++
 .../protocol/task/GetClusterHealthMetricsTask.java |  53 +++
 .../task/SeaTunnelMessageTaskFactoryProvider.java  |   5 +
 .../ClientToServerOperationDataSerializerHook.java |   5 +
 11 files changed, 870 insertions(+), 60 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
new file mode 100644
index 000000000..acb2fab9b
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/ClusterIT.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class ClusterIT {
+
+    @Test
+    public void getClusterHealthMetrics() {
+        HazelcastInstanceImpl node1 = null;
+        HazelcastInstanceImpl node2 = null;
+        SeaTunnelClient engineClient = null;
+
+        String testClusterName = "Test_getClusterHealthMetrics";
+
+        SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig
+                .getHazelcastConfig()
+                .setClusterName(TestUtils.getClusterName(testClusterName));
+
+        try {
+            node1 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+            node2 = 
SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await()
+                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            2, 
finalNode.getCluster().getMembers().size()));
+
+            ClientConfig clientConfig = 
ConfigProvider.locateAndGetClientConfig();
+            
clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+
+            Map<String, String> clusterHealthMetrics = 
engineClient.getClusterHealthMetrics();
+            System.out.println(
+                    "=====================================cluster metrics==================================================");
+            for (Map.Entry<String, String> entry : 
clusterHealthMetrics.entrySet()) {
+                System.out.println(entry.getKey());
+                System.out.println(entry.getValue());
+                System.out.println(
+                        "======================================================================================================");
+            }
+            Assertions.assertEquals(2, clusterHealthMetrics.size());
+
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+
+            if (node1 != null) {
+                node1.shutdown();
+            }
+
+            if (node2 != null) {
+                node2.shutdown();
+            }
+        }
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index f4526ea26..7b38112b6 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -17,36 +17,37 @@
 
 package org.apache.seatunnel.engine.client;
 
-import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
-import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
 import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
+import org.apache.seatunnel.common.utils.JsonUtils;
 import org.apache.seatunnel.engine.client.job.JobClient;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
 import 
org.apache.seatunnel.engine.client.job.JobMetricsRunner.JobMetricsSummary;
 import org.apache.seatunnel.engine.common.config.JobConfig;
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
-import org.apache.seatunnel.engine.core.job.JobStatus;
-import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
-import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStatusCodec;
-import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
-import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
-import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
-import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetClusterHealthMetricsCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
-import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
 
 import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.cluster.Member;
 import com.hazelcast.logging.ILogger;
+import lombok.Getter;
 import lombok.NonNull;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
 public class SeaTunnelClient implements SeaTunnelClientInstance {
     private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private final SeaTunnelHazelcastClient hazelcastClient;
+    @Getter private final JobClient jobClient;
 
     public SeaTunnelClient(@NonNull ClientConfig clientConfig) {
         this.hazelcastClient = new SeaTunnelHazelcastClient(clientConfig);
+        this.jobClient = new JobClient(this.hazelcastClient);
     }
 
     @Override
@@ -90,17 +91,15 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
      *
      * @param jobId jobId
      */
+    @Deprecated
     public String getJobDetailStatus(Long jobId) {
-        return hazelcastClient.requestOnMasterAndDecodeResponse(
-                SeaTunnelGetJobDetailStatusCodec.encodeRequest(jobId),
-                SeaTunnelGetJobDetailStatusCodec::decodeResponse);
+        return jobClient.getJobDetailStatus(jobId);
     }
 
     /** list all jobId and job status */
+    @Deprecated
     public String listJobStatus() {
-        return hazelcastClient.requestOnMasterAndDecodeResponse(
-                SeaTunnelListJobStatusCodec.encodeRequest(),
-                SeaTunnelListJobStatusCodec::decodeResponse);
+        return jobClient.listJobStatus();
     }
 
     /**
@@ -108,63 +107,57 @@ public class SeaTunnelClient implements SeaTunnelClientInstance {
      *
      * @param jobId jobId
      */
+    @Deprecated
     public String getJobStatus(Long jobId) {
-        int jobStatusOrdinal =
-                hazelcastClient.requestOnMasterAndDecodeResponse(
-                        SeaTunnelGetJobStatusCodec.encodeRequest(jobId),
-                        SeaTunnelGetJobStatusCodec::decodeResponse);
-        return JobStatus.values()[jobStatusOrdinal].toString();
+        return jobClient.getJobStatus(jobId);
     }
 
+    @Deprecated
     public String getJobMetrics(Long jobId) {
-        return hazelcastClient.requestOnMasterAndDecodeResponse(
-                SeaTunnelGetJobMetricsCodec.encodeRequest(jobId),
-                SeaTunnelGetJobMetricsCodec::decodeResponse);
+        return jobClient.getJobMetrics(jobId);
     }
 
+    @Deprecated
     public void savePointJob(Long jobId) {
-        PassiveCompletableFuture<Void> cancelFuture =
-                hazelcastClient.requestOnMasterAndGetCompletableFuture(
-                        SeaTunnelSavePointJobCodec.encodeRequest(jobId));
-
-        cancelFuture.join();
+        jobClient.savePointJob(jobId);
     }
 
+    @Deprecated
     public void cancelJob(Long jobId) {
-        PassiveCompletableFuture<Void> cancelFuture =
-                hazelcastClient.requestOnMasterAndGetCompletableFuture(
-                        SeaTunnelCancelJobCodec.encodeRequest(jobId));
-
-        cancelFuture.join();
+        jobClient.cancelJob(jobId);
     }
 
     public JobDAGInfo getJobInfo(Long jobId) {
-        return hazelcastClient
-                .getSerializationService()
-                .toObject(
-                        hazelcastClient.requestOnMasterAndDecodeResponse(
-                                SeaTunnelGetJobInfoCodec.encodeRequest(jobId),
-                                SeaTunnelGetJobInfoCodec::decodeResponse));
+        return jobClient.getJobInfo(jobId);
     }
 
     public JobMetricsSummary getJobMetricsSummary(Long jobId) {
-        long sourceReadCount = 0L;
-        long sinkWriteCount = 0L;
-        String jobMetrics = getJobMetrics(jobId);
-        try {
-            JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics);
-            JsonNode sourceReaders = jsonNode.get("SourceReceivedCount");
-            JsonNode sinkWriters = jsonNode.get("SinkWriteCount");
-            for (int i = 0; i < sourceReaders.size(); i++) {
-                JsonNode sourceReader = sourceReaders.get(i);
-                JsonNode sinkWriter = sinkWriters.get(i);
-                sourceReadCount += sourceReader.get("value").asLong();
-                sinkWriteCount += sinkWriter.get("value").asLong();
-            }
-            return new JobMetricsSummary(sourceReadCount, sinkWriteCount);
-            // Add NullPointerException because of metrics information can be empty like {}
-        } catch (JsonProcessingException | NullPointerException e) {
-            return new JobMetricsSummary(sourceReadCount, sinkWriteCount);
-        }
+        return jobClient.getJobMetricsSummary(jobId);
+    }
+
+    public Map<String, String> getClusterHealthMetrics() {
+        Set<Member> members = 
hazelcastClient.getHazelcastInstance().getCluster().getMembers();
+        Map<String, String> healthMetricsMap = new HashMap<>();
+        members.stream()
+                .forEach(
+                        member -> {
+                            String metrics =
+                                    hazelcastClient.requestAndDecodeResponse(
+                                            member.getUuid(),
+                                            
SeaTunnelGetClusterHealthMetricsCodec.encodeRequest(),
+                                            
SeaTunnelGetClusterHealthMetricsCodec::decodeResponse);
+                            String[] split = metrics.split(",");
+                            Map<String, String> kvMap = new LinkedHashMap<>();
+                            Arrays.stream(split)
+                                    .forEach(
+                                            kv -> {
+                                                String[] kvArr = kv.split("=");
+                                                kvMap.put(kvArr[0], kvArr[1]);
+                                            });
+                            healthMetricsMap.put(
+                                    member.getAddress().toString(), 
JsonUtils.toJsonString(kvMap));
+                        });
+
+        return healthMetricsMap;
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 498702544..476aa71ea 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -17,13 +17,28 @@
 
 package org.apache.seatunnel.engine.client.job;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
 import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
 import org.apache.seatunnel.engine.common.Constant;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStatusCodec;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
 
 import lombok.NonNull;
 
 public class JobClient {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private final SeaTunnelHazelcastClient hazelcastClient;
 
     public JobClient(@NonNull SeaTunnelHazelcastClient hazelcastClient) {
@@ -40,4 +55,82 @@ public class JobClient {
     public ClientJobProxy createJobProxy(@NonNull JobImmutableInformation jobImmutableInformation) {
         return new ClientJobProxy(hazelcastClient, jobImmutableInformation);
     }
+
+    public String getJobDetailStatus(Long jobId) {
+        return hazelcastClient.requestOnMasterAndDecodeResponse(
+                SeaTunnelGetJobDetailStatusCodec.encodeRequest(jobId),
+                SeaTunnelGetJobDetailStatusCodec::decodeResponse);
+    }
+
+    /** list all jobId and job status */
+    public String listJobStatus() {
+        return hazelcastClient.requestOnMasterAndDecodeResponse(
+                SeaTunnelListJobStatusCodec.encodeRequest(),
+                SeaTunnelListJobStatusCodec::decodeResponse);
+    }
+
+    /**
+     * get one job status
+     *
+     * @param jobId jobId
+     */
+    public String getJobStatus(Long jobId) {
+        int jobStatusOrdinal =
+                hazelcastClient.requestOnMasterAndDecodeResponse(
+                        SeaTunnelGetJobStatusCodec.encodeRequest(jobId),
+                        SeaTunnelGetJobStatusCodec::decodeResponse);
+        return JobStatus.values()[jobStatusOrdinal].toString();
+    }
+
+    public String getJobMetrics(Long jobId) {
+        return hazelcastClient.requestOnMasterAndDecodeResponse(
+                SeaTunnelGetJobMetricsCodec.encodeRequest(jobId),
+                SeaTunnelGetJobMetricsCodec::decodeResponse);
+    }
+
+    public void savePointJob(Long jobId) {
+        PassiveCompletableFuture<Void> cancelFuture =
+                hazelcastClient.requestOnMasterAndGetCompletableFuture(
+                        SeaTunnelSavePointJobCodec.encodeRequest(jobId));
+
+        cancelFuture.join();
+    }
+
+    public void cancelJob(Long jobId) {
+        PassiveCompletableFuture<Void> cancelFuture =
+                hazelcastClient.requestOnMasterAndGetCompletableFuture(
+                        SeaTunnelCancelJobCodec.encodeRequest(jobId));
+
+        cancelFuture.join();
+    }
+
+    public JobDAGInfo getJobInfo(Long jobId) {
+        return hazelcastClient
+                .getSerializationService()
+                .toObject(
+                        hazelcastClient.requestOnMasterAndDecodeResponse(
+                                SeaTunnelGetJobInfoCodec.encodeRequest(jobId),
+                                SeaTunnelGetJobInfoCodec::decodeResponse));
+    }
+
+    public JobMetricsRunner.JobMetricsSummary getJobMetricsSummary(Long jobId) {
+        long sourceReadCount = 0L;
+        long sinkWriteCount = 0L;
+        String jobMetrics = getJobMetrics(jobId);
+        try {
+            JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics);
+            JsonNode sourceReaders = jsonNode.get("SourceReceivedCount");
+            JsonNode sinkWriters = jsonNode.get("SinkWriteCount");
+            for (int i = 0; i < sourceReaders.size(); i++) {
+                JsonNode sourceReader = sourceReaders.get(i);
+                JsonNode sinkWriter = sinkWriters.get(i);
+                sourceReadCount += sourceReader.get("value").asLong();
+                sinkWriteCount += sinkWriter.get("value").asLong();
+            }
+            return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, 
sinkWriteCount);
+            // Add NullPointerException because of metrics information can be empty like {}
+        } catch (JsonProcessingException | NullPointerException e) {
+            return new JobMetricsRunner.JobMetricsSummary(sourceReadCount, 
sinkWriteCount);
+        }
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetClusterHealthMetricsCodec.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetClusterHealthMetricsCodec.java
new file mode 100644
index 000000000..14fe321f7
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetClusterHealthMetricsCodec.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
+
+import static 
com.hazelcast.client.impl.protocol.ClientMessage.ForwardFrameIterator;
+import static com.hazelcast.client.impl.protocol.ClientMessage.Frame;
+import static 
com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET;
+import static 
com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
+import static 
com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
+import static 
com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
+import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+
+/*
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+@Generated("96c8a873ec6eee0bda3a16b1f849a137")
+public final class SeaTunnelGetClusterHealthMetricsCodec {
+    // hex: 0xDE0B00
+    public static final int REQUEST_MESSAGE_TYPE = 14551808;
+    // hex: 0xDE0B01
+    public static final int RESPONSE_MESSAGE_TYPE = 14551809;
+    private static final int REQUEST_INITIAL_FRAME_SIZE =
+            PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+    private static final int RESPONSE_INITIAL_FRAME_SIZE =
+            RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+    private SeaTunnelGetClusterHealthMetricsCodec() {}
+
+    public static ClientMessage encodeRequest() {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        clientMessage.setRetryable(true);
+        clientMessage.setOperationName("SeaTunnel.GetClusterHealthMetrics");
+        Frame initialFrame = new Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], 
UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, 
REQUEST_MESSAGE_TYPE);
+        encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+        clientMessage.add(initialFrame);
+        return clientMessage;
+    }
+
+    public static ClientMessage encodeResponse(String response) {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        Frame initialFrame = new Frame(new byte[RESPONSE_INITIAL_FRAME_SIZE], 
UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, 
RESPONSE_MESSAGE_TYPE);
+        clientMessage.add(initialFrame);
+
+        StringCodec.encode(clientMessage, response);
+        return clientMessage;
+    }
+
+    /** */
+    public static String decodeResponse(ClientMessage clientMessage) {
+        ForwardFrameIterator iterator = clientMessage.frameIterator();
+        // empty initial frame
+        iterator.next();
+        return StringCodec.decode(iterator);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
 
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 1c70091c7..78317d825 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -207,4 +207,20 @@ methods:
           nullable: false
           since: 2.0
           doc: ''
-    response: {}
\ No newline at end of file
+    response: {}
+
+  - id: 11
+    name: getClusterHealthMetrics
+    since: 2.0
+    doc: ''
+    request:
+      retryable: true
+      partitionIdentifier: -1
+      params: []
+    response:
+      params:
+        - name: response
+          type: String
+          nullable: false
+          since: 2.0
+          doc: ''
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
new file mode 100644
index 000000000..b7900871e
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelHealthMonitor.java
@@ -0,0 +1,413 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server;
+
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.diagnostics.HealthMonitorLevel;
+import com.hazelcast.internal.memory.MemoryStats;
+import com.hazelcast.internal.metrics.DoubleGauge;
+import com.hazelcast.internal.metrics.LongGauge;
+import com.hazelcast.internal.metrics.MetricsRegistry;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.spi.properties.ClusterProperty;
+import lombok.Getter;
+
+import static com.hazelcast.internal.diagnostics.HealthMonitorLevel.valueOf;
+import static 
com.hazelcast.spi.properties.ClusterProperty.HEALTH_MONITORING_THRESHOLD_CPU_PERCENTAGE;
+import static 
com.hazelcast.spi.properties.ClusterProperty.HEALTH_MONITORING_THRESHOLD_MEMORY_PERCENTAGE;
+import static java.lang.String.format;
+
+/**
+ * Reads node health figures from the Hazelcast metrics registry and renders them as one line of
+ * comma separated {@code key=value} pairs.
+ *
+ * <p>The rendered line is what {@code GetClusterHealthMetricsOperation} returns, so {@link
+ * SeaTunnelHealthMetrics#render()} may be invoked by several operations at once and is therefore
+ * synchronized.
+ */
+public class SeaTunnelHealthMonitor {
+    /** Unit suffixes; the index is the power of 1024 the suffix stands for. */
+    private static final String[] UNITS = new String[] {"", "K", "M", "G", "T", "P", "E"};
+
+    private static final double PERCENTAGE_MULTIPLIER = 100d;
+    private static final double THRESHOLD_PERCENTAGE_INVOCATIONS = 70;
+    private static final double THRESHOLD_INVOCATIONS = 1000;
+
+    private final ILogger logger;
+    private final Node node;
+    private final HealthMonitorLevel monitorLevel;
+    private final int thresholdMemoryPercentage;
+    private final int thresholdCPUPercentage;
+    private final MetricsRegistry metricRegistry;
+
+    @Getter private final SeaTunnelHealthMetrics healthMetrics;
+
+    /**
+     * Creates a monitor bound to the given node.
+     *
+     * @param node the node whose properties, metrics registry and memory stats are read
+     */
+    public SeaTunnelHealthMonitor(Node node) {
+        this.node = node;
+        this.logger = node.getLogger(com.hazelcast.internal.diagnostics.HealthMonitor.class);
+        // must be assigned before SeaTunnelHealthMetrics is created: its gauges come from here
+        this.metricRegistry = node.nodeEngine.getMetricsRegistry();
+        this.monitorLevel = getHealthMonitorLevel();
+        this.thresholdMemoryPercentage =
+                node.getProperties().getInteger(HEALTH_MONITORING_THRESHOLD_MEMORY_PERCENTAGE);
+        this.thresholdCPUPercentage =
+                node.getProperties().getInteger(HEALTH_MONITORING_THRESHOLD_CPU_PERCENTAGE);
+        this.healthMetrics = new SeaTunnelHealthMetrics();
+    }
+
+    /** Parses the {@code HEALTH_MONITORING_LEVEL} cluster property of the node. */
+    private HealthMonitorLevel getHealthMonitorLevel() {
+        String healthMonitorLevel =
+                node.getProperties().getString(ClusterProperty.HEALTH_MONITORING_LEVEL);
+        return valueOf(healthMonitorLevel);
+    }
+
+    /**
+     * Given a number, returns that number as a percentage string.
+     *
+     * @param p the given number
+     * @return the number formatted with two decimal places followed by a percent sign
+     */
+    private static String percentageString(double p) {
+        return format("%.2f%%", p);
+    }
+
+    /**
+     * Formats a count with the largest unit suffix that does not exceed it.
+     *
+     * @param number the value to format
+     * @return the scaled value with one decimal place and a suffix from {@link #UNITS}, or the
+     *     plain number when it is smaller than 1024
+     */
+    @SuppressWarnings("checkstyle:magicnumber")
+    private static String numberToUnit(long number) {
+        for (int i = UNITS.length - 1; i > 0; i--) {
+            // every unit is 1024 times the previous one (1024 K is 1 M, and so on)
+            double step = Math.pow(1024, i);
+            // ">=" so that exact powers of 1024 use their own unit (1024 -> 1.0K)
+            if (number >= step) {
+                return format("%3.1f%s", number / step, UNITS[i]);
+            }
+        }
+        return Long.toString(number);
+    }
+
+    /** Gauges read from the metrics registry plus the logic that renders them as text. */
+    public class SeaTunnelHealthMetrics {
+        final LongGauge clientEndpointCount = metricRegistry.newLongGauge("client.endpoint.count");
+        final LongGauge clusterTimeDiff =
+                metricRegistry.newLongGauge("cluster.clock.clusterTimeDiff");
+
+        final LongGauge executorAsyncQueueSize =
+                metricRegistry.newLongGauge("executor.hz:async.queueSize");
+        final LongGauge executorClientQueueSize =
+                metricRegistry.newLongGauge("executor.hz:client.queueSize");
+        final LongGauge executorQueryClientQueueSize =
+                metricRegistry.newLongGauge("executor.hz:client.query.queueSize");
+        final LongGauge executorBlockingClientQueueSize =
+                metricRegistry.newLongGauge("executor.hz:client.blocking.queueSize");
+        final LongGauge executorClusterQueueSize =
+                metricRegistry.newLongGauge("executor.hz:cluster.queueSize");
+        final LongGauge executorScheduledQueueSize =
+                metricRegistry.newLongGauge("executor.hz:scheduled.queueSize");
+        final LongGauge executorSystemQueueSize =
+                metricRegistry.newLongGauge("executor.hz:system.queueSize");
+        final LongGauge executorIoQueueSize =
+                metricRegistry.newLongGauge("executor.hz:io.queueSize");
+        final LongGauge executorQueryQueueSize =
+                metricRegistry.newLongGauge("executor.hz:query.queueSize");
+        final LongGauge executorMapLoadQueueSize =
+                metricRegistry.newLongGauge("executor.hz:map-load.queueSize");
+        final LongGauge executorMapLoadAllKeysQueueSize =
+                metricRegistry.newLongGauge("executor.hz:map-loadAllKeys.queueSize");
+
+        final LongGauge eventQueueSize = metricRegistry.newLongGauge("event.eventQueueSize");
+
+        final LongGauge gcMinorCount = metricRegistry.newLongGauge("gc.minorCount");
+        final LongGauge gcMinorTime = metricRegistry.newLongGauge("gc.minorTime");
+        final LongGauge gcMajorCount = metricRegistry.newLongGauge("gc.majorCount");
+        final LongGauge gcMajorTime = metricRegistry.newLongGauge("gc.majorTime");
+        final LongGauge gcUnknownCount = metricRegistry.newLongGauge("gc.unknownCount");
+        final LongGauge gcUnknownTime = metricRegistry.newLongGauge("gc.unknownTime");
+
+        final LongGauge runtimeAvailableProcessors =
+                metricRegistry.newLongGauge("runtime.availableProcessors");
+        final LongGauge runtimeMaxMemory = metricRegistry.newLongGauge("runtime.maxMemory");
+        final LongGauge runtimeFreeMemory = metricRegistry.newLongGauge("runtime.freeMemory");
+        final LongGauge runtimeTotalMemory = metricRegistry.newLongGauge("runtime.totalMemory");
+        final LongGauge runtimeUsedMemory = metricRegistry.newLongGauge("runtime.usedMemory");
+
+        final LongGauge threadPeakThreadCount =
+                metricRegistry.newLongGauge("thread.peakThreadCount");
+        final LongGauge threadThreadCount = metricRegistry.newLongGauge("thread.threadCount");
+
+        final DoubleGauge osProcessCpuLoad = metricRegistry.newDoubleGauge("os.processCpuLoad");
+        final DoubleGauge osSystemLoadAverage =
+                metricRegistry.newDoubleGauge("os.systemLoadAverage");
+        final DoubleGauge osSystemCpuLoad = metricRegistry.newDoubleGauge("os.systemCpuLoad");
+        final LongGauge osTotalPhysicalMemorySize =
+                metricRegistry.newLongGauge("os.totalPhysicalMemorySize");
+        final LongGauge osFreePhysicalMemorySize =
+                metricRegistry.newLongGauge("os.freePhysicalMemorySize");
+        final LongGauge osTotalSwapSpaceSize =
+                metricRegistry.newLongGauge("os.totalSwapSpaceSize");
+        final LongGauge osFreeSwapSpaceSize = metricRegistry.newLongGauge("os.freeSwapSpaceSize");
+
+        final LongGauge operationServiceExecutorQueueSize =
+                metricRegistry.newLongGauge("operation.queueSize");
+        final LongGauge operationServiceExecutorPriorityQueueSize =
+                metricRegistry.newLongGauge("operation.priorityQueueSize");
+        final LongGauge operationServiceResponseQueueSize =
+                metricRegistry.newLongGauge("operation.responseQueueSize");
+        final LongGauge operationServiceRunningOperationsCount =
+                metricRegistry.newLongGauge("operation.runningCount");
+        final LongGauge operationServiceCompletedOperationsCount =
+                metricRegistry.newLongGauge("operation.completedCount");
+        final LongGauge operationServicePendingInvocationsCount =
+                metricRegistry.newLongGauge("operation.invocations.pending");
+        final DoubleGauge operationServicePendingInvocationsPercentage =
+                metricRegistry.newDoubleGauge("operation.invocations.used");
+
+        final LongGauge proxyCount = metricRegistry.newLongGauge("proxy.proxyCount");
+
+        final LongGauge tcpConnectionActiveCount =
+                metricRegistry.newLongGauge("tcp.connection.activeCount");
+        final LongGauge tcpConnectionCount = metricRegistry.newLongGauge("tcp.connection.count");
+        final LongGauge tcpConnectionClientCount =
+                metricRegistry.newLongGauge("tcp.connection.clientCount");
+
+        /** Scratch buffer reused by {@link #render()}; only touched while holding its lock. */
+        private final StringBuilder sb = new StringBuilder();
+
+        private double memoryUsedOfTotalPercentage;
+        private double memoryUsedOfMaxPercentage;
+
+        /** Recomputes the used/total and used/max heap percentages from the runtime gauges. */
+        public void update() {
+            // sample used memory once so that both ratios are derived from the same reading
+            final double usedTimesHundred = PERCENTAGE_MULTIPLIER * runtimeUsedMemory.read();
+            memoryUsedOfTotalPercentage = usedTimesHundred / runtimeTotalMemory.read();
+            memoryUsedOfMaxPercentage = usedTimesHundred / runtimeMaxMemory.read();
+        }
+
+        /**
+         * Tells whether any watched figure is above its threshold.
+         *
+         * <p>The memory check uses the percentage computed by the last {@link #update()} call.
+         *
+         * @return {@code true} when heap usage, process or system CPU load, or the pending
+         *     invocation percentage or count is above its limit
+         */
+        boolean exceedsThreshold() {
+            return memoryUsedOfMaxPercentage > thresholdMemoryPercentage
+                    || osProcessCpuLoad.read() > thresholdCPUPercentage
+                    || osSystemCpuLoad.read() > thresholdCPUPercentage
+                    || operationServicePendingInvocationsPercentage.read()
+                            > THRESHOLD_PERCENTAGE_INVOCATIONS
+                    || operationServicePendingInvocationsCount.read() > THRESHOLD_INVOCATIONS;
+        }
+
+        /**
+         * Refreshes the derived percentages and renders every metric into one line.
+         *
+         * <p>Synchronized because the shared buffer and the percentage fields are mutated here
+         * while callers may arrive on different threads.
+         *
+         * @return the metrics as {@code key=value} pairs separated by {@code ", "}
+         */
+        public synchronized String render() {
+            update();
+            sb.setLength(0);
+            renderProcessors();
+            renderPhysicalMemory();
+            renderSwap();
+            renderHeap();
+            renderNativeMemory();
+            renderGc();
+            renderLoad();
+            renderThread();
+            renderCluster();
+            renderEvents();
+            renderExecutors();
+            renderOperationService();
+            renderProxy();
+            renderClient();
+            // last section: renderConnection() leaves no trailing separator
+            renderConnection();
+            return sb.toString();
+        }
+
+        private void renderProcessors() {
+            sb.append("processors=").append(runtimeAvailableProcessors.read()).append(", ");
+        }
+
+        private void renderPhysicalMemory() {
+            sb.append("physical.memory.total=")
+                    .append(numberToUnit(osTotalPhysicalMemorySize.read()))
+                    .append(", ");
+            sb.append("physical.memory.free=")
+                    .append(numberToUnit(osFreePhysicalMemorySize.read()))
+                    .append(", ");
+        }
+
+        private void renderSwap() {
+            sb.append("swap.space.total=")
+                    .append(numberToUnit(osTotalSwapSpaceSize.read()))
+                    .append(", ");
+            sb.append("swap.space.free=")
+                    .append(numberToUnit(osFreeSwapSpaceSize.read()))
+                    .append(", ");
+        }
+
+        private void renderHeap() {
+            sb.append("heap.memory.used=")
+                    .append(numberToUnit(runtimeUsedMemory.read()))
+                    .append(", ");
+            sb.append("heap.memory.free=")
+                    .append(numberToUnit(runtimeFreeMemory.read()))
+                    .append(", ");
+            sb.append("heap.memory.total=")
+                    .append(numberToUnit(runtimeTotalMemory.read()))
+                    .append(", ");
+            sb.append("heap.memory.max=")
+                    .append(numberToUnit(runtimeMaxMemory.read()))
+                    .append(", ");
+            sb.append("heap.memory.used/total=")
+                    .append(percentageString(memoryUsedOfTotalPercentage))
+                    .append(", ");
+            sb.append("heap.memory.used/max=")
+                    .append(percentageString(memoryUsedOfMaxPercentage))
+                    .append(", ");
+        }
+
+        /** Renders native memory figures; writes nothing when no native maximum is reported. */
+        private void renderNativeMemory() {
+            final MemoryStats memoryStats = node.getNodeExtension().getMemoryStats();
+            final long maxNative = memoryStats.getMaxNative();
+            if (maxNative <= 0L) {
+                return;
+            }
+
+            final long usedNative = memoryStats.getUsedNative();
+            final long usedMeta = memoryStats.getUsedMetadata();
+
+            sb.append("native.memory.used=").append(numberToUnit(usedNative)).append(", ");
+            sb.append("native.memory.free=")
+                    .append(numberToUnit(memoryStats.getFreeNative()))
+                    .append(", ");
+            sb.append("native.memory.total=")
+                    .append(numberToUnit(memoryStats.getCommittedNative()))
+                    .append(", ");
+            sb.append("native.memory.max=").append(numberToUnit(maxNative)).append(", ");
+            sb.append("native.meta.memory.used=").append(numberToUnit(usedMeta)).append(", ");
+            sb.append("native.meta.memory.free=")
+                    .append(numberToUnit(maxNative - usedMeta))
+                    .append(", ");
+            sb.append("native.meta.memory.percentage=")
+                    .append(percentageString(PERCENTAGE_MULTIPLIER * usedMeta / maxNative))
+                    .append(", ");
+        }
+
+        private void renderGc() {
+            sb.append("minor.gc.count=").append(gcMinorCount.read()).append(", ");
+            sb.append("minor.gc.time=").append(gcMinorTime.read()).append("ms, ");
+            sb.append("major.gc.count=").append(gcMajorCount.read()).append(", ");
+            sb.append("major.gc.time=").append(gcMajorTime.read()).append("ms, ");
+
+            // the "unknown" collector figures are only shown once there is something to report
+            final long unknownCount = gcUnknownCount.read();
+            if (unknownCount > 0) {
+                sb.append("unknown.gc.count=").append(unknownCount).append(", ");
+                sb.append("unknown.gc.time=").append(gcUnknownTime.read()).append("ms, ");
+            }
+        }
+
+        private void renderLoad() {
+            sb.append("load.process=")
+                    .append(format("%.2f", osProcessCpuLoad.read()))
+                    .append("%, ");
+            sb.append("load.system=")
+                    .append(format("%.2f", osSystemCpuLoad.read()))
+                    .append("%, ");
+
+            // a negative reading is rendered as n/a; the value is sampled once so the check and
+            // the printed number cannot disagree
+            final double loadAverage = osSystemLoadAverage.read();
+            sb.append("load.systemAverage=");
+            if (loadAverage < 0) {
+                sb.append("n/a");
+            } else {
+                sb.append(format("%.2f", loadAverage));
+            }
+            // same ", " separator as every other field, also on the n/a path
+            sb.append(", ");
+        }
+
+        private void renderThread() {
+            sb.append("thread.count=").append(threadThreadCount.read()).append(", ");
+            sb.append("thread.peakCount=").append(threadPeakThreadCount.read()).append(", ");
+        }
+
+        private void renderCluster() {
+            sb.append("cluster.timeDiff=").append(clusterTimeDiff.read()).append(", ");
+        }
+
+        private void renderEvents() {
+            sb.append("event.q.size=").append(eventQueueSize.read()).append(", ");
+        }
+
+        private void renderExecutors() {
+            sb.append("executor.q.async.size=")
+                    .append(executorAsyncQueueSize.read())
+                    .append(", ");
+            sb.append("executor.q.client.size=")
+                    .append(executorClientQueueSize.read())
+                    .append(", ");
+            sb.append("executor.q.client.query.size=")
+                    .append(executorQueryClientQueueSize.read())
+                    .append(", ");
+            sb.append("executor.q.client.blocking.size=")
+                    .append(executorBlockingClientQueueSize.read())
+                    .append(", ");
+            sb.append("executor.q.query.size=")
+                    .append(executorQueryQueueSize.read())
+                    .append(", ");
+            sb.append("executor.q.scheduled.size=")
+                    .append(executorScheduledQueueSize.read())
+                    .append(", ");
+            sb.append("executor.q.io.size=").append(executorIoQueueSize.read()).append(", ");
+            sb.append("executor.q.system.size=")
+                    .append(executorSystemQueueSize.read())
+                    .append(", ");
+            sb.append("executor.q.operations.size=")
+                    .append(operationServiceExecutorQueueSize.read())
+                    .append(", ");
+            sb.append("executor.q.priorityOperation.size=")
+                    .append(operationServiceExecutorPriorityQueueSize.read())
+                    .append(", ");
+            sb.append("operations.completed.count=")
+                    .append(operationServiceCompletedOperationsCount.read())
+                    .append(", ");
+            sb.append("executor.q.mapLoad.size=")
+                    .append(executorMapLoadQueueSize.read())
+                    .append(", ");
+            sb.append("executor.q.mapLoadAllKeys.size=")
+                    .append(executorMapLoadAllKeysQueueSize.read())
+                    .append(", ");
+            sb.append("executor.q.cluster.size=")
+                    .append(executorClusterQueueSize.read())
+                    .append(", ");
+        }
+
+        private void renderOperationService() {
+            sb.append("executor.q.response.size=")
+                    .append(operationServiceResponseQueueSize.read())
+                    .append(", ");
+            sb.append("operations.running.count=")
+                    .append(operationServiceRunningOperationsCount.read())
+                    .append(", ");
+            sb.append("operations.pending.invocations.percentage=")
+                    .append(format("%.2f", operationServicePendingInvocationsPercentage.read()))
+                    .append("%, ");
+            sb.append("operations.pending.invocations.count=")
+                    .append(operationServicePendingInvocationsCount.read())
+                    .append(", ");
+        }
+
+        private void renderProxy() {
+            sb.append("proxy.count=").append(proxyCount.read()).append(", ");
+        }
+
+        private void renderClient() {
+            sb.append("clientEndpoint.count=").append(clientEndpointCount.read()).append(", ");
+        }
+
+        /** Renders the connection counters; the final one is written without a separator. */
+        private void renderConnection() {
+            sb.append("connection.active.count=")
+                    .append(tcpConnectionActiveCount.read())
+                    .append(", ");
+            sb.append("client.connection.count=")
+                    .append(tcpConnectionClientCount.read())
+                    .append(", ");
+            sb.append("connection.count=").append(tcpConnectionCount.read());
+        }
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 861b51270..07cb88870 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -37,6 +37,7 @@ import com.hazelcast.spi.impl.NodeEngine;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import com.hazelcast.spi.impl.operationservice.LiveOperations;
 import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
+import lombok.Getter;
 import lombok.NonNull;
 
 import java.util.Properties;
@@ -59,6 +60,8 @@ public class SeaTunnelServer
     private CoordinatorService coordinatorService;
     private ScheduledExecutorService monitorService;
 
+    @Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor;
+
     private final SeaTunnelConfig seaTunnelConfig;
 
     private volatile boolean isRunning = true;
@@ -105,6 +108,8 @@ public class SeaTunnelServer
                 0,
                 
seaTunnelConfig.getEngineConfig().getPrintExecutionInfoInterval(),
                 TimeUnit.SECONDS);
+
+        seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) 
engine).getNode());
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetClusterHealthMetricsOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetClusterHealthMetricsOperation.java
new file mode 100644
index 000000000..20e56a7be
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetClusterHealthMetricsOperation.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import 
org.apache.seatunnel.engine.server.serializable.ClientToServerOperationDataSerializerHook;
+
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Operation that renders the health metrics exposed by the {@link SeaTunnelServer} service and
+ * returns the rendered text as its response.
+ */
+public class GetClusterHealthMetricsOperation extends Operation
+        implements IdentifiedDataSerializable, AllowedDuringPassiveState {
+
+    /** Rendered metrics text; {@code null} until {@link #run()} has been executed. */
+    private String response;
+
+    public GetClusterHealthMetricsOperation() {}
+
+    /** Renders the current health metrics and keeps the text for {@link #getResponse()}. */
+    @Override
+    public void run() {
+        this.response =
+                this.<SeaTunnelServer>getService()
+                        .getSeaTunnelHealthMonitor()
+                        .getHealthMetrics()
+                        .render();
+    }
+
+    /** @return the text produced by {@link #run()} */
+    @Override
+    public Object getResponse() {
+        return this.response;
+    }
+
+    @Override
+    public int getFactoryId() {
+        return ClientToServerOperationDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return ClientToServerOperationDataSerializerHook.GET_CLUSTER_HEALTH_METRICS;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetClusterHealthMetricsTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetClusterHealthMetricsTask.java
new file mode 100644
index 000000000..45d25d08e
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetClusterHealthMetricsTask.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.protocol.task;
+
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetClusterHealthMetricsCodec;
+import 
org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.instance.impl.Node;
+import com.hazelcast.internal.nio.Connection;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+/**
+ * Client message task for the {@code getClusterHealthMetrics} method: it runs a {@link
+ * GetClusterHealthMetricsOperation} and encodes the resulting string for the client.
+ */
+public class GetClusterHealthMetricsTask extends AbstractSeaTunnelMessageTask<Void, String> {
+    protected GetClusterHealthMetricsTask(
+            ClientMessage clientMessage, Node node, Connection connection) {
+        super(
+                clientMessage,
+                node,
+                connection,
+                // the request decoder always yields null
+                request -> null,
+                metrics -> SeaTunnelGetClusterHealthMetricsCodec.encodeResponse(metrics));
+    }
+
+    @Override
+    public String getMethodName() {
+        return "getClusterHealthMetrics";
+    }
+
+    @Override
+    public Object[] getParameters() {
+        return new Object[] {};
+    }
+
+    @Override
+    protected Operation prepareOperation() {
+        return new GetClusterHealthMetricsOperation();
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index ea5c53eb0..6d19ada03 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -18,6 +18,7 @@
 package org.apache.seatunnel.engine.server.protocol.task;
 
 import org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelCancelJobCodec;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetClusterHealthMetricsCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStatusCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
@@ -87,5 +88,9 @@ public class SeaTunnelMessageTaskFactoryProvider implements 
MessageTaskFactoryPr
                 SeaTunnelSavePointJobCodec.REQUEST_MESSAGE_TYPE,
                 (clientMessage, connection) ->
                         new SavePointJobTask(clientMessage, node, connection));
+        factories.put(
+                SeaTunnelGetClusterHealthMetricsCodec.REQUEST_MESSAGE_TYPE,
+                (clientMessage, connection) ->
+                        new GetClusterHealthMetricsTask(clientMessage, node, 
connection));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
index 68a468b24..4cfef1cd5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server.serializable;
 
 import 
org.apache.seatunnel.engine.common.serializeable.SeaTunnelFactoryIdConstant;
 import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
+import 
org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
 import 
org.apache.seatunnel.engine.server.operation.GetJobDetailStatusOperation;
 import org.apache.seatunnel.engine.server.operation.GetJobInfoOperation;
 import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
@@ -57,6 +58,8 @@ public final class ClientToServerOperationDataSerializerHook 
implements DataSeri
 
     public static final int SAVEPOINT_JOB_OPERATOR = 8;
 
+    public static final int GET_CLUSTER_HEALTH_METRICS = 9;
+
     public static final int FACTORY_ID =
             FactoryIdHelper.getFactoryId(
                     
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -95,6 +98,8 @@ public final class ClientToServerOperationDataSerializerHook 
implements DataSeri
                     return new GetJobInfoOperation();
                 case SAVEPOINT_JOB_OPERATOR:
                     return new SavePointJobOperation();
+                case GET_CLUSTER_HEALTH_METRICS:
+                    return new GetClusterHealthMetricsOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + 
typeId);
             }

Reply via email to