This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8266544d03 [Improve][Zeta] Add an interface for batch retrieval of 
JobMetrics (#4576)
8266544d03 is described below

commit 8266544d035b6c8f1550ac91393a86fec88293eb
Author: ic4y <[email protected]>
AuthorDate: Tue Jun 20 11:03:06 2023 +0800

    [Improve][Zeta] Add an interface for batch retrieval of JobMetrics (#4576)
---
 .../starter/seatunnel/args/ClientCommandArgs.java  |  5 ++
 .../seatunnel/command/ClientExecuteCommand.java    |  3 +
 .../seatunnel/engine/client/job/JobClient.java     |  7 ++
 .../engine/client/SeaTunnelClientTest.java         | 54 ++++++++++++
 .../src/test/resources/batch_fake_to_console.conf  | 48 +++++++++++
 .../codec/SeaTunnelGetRunningJobMetricsCodec.java  | 84 +++++++++++++++++++
 .../SeaTunnelEngine.yaml                           | 16 ++++
 .../engine/server/CoordinatorService.java          | 78 +++++++++++++++++
 .../engine/server/metrics/JobMetricsUtil.java      | 80 ++++++++++++++++++
 .../server/metrics/ZetaMetricsCollector.java       | 76 +++++++++++++++++
 .../server/operation/GetJobMetricsOperation.java   |  9 +-
 ...ion.java => GetRunningJobMetricsOperation.java} | 22 ++---
 .../server/protocol/task/GetJobMetricsTask.java    |  2 +-
 ...ricsTask.java => GetRunningJobMetricsTask.java} | 17 ++--
 .../task/SeaTunnelMessageTaskFactoryProvider.java  |  5 ++
 .../ClientToServerOperationDataSerializerHook.java |  5 ++
 .../serializable/TaskDataSerializerHook.java       |  5 ++
 .../server/task/operation/GetMetricsOperation.java | 97 ++++++++++++++++++++++
 18 files changed, 587 insertions(+), 26 deletions(-)

diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 840266bf45..9504c9cb2c 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -78,6 +78,11 @@ public class ClientCommandArgs extends AbstractCommandArgs {
             description = "Get job metrics by JobId")
     private String metricsJobId;
 
+    @Parameter(
+            names = {"--get_running_job_metrics"},
+            description = "Gets metrics for running jobs")
+    private boolean getRunningJobMetrics = false;
+
     @Parameter(
             names = {"-l", "--list"},
             description = "list job status")
diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index 50112cb2a6..14b00540f2 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -97,6 +97,9 @@ public class ClientExecuteCommand implements 
Command<ClientCommandArgs> {
             if (clientCommandArgs.isListJob()) {
                 String jobStatus = 
engineClient.getJobClient().listJobStatus(true);
                 System.out.println(jobStatus);
+            } else if (clientCommandArgs.isGetRunningJobMetrics()) {
+                String runningJobMetrics = 
engineClient.getJobClient().getRunningJobMetrics();
+                System.out.println(runningJobMetrics);
             } else if (null != clientCommandArgs.getJobId()) {
                 String jobState =
                         engineClient
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
index 8405f409ee..c4a4f68a69 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobClient.java
@@ -35,6 +35,7 @@ import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStat
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetRunningJobMetricsCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
 
@@ -117,6 +118,12 @@ public class JobClient {
                 SeaTunnelGetJobMetricsCodec::decodeResponse);
     }
 
+    public String getRunningJobMetrics() {
+        return hazelcastClient.requestOnMasterAndDecodeResponse(
+                SeaTunnelGetRunningJobMetricsCodec.encodeRequest(),
+                SeaTunnelGetRunningJobMetricsCodec::decodeResponse);
+    }
+
     public void savePointJob(Long jobId) {
         PassiveCompletableFuture<Void> cancelFuture =
                 hazelcastClient.requestOnMasterAndGetCompletableFuture(
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 369120673a..85aec59c27 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -202,6 +202,8 @@ public class SeaTunnelClientTest {
 
             String jobMetrics = jobClient.getJobMetrics(jobId);
 
+            System.out.println(jobMetrics);
+
             Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_COUNT));
             Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS));
             Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT));
@@ -214,6 +216,58 @@ public class SeaTunnelClientTest {
         }
     }
 
+    @Test
+    public void testGetRunningJobMetrics() throws ExecutionException, 
InterruptedException {
+        Common.setDeployMode(DeployMode.CLUSTER);
+        String filePath = TestUtils.getResource("/batch_fake_to_console.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("fake_to_console1");
+
+        SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
+        JobClient jobClient = seaTunnelClient.getJobClient();
+
+        ClientJobProxy execute1 =
+                seaTunnelClient.createExecutionContext(filePath, 
jobConfig).execute();
+        long jobId1 = execute1.getJobId();
+
+        execute1.waitForJobComplete();
+
+        filePath = TestUtils.getResource("streaming_fake_to_console.conf");
+        jobConfig = new JobConfig();
+        jobConfig.setName("fake_to_console2");
+        ClientJobProxy execute2 =
+                seaTunnelClient.createExecutionContext(filePath, 
jobConfig).execute();
+        ClientJobProxy execute3 =
+                seaTunnelClient.createExecutionContext(filePath, 
jobConfig).execute();
+
+        long jobId2 = execute2.getJobId();
+        long jobId3 = execute3.getJobId();
+
+        await().atMost(30000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertTrue(
+                                        
jobClient.getJobStatus(jobId1).equals("FINISHED")
+                                                && 
jobClient.getJobStatus(jobId2).equals("RUNNING")
+                                                && jobClient
+                                                        .getJobStatus(jobId3)
+                                                        .equals("RUNNING")));
+
+        System.out.println(jobClient.getRunningJobMetrics());
+
+        await().atMost(30000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            String runningJobMetrics = 
jobClient.getRunningJobMetrics();
+                            Assertions.assertTrue(
+                                    runningJobMetrics.contains(jobId2 + "")
+                                            && 
runningJobMetrics.contains(jobId3 + ""));
+                        });
+
+        jobClient.cancelJob(jobId2);
+        jobClient.cancelJob(jobId3);
+    }
+
     @Test
     public void testCancelJob() throws ExecutionException, 
InterruptedException {
         Common.setDeployMode(DeployMode.CLIENT);
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console.conf
 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console.conf
new file mode 100644
index 0000000000..e8d74830fa
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fake_to_console.conf
@@ -0,0 +1,48 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  # You can set engine configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the 
source plugin feature**
+  FakeSource {
+    result_table_name = "fake"
+    parallelism = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+transform {
+}
+
+sink {
+  console {
+    source_table_name="fake"
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetRunningJobMetricsCodec.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetRunningJobMetricsCodec.java
new file mode 100644
index 0000000000..44b0f89222
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/protocol/codec/SeaTunnelGetRunningJobMetricsCodec.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.protocol.codec;
+
+import com.hazelcast.client.impl.protocol.ClientMessage;
+import com.hazelcast.client.impl.protocol.Generated;
+import com.hazelcast.client.impl.protocol.codec.builtin.StringCodec;
+
+import static 
com.hazelcast.client.impl.protocol.ClientMessage.PARTITION_ID_FIELD_OFFSET;
+import static 
com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
+import static 
com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
+import static 
com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
+import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
+import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
+import static 
com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
+
+/*
+ * This file is auto-generated by the Hazelcast Client Protocol Code Generator.
+ * To change this file, edit the templates or the protocol
+ * definitions on the https://github.com/hazelcast/hazelcast-client-protocol
+ * and regenerate it.
+ */
+
+/** */
+@Generated("2a54110c40297eed90df5f79bde1171d")
+public final class SeaTunnelGetRunningJobMetricsCodec {
+    // hex: 0xDE0C00
+    public static final int REQUEST_MESSAGE_TYPE = 14552064;
+    // hex: 0xDE0C01
+    public static final int RESPONSE_MESSAGE_TYPE = 14552065;
+    private static final int REQUEST_INITIAL_FRAME_SIZE =
+            PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
+    private static final int RESPONSE_INITIAL_FRAME_SIZE =
+            RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;
+
+    private SeaTunnelGetRunningJobMetricsCodec() {}
+
+    public static ClientMessage encodeRequest() {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        clientMessage.setRetryable(true);
+        clientMessage.setOperationName("SeaTunnel.GetRunningJobMetrics");
+        ClientMessage.Frame initialFrame =
+                new ClientMessage.Frame(new byte[REQUEST_INITIAL_FRAME_SIZE], 
UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, 
REQUEST_MESSAGE_TYPE);
+        encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
+        clientMessage.add(initialFrame);
+        return clientMessage;
+    }
+
+    public static ClientMessage encodeResponse(java.lang.String response) {
+        ClientMessage clientMessage = ClientMessage.createForEncode();
+        ClientMessage.Frame initialFrame =
+                new ClientMessage.Frame(
+                        new byte[RESPONSE_INITIAL_FRAME_SIZE], 
UNFRAGMENTED_MESSAGE);
+        encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, 
RESPONSE_MESSAGE_TYPE);
+        clientMessage.add(initialFrame);
+
+        StringCodec.encode(clientMessage, response);
+        return clientMessage;
+    }
+
+    /** */
+    public static java.lang.String decodeResponse(ClientMessage clientMessage) 
{
+        ClientMessage.ForwardFrameIterator iterator = 
clientMessage.frameIterator();
+        // empty initial frame
+        iterator.next();
+        return StringCodec.decode(iterator);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
 
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
index 78317d8250..f14f86d389 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
@@ -217,6 +217,22 @@ methods:
       retryable: true
       partitionIdentifier: -1
       params: []
+    response:
+      params:
+        - name: response
+          type: String
+          nullable: false
+          since: 2.0
+          doc: ''
+
+  - id: 12
+    name: getRunningJobMetrics
+    since: 2.0
+    doc: ''
+    request:
+      retryable: true
+      partitionIdentifier: -1
+      params: [ ]
     response:
       params:
         - name: response
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index bf913a4245..d2931d0c37 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -18,7 +18,9 @@
 package org.apache.seatunnel.engine.server;
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
 import org.apache.seatunnel.common.utils.StringFormatUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
@@ -45,9 +47,12 @@ import 
org.apache.seatunnel.engine.server.metrics.SeaTunnelMetricsContext;
 import org.apache.seatunnel.engine.server.resourcemanager.ResourceManager;
 import 
org.apache.seatunnel.engine.server.resourcemanager.ResourceManagerFactory;
 import org.apache.seatunnel.engine.server.resourcemanager.resource.SlotProfile;
+import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.hazelcast.cluster.Address;
+import com.hazelcast.core.HazelcastInstanceNotActiveException;
 import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.internal.services.MembershipServiceEvent;
 import com.hazelcast.logging.ILogger;
@@ -55,9 +60,12 @@ import com.hazelcast.map.IMap;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 import lombok.NonNull;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -68,6 +76,9 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
+import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID;
+import static 
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJobMetricsMap;
+
 public class CoordinatorService {
     private final NodeEngineImpl nodeEngine;
     private final ILogger logger;
@@ -530,6 +541,73 @@ public class CoordinatorService {
         return jobMetricsImap != null ? jobMetricsImap.merge(jobMetrics) : 
jobMetrics;
     }
 
+    public Map<Long, JobMetrics> getRunningJobMetrics() {
+        final Set<Long> runningJobIds = runningJobMasterMap.keySet();
+
+        Set<Address> addresses = new HashSet<>();
+        ownedSlotProfilesIMap.forEach(
+                (pipelineLocation, ownedSlotProfilesIMap) -> {
+                    if (runningJobIds.contains(pipelineLocation.getJobId())) {
+                        ownedSlotProfilesIMap
+                                .values()
+                                .forEach(
+                                        ownedSlotProfile -> {
+                                            
addresses.add(ownedSlotProfile.getWorker());
+                                        });
+                    }
+                });
+
+        List<RawJobMetrics> metrics = new ArrayList<>();
+
+        addresses.forEach(
+                address -> {
+                    try {
+                        if (nodeEngine.getClusterService().getMember(address) 
!= null) {
+                            RawJobMetrics rawJobMetrics =
+                                    (RawJobMetrics)
+                                            
NodeEngineUtil.sendOperationToMemberNode(
+                                                            nodeEngine,
+                                                            new 
GetMetricsOperation(
+                                                                    dis ->
+                                                                            
(dis.tagValue(JOB_ID)
+                                                                               
             != null
+                                                                               
     && runningJobIds
+                                                                               
             .contains(
+                                                                               
                     Long
+                                                                               
                             .parseLong(
+                                                                               
                                     dis
+                                                                               
                                             .tagValue(
+                                                                               
                                                     JOB_ID))))),
+                                                            address)
+                                                    .get();
+                            metrics.add(rawJobMetrics);
+                        }
+                    }
+                    // HazelcastInstanceNotActiveException means that the 
node is
+                    // offline; a later call can succeed once the taskGroup is 
restored
+                    catch (HazelcastInstanceNotActiveException e) {
+                        logger.warning(
+                                String.format(
+                                        "get metrics with exception: %s.",
+                                        ExceptionUtils.getMessage(e)));
+                    } catch (Exception e) {
+                        throw new SeaTunnelException(e.getMessage());
+                    }
+                });
+
+        Map<Long, JobMetrics> longJobMetricsMap = toJobMetricsMap(metrics);
+
+        longJobMetricsMap.forEach(
+                (jobId, jobMetrics) -> {
+                    JobMetrics jobMetricsImap = 
jobHistoryService.getJobMetrics(jobId);
+                    if (jobMetricsImap != null) {
+                        longJobMetricsMap.put(jobId, 
jobMetricsImap.merge(jobMetrics));
+                    }
+                });
+
+        return longJobMetricsMap;
+    }
+
     public JobDAGInfo getJobInfo(long jobId) {
         JobDAGInfo jobInfo = jobHistoryService.getJobDAGInfo(jobId);
         if (jobInfo != null) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
index 722e6a5ddb..057b0dbeb5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/JobMetricsUtil.java
@@ -17,6 +17,11 @@
 
 package org.apache.seatunnel.engine.server.metrics;
 
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.SerializationFeature;
+import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
+
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
 import org.apache.seatunnel.api.common.metrics.Measurement;
 import org.apache.seatunnel.api.common.metrics.MetricTags;
@@ -35,10 +40,13 @@ import java.util.Map;
 import java.util.function.UnaryOperator;
 
 import static org.apache.seatunnel.api.common.metrics.MetricTags.ADDRESS;
+import static org.apache.seatunnel.api.common.metrics.MetricTags.JOB_ID;
 import static org.apache.seatunnel.api.common.metrics.MetricTags.MEMBER;
 
 public final class JobMetricsUtil {
 
+    private static ObjectMapper OBJECTMAPPER = new ObjectMapper();
+
     private JobMetricsUtil() {}
 
     public static String 
getTaskGroupLocationFromMetricsDescriptor(MetricDescriptor descriptor) {
@@ -68,6 +76,78 @@ public final class JobMetricsUtil {
         return JobMetrics.of(consumer.metrics);
     }
 
+    public static String toJsonString(Object o) {
+        OBJECTMAPPER.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, 
false);
+        try {
+            return 
OBJECTMAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(o);
+        } catch (JsonProcessingException e) {
+            ObjectNode objectNode = OBJECTMAPPER.createObjectNode();
+            objectNode.put("err", "serialize JobMetrics err");
+            return objectNode.toString();
+        }
+    }
+
+    public static Map<Long, JobMetrics> toJobMetricsMap(List<RawJobMetrics> 
rawJobMetrics) {
+        metricsConsumer consumer = new metricsConsumer();
+        for (RawJobMetrics metrics : rawJobMetrics) {
+            if (metrics.getBlob() == null) {
+                continue;
+            }
+            consumer.timestamp = metrics.getTimestamp();
+            MetricsCompressor.extractMetrics(metrics.getBlob(), consumer);
+        }
+
+        Map<Long, JobMetrics> jobMetricsMap = 
MapUtil.createHashMap(consumer.metrics.size());
+        consumer.metrics.forEach(
+                (jobId, metrics) -> {
+                    jobMetricsMap.put(jobId, JobMetrics.of(metrics));
+                });
+
+        return jobMetricsMap;
+    }
+
+    private static class metricsConsumer implements MetricConsumer {
+
+        final Map<Long, Map<String, List<Measurement>>> metrics = new 
HashMap<>();
+        long timestamp;
+
+        @Override
+        public void consumeLong(MetricDescriptor descriptor, long value) {
+
+            String jobId = descriptor.tagValue(JOB_ID);
+            if (jobId == null) {
+                return;
+            }
+            long jobIdLong = Long.parseLong(jobId);
+            metrics.computeIfAbsent(jobIdLong, k -> new HashMap<>())
+                    .computeIfAbsent(descriptor.metric(), k -> new 
ArrayList<>())
+                    .add(measurement(descriptor, value));
+        }
+
+        @Override
+        public void consumeDouble(MetricDescriptor descriptor, double value) {
+            String jobId = descriptor.tagValue(JOB_ID);
+            if (jobId == null) {
+                return;
+            }
+            long jobIdLong = Long.parseLong(jobId);
+            metrics.computeIfAbsent(jobIdLong, k -> new HashMap<>())
+                    .computeIfAbsent(descriptor.metric(), k -> new 
ArrayList<>())
+                    .add(measurement(descriptor, value));
+        }
+
+        private Measurement measurement(MetricDescriptor descriptor, Object 
value) {
+            Map<String, String> tags = 
MapUtil.createHashMap(descriptor.tagCount());
+            for (int i = 0; i < descriptor.tagCount(); i++) {
+                tags.put(descriptor.tag(i), descriptor.tagValue(i));
+            }
+            if (descriptor.discriminator() != null || 
descriptor.discriminatorValue() != null) {
+                tags.put(descriptor.discriminator(), 
descriptor.discriminatorValue());
+            }
+            return Measurement.of(descriptor.metric(), value, timestamp, tags);
+        }
+    }
+
     private static class JobMetricsConsumer implements MetricConsumer {
 
         final Map<String, List<Measurement>> metrics = new HashMap<>();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ZetaMetricsCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ZetaMetricsCollector.java
new file mode 100644
index 0000000000..8a4f5401d1
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/ZetaMetricsCollector.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+
+import com.hazelcast.cluster.Member;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.internal.metrics.collectors.MetricsCollector;
+import com.hazelcast.internal.metrics.impl.MetricsCompressor;
+import com.hazelcast.logging.ILogger;
+
+import java.util.Objects;
+import java.util.function.Predicate;
+import java.util.function.UnaryOperator;
+
+public class ZetaMetricsCollector implements MetricsCollector {
+
+    private final Predicate<MetricDescriptor> metricDescriptorPredicate;
+    private final MetricsCompressor compressor;
+    private final ILogger logger;
+    private final UnaryOperator<MetricDescriptor> addPrefixFn;
+
+    public ZetaMetricsCollector(
+            Predicate<MetricDescriptor> metricDescriptorPredicate, Member 
member, ILogger logger) {
+        Objects.requireNonNull(member, "member");
+        this.logger = Objects.requireNonNull(logger, "logger");
+
+        this.metricDescriptorPredicate = metricDescriptorPredicate;
+        this.addPrefixFn = JobMetricsUtil.addMemberPrefixFn(member);
+        this.compressor = new MetricsCompressor();
+    }
+
+    @Override
+    public void collectLong(MetricDescriptor descriptor, long value) {
+        if (metricDescriptorPredicate.test(descriptor)) {
+            compressor.addLong(addPrefixFn.apply(descriptor), value);
+        }
+    }
+
+    @Override
+    public void collectDouble(MetricDescriptor descriptor, double value) {
+        if (metricDescriptorPredicate.test(descriptor)) {
+            compressor.addDouble(addPrefixFn.apply(descriptor), value);
+        }
+    }
+
+    @Override
+    public void collectException(MetricDescriptor descriptor, Exception e) {
+        if (metricDescriptorPredicate.test(descriptor)) {
+            logger.warning("Exception when rendering job metrics: " + e, e);
+        }
+    }
+
+    @Override
+    public void collectNoValue(MetricDescriptor descriptor) {}
+
+    public RawJobMetrics getMetrics() {
+        return RawJobMetrics.of(compressor.getBlobAndReset());
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
index 965816cd13..0668647381 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
@@ -30,6 +30,8 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
+import static 
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJsonString;
+
 public class GetJobMetricsOperation extends Operation
         implements IdentifiedDataSerializable, AllowedDuringPassiveState {
     private long jobId;
@@ -70,9 +72,10 @@ public class GetJobMetricsOperation extends Operation
         CompletableFuture<String> future =
                 CompletableFuture.supplyAsync(
                         () -> {
-                            return service.getCoordinatorService()
-                                    .getJobMetrics(jobId)
-                                    .toJsonString();
+                            return toJsonString(
+                                    service.getCoordinatorService()
+                                            .getJobMetrics(jobId)
+                                            .getMetrics());
                         },
                         getNodeEngine()
                                 .getExecutionService()
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
similarity index 80%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
copy to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
index 965816cd13..9277c324f3 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetJobMetricsOperation.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/operation/GetRunningJobMetricsOperation.java
@@ -30,17 +30,14 @@ import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-public class GetJobMetricsOperation extends Operation
+import static 
org.apache.seatunnel.engine.server.metrics.JobMetricsUtil.toJsonString;
+
+public class GetRunningJobMetricsOperation extends Operation
         implements IdentifiedDataSerializable, AllowedDuringPassiveState {
-    private long jobId;
 
     private String response;
 
-    public GetJobMetricsOperation() {}
-
-    public GetJobMetricsOperation(long jobId) {
-        this.jobId = jobId;
-    }
+    public GetRunningJobMetricsOperation() {}
 
     @Override
     public final int getFactoryId() {
@@ -49,19 +46,17 @@ public class GetJobMetricsOperation extends Operation
 
     @Override
     public int getClassId() {
-        return 
ClientToServerOperationDataSerializerHook.GET_JOB_METRICS_OPERATOR;
+        return 
ClientToServerOperationDataSerializerHook.GET_RUNNING_JOB_METRICS_OPERATOR;
     }
 
     @Override
     protected void writeInternal(ObjectDataOutput out) throws IOException {
         super.writeInternal(out);
-        out.writeLong(jobId);
     }
 
     @Override
     protected void readInternal(ObjectDataInput in) throws IOException {
         super.readInternal(in);
-        jobId = in.readLong();
     }
 
     @Override
@@ -70,13 +65,12 @@ public class GetJobMetricsOperation extends Operation
         CompletableFuture<String> future =
                 CompletableFuture.supplyAsync(
                         () -> {
-                            return service.getCoordinatorService()
-                                    .getJobMetrics(jobId)
-                                    .toJsonString();
+                            return toJsonString(
+                                    
service.getCoordinatorService().getRunningJobMetrics());
                         },
                         getNodeEngine()
                                 .getExecutionService()
-                                .getExecutor("get_job_metrics_operation"));
+                                
.getExecutor("get_running_job_metrics_operation"));
 
         try {
             response = future.get();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
index d083a6a3aa..ea3e834f66 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
@@ -43,7 +43,7 @@ public class GetJobMetricsTask extends 
AbstractSeaTunnelMessageTask<Long, String
 
     @Override
     public String getMethodName() {
-        return "getJobStatus";
+        return "getJobMetrics";
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetRunningJobMetricsTask.java
similarity index 74%
copy from 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
copy to 
seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetRunningJobMetricsTask.java
index d083a6a3aa..2c13e253eb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetJobMetricsTask.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/GetRunningJobMetricsTask.java
@@ -17,33 +17,34 @@
 
 package org.apache.seatunnel.engine.server.protocol.task;
 
-import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
-import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetRunningJobMetricsCodec;
+import 
org.apache.seatunnel.engine.server.operation.GetRunningJobMetricsOperation;
 
 import com.hazelcast.client.impl.protocol.ClientMessage;
 import com.hazelcast.instance.impl.Node;
 import com.hazelcast.internal.nio.Connection;
 import com.hazelcast.spi.impl.operationservice.Operation;
 
-public class GetJobMetricsTask extends AbstractSeaTunnelMessageTask<Long, 
String> {
+public class GetRunningJobMetricsTask extends 
AbstractSeaTunnelMessageTask<Void, String> {
 
-    protected GetJobMetricsTask(ClientMessage clientMessage, Node node, 
Connection connection) {
+    protected GetRunningJobMetricsTask(
+            ClientMessage clientMessage, Node node, Connection connection) {
         super(
                 clientMessage,
                 node,
                 connection,
-                SeaTunnelGetJobMetricsCodec::decodeRequest,
-                SeaTunnelGetJobMetricsCodec::encodeResponse);
+                m -> null,
+                SeaTunnelGetRunningJobMetricsCodec::encodeResponse);
     }
 
     @Override
     protected Operation prepareOperation() {
-        return new GetJobMetricsOperation(parameters);
+        return new GetRunningJobMetricsOperation();
     }
 
     @Override
     public String getMethodName() {
-        return "getJobStatus";
+        return "getRunningJobMetrics";
     }
 
     @Override
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
index 6d19ada03f..f3c88b9ebc 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/protocol/task/SeaTunnelMessageTaskFactoryProvider.java
@@ -23,6 +23,7 @@ import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobDetailStat
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobInfoCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobMetricsCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCodec;
+import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetRunningJobMetricsCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelSavePointJobCodec;
@@ -92,5 +93,9 @@ public class SeaTunnelMessageTaskFactoryProvider implements 
MessageTaskFactoryPr
                 SeaTunnelGetClusterHealthMetricsCodec.REQUEST_MESSAGE_TYPE,
                 (clientMessage, connection) ->
                         new GetClusterHealthMetricsTask(clientMessage, node, 
connection));
+        factories.put(
+                SeaTunnelGetRunningJobMetricsCodec.REQUEST_MESSAGE_TYPE,
+                (clientMessage, connection) ->
+                        new GetRunningJobMetricsTask(clientMessage, node, 
connection));
     }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
index 4cfef1cd59..188e4fe065 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/ClientToServerOperationDataSerializerHook.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.engine.server.operation.GetJobDetailStatusOperation;
 import org.apache.seatunnel.engine.server.operation.GetJobInfoOperation;
 import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
 import org.apache.seatunnel.engine.server.operation.GetJobStatusOperation;
+import 
org.apache.seatunnel.engine.server.operation.GetRunningJobMetricsOperation;
 import org.apache.seatunnel.engine.server.operation.PrintMessageOperation;
 import org.apache.seatunnel.engine.server.operation.SavePointJobOperation;
 import org.apache.seatunnel.engine.server.operation.SubmitJobOperation;
@@ -60,6 +61,8 @@ public final class ClientToServerOperationDataSerializerHook 
implements DataSeri
 
     public static final int GET_CLUSTER_HEALTH_METRICS = 9;
 
+    public static final int GET_RUNNING_JOB_METRICS_OPERATOR = 10;
+
     public static final int FACTORY_ID =
             FactoryIdHelper.getFactoryId(
                     
SeaTunnelFactoryIdConstant.SEATUNNEL_OPERATION_DATA_SERIALIZER_FACTORY,
@@ -100,6 +103,8 @@ public final class 
ClientToServerOperationDataSerializerHook implements DataSeri
                     return new SavePointJobOperation();
                 case GET_CLUSTER_HEALTH_METRICS:
                     return new GetClusterHealthMetricsOperation();
+                case GET_RUNNING_JOB_METRICS_OPERATOR:
+                    return new GetRunningJobMetricsOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + 
typeId);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index 90d543cee3..c5b9baeb0d 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -25,6 +25,7 @@ import 
org.apache.seatunnel.engine.server.task.operation.CancelTaskOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.CheckTaskGroupIsExecutingOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.CleanTaskGroupContextOperation;
 import org.apache.seatunnel.engine.server.task.operation.DeployTaskOperation;
+import org.apache.seatunnel.engine.server.task.operation.GetMetricsOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupMetricsOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.NotifyTaskStatusOperation;
@@ -89,6 +90,8 @@ public class TaskDataSerializerHook implements 
DataSerializerHook {
 
     public static final int CHECK_TASKGROUP_IS_EXECUTING = 21;
 
+    public static final int GET_METRICS_OPERATION = 22;
+
     public static final int FACTORY_ID =
             FactoryIdHelper.getFactoryId(
                     
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
@@ -151,6 +154,8 @@ public class TaskDataSerializerHook implements 
DataSerializerHook {
                     return new SourceReaderEventOperation();
                 case CHECK_TASKGROUP_IS_EXECUTING:
                     return new CheckTaskGroupIsExecutingOperation();
+                case GET_METRICS_OPERATION:
+                    return new GetMetricsOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + 
typeId);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java
new file mode 100644
index 0000000000..15003a641d
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/operation/GetMetricsOperation.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.task.operation;
+
+import org.apache.seatunnel.api.common.metrics.RawJobMetrics;
+import org.apache.seatunnel.engine.server.metrics.ZetaMetricsCollector;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+
+import com.hazelcast.cluster.Address;
+import com.hazelcast.internal.metrics.MetricDescriptor;
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.nio.ObjectDataInput;
+import com.hazelcast.nio.ObjectDataOutput;
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+import com.hazelcast.spi.impl.NodeEngineImpl;
+import com.hazelcast.spi.impl.operationservice.Operation;
+
+import java.io.IOException;
+import java.util.function.Predicate;
+
+public class GetMetricsOperation extends Operation implements 
IdentifiedDataSerializable { // operation that collects the executing node's metrics, filtered by a descriptor predicate
+
+    private Predicate<MetricDescriptor> metricDescriptorPredicate; // filter handed to ZetaMetricsCollector; written and read with the operation
+    private RawJobMetrics response; // populated by run(), returned by getResponse()
+
+    public GetMetricsOperation() {} // no-arg constructor; fields are then populated by readInternal()
+
+    public GetMetricsOperation(Predicate<MetricDescriptor> 
metricDescriptorPredicate) {
+        this.metricDescriptorPredicate = metricDescriptorPredicate; // NOTE(review): written with out.writeObject below, so it presumably must be serializable - confirm
+    }
+
+    @Override // collects metrics from this node's registry, refusing callers other than the master
+    public void run() {
+        ILogger logger = getLogger();
+
+        Address callerAddress = getCallerAddress(); // NOTE(review): dereferenced below without a null check - confirm it is always set
+
+        NodeEngineImpl nodeEngine = (NodeEngineImpl) getNodeEngine(); // cast needed for getMetricsRegistry() below
+        Address masterAddress = getNodeEngine().getMasterAddress();
+        if (!callerAddress.equals(masterAddress)) { // only the current master may request metrics
+            throw new IllegalStateException(
+                    "Caller "
+                            + callerAddress
+                            + " cannot get metrics"
+                            + " because it is not master. Master is: "
+                            + masterAddress);
+        }
+
+        ZetaMetricsCollector metricsRenderer = // collector bound to the local member and the received filter
+                new ZetaMetricsCollector(
+                        metricDescriptorPredicate, 
nodeEngine.getLocalMember(), logger);
+        nodeEngine.getMetricsRegistry().collect(metricsRenderer); // feeds the registry's metrics into the collector
+        response = metricsRenderer.getMetrics(); // kept for getResponse()
+    }
+
+    @Override // serializes the operation state: the parent's data followed by the predicate
+    protected void writeInternal(ObjectDataOutput out) throws IOException {
+        super.writeInternal(out);
+        out.writeObject(metricDescriptorPredicate);
+    }
+
+    @Override // mirror of writeInternal: the parent's data first, then the predicate
+    protected void readInternal(ObjectDataInput in) throws IOException {
+        super.readInternal(in);
+        this.metricDescriptorPredicate = in.readObject();
+    }
+
+    @Override // returns whatever run() stored; null if run() has not completed successfully
+    public Object getResponse() {
+        return response;
+    }
+
+    @Override // factory id shared by the task operations registered in TaskDataSerializerHook
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override // class id that TaskDataSerializerHook maps back to this operation
+    public int getClassId() {
+        return TaskDataSerializerHook.GET_METRICS_OPERATION;
+    }
+}

Reply via email to