This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 16c5065b2f [Feature][Zeta] Added other metrics info of multi-table 
(#7338)
16c5065b2f is described below

commit 16c5065b2fe63bbf6676c33a0dd61416b25d7154
Author: corgy-w <[email protected]>
AuthorDate: Wed Aug 14 17:25:49 2024 +0800

    [Feature][Zeta] Added other metrics info of multi-table (#7338)
    
    * [Feature][Zeta] Added other metrics info of multi-table
    
    * [Feature][Zeta] Added other metrics restApi
    
    * [Feature][Zeta] Added other metrics restApi test
    
    * [Feature][Zeta] update rest-api doc
    
    * [Improve][Connector-V2] optimize code
    
    * [Improve][Connector-V2] optimize code
    
    ---------
    
    Co-authored-by: wangchao <[email protected]>
---
 docs/en/seatunnel-engine/rest-api.md               |  18 +-
 docs/zh/seatunnel-engine/rest-api.md               |  18 +-
 .../seatunnel/engine/e2e/MultiTableMetricsIT.java  | 101 ++++++++--
 .../batch_fake_multi_table_to_console.conf         | 102 ++++++++--
 .../engine/client/SeaTunnelClientTest.java         |  50 ++++-
 .../server/metrics/TaskMetricsCalcContext.java     | 211 +++++++++++++++++++
 .../server/rest/RestHttpGetCommandProcessor.java   | 223 +++++++++++++++++----
 .../server/task/SeaTunnelSourceCollector.java      |  60 +-----
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  62 +-----
 9 files changed, 657 insertions(+), 188 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api.md 
b/docs/en/seatunnel-engine/rest-api.md
index 99bba92dae..eb5eacfed1 100644
--- a/docs/en/seatunnel-engine/rest-api.md
+++ b/docs/en/seatunnel-engine/rest-api.md
@@ -192,8 +192,22 @@ This API has been deprecated, please use 
/hazelcast/rest/maps/job-info/:jobId in
     ]
   },
   "metrics": {
-    "sourceReceivedCount": "",
-    "sinkWriteCount": ""
+    "SourceReceivedCount": "",
+    "SourceReceivedQPS": "",
+    "SourceReceivedBytes": "",
+    "SourceReceivedBytesPerSeconds": "",
+    "SinkWriteCount": "",
+    "SinkWriteQPS": "",
+    "SinkWriteBytes": "",
+    "SinkWriteBytesPerSeconds": "",
+    "TableSourceReceivedCount": {},
+    "TableSourceReceivedBytes": {},
+    "TableSourceReceivedBytesPerSeconds": {},
+    "TableSourceReceivedQPS": {},
+    "TableSinkWriteCount": {},
+    "TableSinkWriteQPS": {},
+    "TableSinkWriteBytes": {},
+    "TableSinkWriteBytesPerSeconds": {}
   },
   "finishedTime": "",
   "errorMsg": null,
diff --git a/docs/zh/seatunnel-engine/rest-api.md 
b/docs/zh/seatunnel-engine/rest-api.md
index 1b0166425b..d38ad61268 100644
--- a/docs/zh/seatunnel-engine/rest-api.md
+++ b/docs/zh/seatunnel-engine/rest-api.md
@@ -134,8 +134,22 @@ network:
     ]
   },
   "metrics": {
-    "sourceReceivedCount": "",
-    "sinkWriteCount": ""
+    "SourceReceivedCount": "",
+    "SourceReceivedQPS": "",
+    "SourceReceivedBytes": "",
+    "SourceReceivedBytesPerSeconds": "",
+    "SinkWriteCount": "",
+    "SinkWriteQPS": "",
+    "SinkWriteBytes": "",
+    "SinkWriteBytesPerSeconds": "",
+    "TableSourceReceivedCount": {},
+    "TableSourceReceivedBytes": {},
+    "TableSourceReceivedBytesPerSeconds": {},
+    "TableSourceReceivedQPS": {},
+    "TableSinkWriteCount": {},
+    "TableSinkWriteQPS": {},
+    "TableSinkWriteBytes": {},
+    "TableSinkWriteBytesPerSeconds": {}
   },
   "finishedTime": "",
   "errorMsg": null,
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
index 59942eb4cc..61df054d07 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/MultiTableMetricsIT.java
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.Test;
 
 import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import io.restassured.response.Response;
 
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
@@ -82,33 +83,101 @@ public class MultiTableMetricsIT {
         Collections.singletonList(node1)
                 .forEach(
                         instance -> {
-                            given().get(
-                                            HOST
-                                                    + instance.getCluster()
-                                                            .getLocalMember()
-                                                            .getAddress()
-                                                            .getPort()
-                                                    + RestConstant.JOB_INFO_URL
-                                                    + "/"
-                                                    + batchJobProxy.getJobId())
-                                    .then()
+                            Response response =
+                                    given().get(
+                                                    HOST
+                                                            + 
instance.getCluster()
+                                                                    
.getLocalMember()
+                                                                    
.getAddress()
+                                                                    .getPort()
+                                                            + 
RestConstant.JOB_INFO_URL
+                                                            + "/"
+                                                            + 
batchJobProxy.getJobId());
+                            // In this test, the serialized byte size of a single
row [3, "C", 100] is 13
+                            int dataSize = 13;
+                            response.prettyPrint();
+                            response.then()
                                     .statusCode(200)
                                     .body("jobName", 
equalTo("batch_fake_multi_table_to_console"))
                                     .body("jobStatus", equalTo("FINISHED"))
-                                    .body("metrics.SourceReceivedCount", 
equalTo("50"))
-                                    .body("metrics.SinkWriteCount", 
equalTo("50"))
+                                    .body("metrics.SourceReceivedCount", 
equalTo("15"))
+                                    .body("metrics.SinkWriteCount", 
equalTo("15"))
                                     .body(
                                             
"metrics.TableSourceReceivedCount.'fake.table1'",
-                                            equalTo("20"))
+                                            equalTo("10"))
                                     .body(
                                             
"metrics.TableSourceReceivedCount.'fake.public.table2'",
-                                            equalTo("30"))
+                                            equalTo("5"))
                                     .body(
                                             
"metrics.TableSinkWriteCount.'fake.table1'",
-                                            equalTo("20"))
+                                            equalTo("10"))
                                     .body(
                                             
"metrics.TableSinkWriteCount.'fake.public.table2'",
-                                            equalTo("30"));
+                                            equalTo("5"))
+                                    .body(
+                                            "metrics.SourceReceivedBytes",
+                                            equalTo(String.valueOf(dataSize * 
15)))
+                                    .body(
+                                            "metrics.SinkWriteBytes",
+                                            equalTo(String.valueOf(dataSize * 
15)))
+                                    .body(
+                                            
"metrics.TableSourceReceivedBytes.'fake.table1'",
+                                            equalTo(String.valueOf(dataSize * 
10)))
+                                    .body(
+                                            
"metrics.TableSourceReceivedBytes.'fake.public.table2'",
+                                            equalTo(String.valueOf(dataSize * 
5)))
+                                    .body(
+                                            
"metrics.TableSinkWriteBytes.'fake.table1'",
+                                            equalTo(String.valueOf(dataSize * 
10)))
+                                    .body(
+                                            
"metrics.TableSinkWriteBytes.'fake.public.table2'",
+                                            equalTo(String.valueOf(dataSize * 
5)));
+                            Assertions.assertTrue(
+                                    
Double.parseDouble(response.path("metrics.SourceReceivedQPS"))
+                                                    > 0
+                                            && Double.parseDouble(
+                                                            response.path(
+                                                                    
"metrics.TableSourceReceivedQPS.'fake.table1'"))
+                                                    > 0
+                                            && Double.parseDouble(
+                                                            response.path(
+                                                                    
"metrics.TableSourceReceivedQPS.'fake.public.table2'"))
+                                                    > 0
+                                            && Double.parseDouble(
+                                                            
response.path("metrics.SinkWriteQPS"))
+                                                    > 0
+                                            && Double.parseDouble(
+                                                            response.path(
+                                                                    
"metrics.TableSinkWriteQPS.'fake.table1'"))
+                                                    > 0
+                                            && Double.parseDouble(
+                                                            response.path(
+                                                                    
"metrics.TableSinkWriteQPS.'fake.public.table2'"))
+                                                    > 0
+                                            && Double.parseDouble(
+                                                            response.path(
+                                                                    
"metrics.SourceReceivedBytesPerSeconds"))
+                                                    > 0
+                                            && Double.parseDouble(
+                                                            response.path(
+                                                                    
"metrics.TableSourceReceivedBytesPerSeconds.'fake.table1'"))
+                                                    > 0
+                                            && Double.parseDouble(
+                                                            response.path(
+                                                                    
"metrics.TableSourceReceivedBytesPerSeconds.'fake.public.table2'"))
+                                                    > 0
+                                            && Double.parseDouble(
+                                                            response.path(
+                                                                    
"metrics.SinkWriteBytesPerSeconds"))
+                                                    > 0
+                                            && Double.parseDouble(
+                                                            response.path(
+                                                                    
"metrics.TableSinkWriteBytesPerSeconds.'fake.table1'"))
+                                                    > 0
+                                            && Double.parseDouble(
+                                                            response.path(
+                                                                    
"metrics.TableSinkWriteBytesPerSeconds.'fake.public.table2'"))
+                                                    > 0);
                         });
     }
 
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
index c51929a0ed..7459cc150e 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fake_multi_table_to_console.conf
@@ -26,29 +26,93 @@ env {
 source {
  # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
   FakeSource {
-    result_table_name = "fake1"
-    row.num = 20
-    schema = {
-      table = "fake.table1"
-      fields {
-        name = "string"
-        age = "int"
+      result_table_name = "fake1"
+      schema = {
+        table = "fake.table1"
+        fields {
+          id = bigint
+          name = string
+          score = int
+        }
       }
+      rows = [
+        {
+          kind = INSERT
+          fields = [1, "A", 100]
+        },
+        {
+          kind = INSERT
+          fields = [2, "B", 100]
+        },
+        {
+          kind = INSERT
+          fields = [3, "C", 100]
+        },
+        {
+          kind = INSERT
+          fields = [3, "C", 100]
+        },
+        {
+          kind = INSERT
+          fields = [3, "C", 100]
+        },
+        {
+          kind = INSERT
+          fields = [3, "C", 100]
+        }
+        {
+          kind = UPDATE_BEFORE
+          fields = [1, "A", 100]
+        },
+        {
+          kind = UPDATE_AFTER
+          fields = [1, "A", 300]
+        },
+        {
+          kind = DELETE
+          fields = [2, "B", 100]
+        },
+                 {
+                   kind = INSERT
+                   fields = [2, "B", 100]
+                 }
+      ]
     }
-  }
 
-  FakeSource {
-    result_table_name = "fake2"
-    row.num = 30
-    schema = {
-      table = "fake.public.table2"
-      fields {
-        name = "string"
-        age = "int"
-        sex = "int"
+    FakeSource {
+        result_table_name = "fake2"
+        schema = {
+          table = "fake.public.table2"
+          fields {
+            id = bigint
+            name = string
+            score = int
+          }
+        }
+        rows = [
+          {
+            kind = INSERT
+            fields = [1, "A", 100]
+          },
+          {
+            kind = INSERT
+            fields = [2, "B", 100]
+          },
+          {
+            kind = DELETE
+            fields = [2, "B", 100]
+          },
+          {
+            kind = INSERT
+            fields = [3, "C", 100]
+          },
+          {
+            kind = INSERT
+            fields = [3, "C", 100]
+          }
+        ]
       }
-    }
-  }
+
 }
 
 transform {
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
index 100aa0b320..a8275a13b7 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/test/java/org/apache/seatunnel/engine/client/SeaTunnelClientTest.java
@@ -62,8 +62,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
 import static org.awaitility.Awaitility.await;
@@ -592,6 +596,23 @@ public class SeaTunnelClientTest {
                     jobMetrics.contains(SOURCE_RECEIVED_COUNT + 
"#fake.public.table2"));
             Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + 
"#fake.table1"));
             Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_COUNT + 
"#fake.public.table2"));
+            Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_BYTES + 
"#fake.table1"));
+            Assertions.assertTrue(
+                    jobMetrics.contains(SOURCE_RECEIVED_BYTES + 
"#fake.public.table2"));
+            Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_BYTES + 
"#fake.table1"));
+            Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_BYTES + 
"#fake.public.table2"));
+            Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS + 
"#fake.table1"));
+            Assertions.assertTrue(jobMetrics.contains(SOURCE_RECEIVED_QPS + 
"#fake.public.table2"));
+            Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_QPS + 
"#fake.table1"));
+            Assertions.assertTrue(jobMetrics.contains(SINK_WRITE_QPS + 
"#fake.public.table2"));
+            Assertions.assertTrue(
+                    jobMetrics.contains(SOURCE_RECEIVED_BYTES_PER_SECONDS + 
"#fake.table1"));
+            Assertions.assertTrue(
+                    jobMetrics.contains(SOURCE_RECEIVED_BYTES_PER_SECONDS + 
"#fake.public.table2"));
+            Assertions.assertTrue(
+                    jobMetrics.contains(SINK_WRITE_BYTES_PER_SECONDS + 
"#fake.table1"));
+            Assertions.assertTrue(
+                    jobMetrics.contains(SINK_WRITE_BYTES_PER_SECONDS + 
"#fake.public.table2"));
 
             log.info("jobMetrics : {}", jobMetrics);
             JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
@@ -600,10 +621,6 @@ public class SeaTunnelClientTest {
                                     Spliterators.spliteratorUnknownSize(
                                             jobMetricsStr.fieldNames(), 0),
                                     false)
-                            .filter(
-                                    metricName ->
-                                            
metricName.startsWith(SOURCE_RECEIVED_COUNT)
-                                                    || 
metricName.startsWith(SINK_WRITE_COUNT))
                             .collect(Collectors.toList());
 
             Map<String, Long> totalCount =
@@ -654,6 +671,31 @@ public class SeaTunnelClientTest {
                             .filter(e -> 
e.getKey().startsWith(SINK_WRITE_COUNT))
                             .mapToLong(Map.Entry::getValue)
                             .sum());
+            Assertions.assertEquals(
+                    totalCount.get(SOURCE_RECEIVED_BYTES),
+                    tableCount.entrySet().stream()
+                            .filter(e -> 
e.getKey().startsWith(SOURCE_RECEIVED_BYTES + "#"))
+                            .mapToLong(Map.Entry::getValue)
+                            .sum());
+            Assertions.assertEquals(
+                    totalCount.get(SINK_WRITE_BYTES),
+                    tableCount.entrySet().stream()
+                            .filter(e -> 
e.getKey().startsWith(SINK_WRITE_BYTES + "#"))
+                            .mapToLong(Map.Entry::getValue)
+                            .sum());
+            // Per-table instantaneous rates (QPS/bytes-per-second) in the same direction are expected to sum to the job-level total
+            Assertions.assertEquals(
+                    totalCount.get(SOURCE_RECEIVED_QPS),
+                    tableCount.entrySet().stream()
+                            .filter(e -> 
e.getKey().startsWith(SOURCE_RECEIVED_QPS + "#"))
+                            .mapToLong(Map.Entry::getValue)
+                            .sum());
+            Assertions.assertEquals(
+                    totalCount.get(SINK_WRITE_QPS),
+                    tableCount.entrySet().stream()
+                            .filter(e -> e.getKey().startsWith(SINK_WRITE_QPS 
+ "#"))
+                            .mapToLong(Map.Entry::getValue)
+                            .sum());
 
         } catch (ExecutionException | InterruptedException | 
JsonProcessingException e) {
             throw new RuntimeException(e);
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
new file mode 100644
index 0000000000..eab9ecbd34
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.seatunnel.engine.server.metrics;
+
+import org.apache.seatunnel.api.common.metrics.Counter;
+import org.apache.seatunnel.api.common.metrics.Meter;
+import org.apache.seatunnel.api.common.metrics.MetricsContext;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.PluginType;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
+
+public class TaskMetricsCalcContext {
+
+    private final MetricsContext metricsContext;
+
+    private final PluginType type;
+
+    private Counter count;
+
+    private Map<String, Counter> countPerTable = new ConcurrentHashMap<>();
+
+    private Meter QPS;
+
+    private Map<String, Meter> QPSPerTable = new ConcurrentHashMap<>();
+
+    private Counter bytes;
+
+    private Map<String, Counter> bytesPerTable = new ConcurrentHashMap<>();
+
+    private Meter bytesPerSeconds;
+
+    private Map<String, Meter> bytesPerSecondsPerTable = new 
ConcurrentHashMap<>();
+
+    public TaskMetricsCalcContext(
+            MetricsContext metricsContext,
+            PluginType type,
+            boolean isMulti,
+            List<TablePath> tables) {
+        this.metricsContext = metricsContext;
+        this.type = type;
+        initializeMetrics(isMulti, tables);
+    }
+
+    private void initializeMetrics(boolean isMulti, List<TablePath> tables) {
+        if (type.equals(PluginType.SINK)) {
+            this.initializeMetrics(
+                    isMulti,
+                    tables,
+                    SINK_WRITE_COUNT,
+                    SINK_WRITE_QPS,
+                    SINK_WRITE_BYTES,
+                    SINK_WRITE_BYTES_PER_SECONDS);
+        } else if (type.equals(PluginType.SOURCE)) {
+            this.initializeMetrics(
+                    isMulti,
+                    tables,
+                    SOURCE_RECEIVED_COUNT,
+                    SOURCE_RECEIVED_QPS,
+                    SOURCE_RECEIVED_BYTES,
+                    SOURCE_RECEIVED_BYTES_PER_SECONDS);
+        }
+    }
+
+    private void initializeMetrics(
+            boolean isMulti,
+            List<TablePath> tables,
+            String countName,
+            String qpsName,
+            String bytesName,
+            String bytesPerSecondsName) {
+        count = metricsContext.counter(countName);
+        QPS = metricsContext.meter(qpsName);
+        bytes = metricsContext.counter(bytesName);
+        bytesPerSeconds = metricsContext.meter(bytesPerSecondsName);
+        if (isMulti) {
+            tables.forEach(
+                    tablePath -> {
+                        countPerTable.put(
+                                tablePath.getFullName(),
+                                metricsContext.counter(countName + "#" + 
tablePath.getFullName()));
+                        QPSPerTable.put(
+                                tablePath.getFullName(),
+                                metricsContext.meter(qpsName + "#" + 
tablePath.getFullName()));
+                        bytesPerTable.put(
+                                tablePath.getFullName(),
+                                metricsContext.counter(bytesName + "#" + 
tablePath.getFullName()));
+                        bytesPerSecondsPerTable.put(
+                                tablePath.getFullName(),
+                                metricsContext.meter(
+                                        bytesPerSecondsName + "#" + 
tablePath.getFullName()));
+                    });
+        }
+    }
+
+    public void updateMetrics(Object data) {
+        count.inc();
+        QPS.markEvent();
+        if (data instanceof SeaTunnelRow) {
+            SeaTunnelRow row = (SeaTunnelRow) data;
+            bytes.inc(row.getBytesSize());
+            bytesPerSeconds.markEvent(row.getBytesSize());
+            String tableId = row.getTableId();
+
+            if (StringUtils.isNotBlank(tableId)) {
+                String tableName = TablePath.of(tableId).getFullName();
+
+                // Processing count
+                processMetrics(
+                        countPerTable,
+                        Counter.class,
+                        tableName,
+                        SINK_WRITE_COUNT,
+                        SOURCE_RECEIVED_COUNT,
+                        Counter::inc);
+
+                // Processing bytes
+                processMetrics(
+                        bytesPerTable,
+                        Counter.class,
+                        tableName,
+                        SINK_WRITE_BYTES,
+                        SOURCE_RECEIVED_BYTES,
+                        counter -> counter.inc(row.getBytesSize()));
+
+                // Processing QPS
+                processMetrics(
+                        QPSPerTable,
+                        Meter.class,
+                        tableName,
+                        SINK_WRITE_QPS,
+                        SOURCE_RECEIVED_QPS,
+                        Meter::markEvent);
+
+                // Processing bytes rate
+                processMetrics(
+                        bytesPerSecondsPerTable,
+                        Meter.class,
+                        tableName,
+                        SINK_WRITE_BYTES_PER_SECONDS,
+                        SOURCE_RECEIVED_BYTES_PER_SECONDS,
+                        meter -> meter.markEvent(row.getBytesSize()));
+            }
+        }
+    }
+
+    private <T> void processMetrics(
+            Map<String, T> metricMap,
+            Class<T> cls,
+            String tableName,
+            String sinkMetric,
+            String sourceMetric,
+            MetricProcessor<T> processor) {
+        T metric = metricMap.get(tableName);
+        if (Objects.nonNull(metric)) {
+            processor.process(metric);
+        } else {
+            String metricName =
+                    PluginType.SINK.equals(type)
+                            ? sinkMetric + "#" + tableName
+                            : sourceMetric + "#" + tableName;
+            T newMetric = createMetric(metricsContext, metricName, cls);
+            processor.process(newMetric);
+            metricMap.put(tableName, newMetric);
+        }
+    }
+
+    private <T> T createMetric(
+            MetricsContext metricsContext, String metricName, Class<T> 
metricClass) {
+        if (metricClass == Counter.class) {
+            return metricClass.cast(metricsContext.counter(metricName));
+        } else if (metricClass == Meter.class) {
+            return metricClass.cast(metricsContext.meter(metricName));
+        }
+        throw new IllegalArgumentException("Unsupported metric class: " + 
metricClass.getName());
+    }
+
+    @FunctionalInterface
+    interface MetricProcessor<T> {
+        void process(T t);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index d5d60b7cbb..fec77708b6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -44,6 +44,8 @@ import 
org.apache.seatunnel.engine.server.resourcemanager.opeartion.GetOverviewO
 import 
org.apache.seatunnel.engine.server.resourcemanager.resource.OverviewInfo;
 import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
+import org.apache.commons.lang3.ArrayUtils;
+
 import com.hazelcast.cluster.Address;
 import com.hazelcast.cluster.Cluster;
 import com.hazelcast.cluster.Member;
@@ -65,12 +67,20 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.Spliterators;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import static com.hazelcast.internal.ascii.rest.HttpStatusCode.SC_500;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
+import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.FINISHED_JOBS_INFO;
 import static 
org.apache.seatunnel.engine.server.rest.RestConstant.JOB_INFO_URL;
 import static org.apache.seatunnel.engine.server.rest.RestConstant.OVERVIEW;
@@ -81,14 +91,22 @@ import static 
org.apache.seatunnel.engine.server.rest.RestConstant.SYSTEM_MONITO
 
 public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCommand> {
 
-    private static final String SOURCE_RECEIVED_COUNT = "SourceReceivedCount";
     private static final String TABLE_SOURCE_RECEIVED_COUNT = 
"TableSourceReceivedCount";
-    private static final String SINK_WRITE_COUNT = "SinkWriteCount";
     private static final String TABLE_SINK_WRITE_COUNT = "TableSinkWriteCount";
+    private static final String TABLE_SOURCE_RECEIVED_QPS = 
"TableSourceReceivedQPS";
+    private static final String TABLE_SINK_WRITE_QPS = "TableSinkWriteQPS";
+    private static final String TABLE_SOURCE_RECEIVED_BYTES = 
"TableSourceReceivedBytes";
+    private static final String TABLE_SINK_WRITE_BYTES = "TableSinkWriteBytes";
+    private static final String TABLE_SOURCE_RECEIVED_BYTES_PER_SECONDS =
+            "TableSourceReceivedBytesPerSeconds";
+    private static final String TABLE_SINK_WRITE_BYTES_PER_SECONDS =
+            "TableSinkWriteBytesPerSeconds";
+
     private final Log4j2HttpGetCommandProcessor original;
     private NodeEngine nodeEngine;
 
     public RestHttpGetCommandProcessor(TextCommandService textCommandService) {
+
         this(textCommandService, new 
Log4j2HttpGetCommandProcessor(textCommandService));
     }
 
@@ -369,43 +387,165 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
 
     private Map<String, Object> getJobMetrics(String jobMetrics) {
         Map<String, Object> metricsMap = new HashMap<>();
-        long sourceReadCount = 0L;
-        long sinkWriteCount = 0L;
-        Map<String, JsonNode> tableSourceReceivedCountMap = new HashMap<>();
-        Map<String, JsonNode> tableSinkWriteCountMap = new HashMap<>();
+        // To add a new metric, populate the corresponding arrays below.
+        String[] countMetricsNames = {
+            SOURCE_RECEIVED_COUNT, SINK_WRITE_COUNT, SOURCE_RECEIVED_BYTES, 
SINK_WRITE_BYTES
+        };
+        String[] rateMetricsNames = {
+            SOURCE_RECEIVED_QPS,
+            SINK_WRITE_QPS,
+            SOURCE_RECEIVED_BYTES_PER_SECONDS,
+            SINK_WRITE_BYTES_PER_SECONDS
+        };
+        String[] tableCountMetricsNames = {
+            TABLE_SOURCE_RECEIVED_COUNT,
+            TABLE_SINK_WRITE_COUNT,
+            TABLE_SOURCE_RECEIVED_BYTES,
+            TABLE_SINK_WRITE_BYTES
+        };
+        String[] tableRateMetricsNames = {
+            TABLE_SOURCE_RECEIVED_QPS,
+            TABLE_SINK_WRITE_QPS,
+            TABLE_SOURCE_RECEIVED_BYTES_PER_SECONDS,
+            TABLE_SINK_WRITE_BYTES_PER_SECONDS
+        };
+        Long[] metricsSums =
+                Stream.generate(() -> 
0L).limit(countMetricsNames.length).toArray(Long[]::new);
+        Double[] metricsRates =
+                Stream.generate(() -> 
0D).limit(rateMetricsNames.length).toArray(Double[]::new);
+
+        // Used to store the various metrics at the table level
+        Map<String, JsonNode>[] tableMetricsMaps =
+                new Map[] {
+                    new HashMap<>(), // Source Received Count
+                    new HashMap<>(), // Sink Write Count
+                    new HashMap<>(), // Source Received Bytes
+                    new HashMap<>(), // Sink Write Bytes
+                    new HashMap<>(), // Source Received QPS
+                    new HashMap<>(), // Sink Write QPS
+                    new HashMap<>(), // Source Received Bytes Per Second
+                    new HashMap<>() // Sink Write Bytes Per Second
+                };
+
         try {
             JsonNode jobMetricsStr = new ObjectMapper().readTree(jobMetrics);
-            StreamSupport.stream(
-                            
Spliterators.spliteratorUnknownSize(jobMetricsStr.fieldNames(), 0),
-                            false)
-                    .filter(metricName -> metricName.contains("#"))
-                    .forEach(
+
+            jobMetricsStr
+                    .fieldNames()
+                    .forEachRemaining(
                             metricName -> {
-                                String tableName =
-                                        
TablePath.of(metricName.split("#")[1]).getFullName();
-                                if 
(metricName.startsWith(SOURCE_RECEIVED_COUNT)) {
-                                    tableSourceReceivedCountMap.put(
-                                            tableName, 
jobMetricsStr.get(metricName));
-                                }
-                                if 
(metricName.startsWith(SOURCE_RECEIVED_COUNT)) {
-                                    tableSinkWriteCountMap.put(
-                                            tableName, 
jobMetricsStr.get(metricName));
+                                if (metricName.contains("#")) {
+                                    String tableName =
+                                            
TablePath.of(metricName.split("#")[1]).getFullName();
+                                    JsonNode metricNode = 
jobMetricsStr.get(metricName);
+                                    processMetric(
+                                            metricName, tableName, metricNode, 
tableMetricsMaps);
                                 }
                             });
-            JsonNode sourceReceivedCountJson = 
jobMetricsStr.get(SOURCE_RECEIVED_COUNT);
-            JsonNode sinkWriteCountJson = jobMetricsStr.get(SINK_WRITE_COUNT);
-            for (int i = 0; i < 
jobMetricsStr.get(SOURCE_RECEIVED_COUNT).size(); i++) {
-                JsonNode sourceReader = sourceReceivedCountJson.get(i);
-                JsonNode sinkWriter = sinkWriteCountJson.get(i);
-                sourceReadCount += sourceReader.get("value").asLong();
-                sinkWriteCount += sinkWriter.get("value").asLong();
-            }
-        } catch (JsonProcessingException | NullPointerException e) {
+
+            // Aggregate the summary (count) and rate metrics
+            aggregateMetrics(
+                    jobMetricsStr,
+                    metricsSums,
+                    metricsRates,
+                    ArrayUtils.addAll(countMetricsNames, rateMetricsNames));
+
+        } catch (JsonProcessingException e) {
             return metricsMap;
         }
 
-        Map<String, Long> tableSourceReceivedCount =
-                tableSourceReceivedCountMap.entrySet().stream()
+        populateMetricsMap(
+                metricsMap,
+                tableMetricsMaps,
+                ArrayUtils.addAll(tableCountMetricsNames, 
tableRateMetricsNames),
+                countMetricsNames.length);
+        populateMetricsMap(
+                metricsMap,
+                Stream.concat(Arrays.stream(metricsSums), 
Arrays.stream(metricsRates))
+                        .toArray(Number[]::new),
+                ArrayUtils.addAll(countMetricsNames, rateMetricsNames),
+                metricsSums.length);
+
+        return metricsMap;
+    }
+
+    private void processMetric(
+            String metricName,
+            String tableName,
+            JsonNode metricNode,
+            Map<String, JsonNode>[] tableMetricsMaps) {
+        if (metricNode == null) return;
+
+        // Define index constant
+        final int SOURCE_COUNT_IDX = 0,
+                SINK_COUNT_IDX = 1,
+                SOURCE_BYTES_IDX = 2,
+                SINK_BYTES_IDX = 3,
+                SOURCE_QPS_IDX = 4,
+                SINK_QPS_IDX = 5,
+                SOURCE_BYTES_SEC_IDX = 6,
+                SINK_BYTES_SEC_IDX = 7;
+        if (metricName.startsWith(SOURCE_RECEIVED_COUNT + "#")) {
+            tableMetricsMaps[SOURCE_COUNT_IDX].put(tableName, metricNode);
+        } else if (metricName.startsWith(SINK_WRITE_COUNT + "#")) {
+            tableMetricsMaps[SINK_COUNT_IDX].put(tableName, metricNode);
+        } else if (metricName.startsWith(SOURCE_RECEIVED_BYTES + "#")) {
+            tableMetricsMaps[SOURCE_BYTES_IDX].put(tableName, metricNode);
+        } else if (metricName.startsWith(SINK_WRITE_BYTES + "#")) {
+            tableMetricsMaps[SINK_BYTES_IDX].put(tableName, metricNode);
+        } else if (metricName.startsWith(SOURCE_RECEIVED_QPS + "#")) {
+            tableMetricsMaps[SOURCE_QPS_IDX].put(tableName, metricNode);
+        } else if (metricName.startsWith(SINK_WRITE_QPS + "#")) {
+            tableMetricsMaps[SINK_QPS_IDX].put(tableName, metricNode);
+        } else if (metricName.startsWith(SOURCE_RECEIVED_BYTES_PER_SECONDS + 
"#")) {
+            tableMetricsMaps[SOURCE_BYTES_SEC_IDX].put(tableName, metricNode);
+        } else if (metricName.startsWith(SINK_WRITE_BYTES_PER_SECONDS + "#")) {
+            tableMetricsMaps[SINK_BYTES_SEC_IDX].put(tableName, metricNode);
+        }
+    }
+
+    private void aggregateMetrics(
+            JsonNode jobMetricsStr,
+            Long[] metricsSums,
+            Double[] metricsRates,
+            String[] metricsNames) {
+        for (int i = 0; i < metricsNames.length; i++) {
+            JsonNode metricNode = jobMetricsStr.get(metricsNames[i]);
+            if (metricNode != null && metricNode.isArray()) {
+                for (JsonNode node : metricNode) {
+                    // Distinguish count (summed) metrics from rate metrics by index
+                    if (i < metricsSums.length) {
+                        metricsSums[i] += node.path("value").asLong();
+                    } else {
+                        metricsRates[i - metricsSums.length] += 
node.path("value").asDouble();
+                    }
+                }
+            }
+        }
+    }
+
+    private void populateMetricsMap(
+            Map<String, Object> metricsMap,
+            Object[] metrics,
+            String[] metricNames,
+            int countMetricNames) {
+        for (int i = 0; i < metrics.length; i++) {
+            if (metrics[i] != null) {
+                if (metrics[i] instanceof Map) {
+                    metricsMap.put(
+                            metricNames[i],
+                            aggregateMap(
+                                    (Map<String, JsonNode>) metrics[i], i >= 
countMetricNames));
+                } else {
+                    metricsMap.put(metricNames[i], metrics[i]);
+                }
+            }
+        }
+    }
+
+    public static Map<String, Object> aggregateMap(Map<String, JsonNode> 
inputMap, boolean isRate) {
+        return isRate
+                ? inputMap.entrySet().stream()
                         .collect(
                                 Collectors.toMap(
                                         Map.Entry::getKey,
@@ -413,11 +553,12 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
                                                 StreamSupport.stream(
                                                                 
entry.getValue().spliterator(),
                                                                 false)
-                                                        .mapToLong(
-                                                                node -> 
node.get("value").asLong())
-                                                        .sum()));
-        Map<String, Long> tableSinkWriteCount =
-                tableSinkWriteCountMap.entrySet().stream()
+                                                        .mapToDouble(
+                                                                node ->
+                                                                        
node.path("value")
+                                                                               
 .asDouble())
+                                                        .sum()))
+                : inputMap.entrySet().stream()
                         .collect(
                                 Collectors.toMap(
                                         Map.Entry::getKey,
@@ -426,14 +567,8 @@ public class RestHttpGetCommandProcessor extends 
HttpCommandProcessor<HttpGetCom
                                                                 
entry.getValue().spliterator(),
                                                                 false)
                                                         .mapToLong(
-                                                                node -> 
node.get("value").asLong())
+                                                                node -> 
node.path("value").asLong())
                                                         .sum()));
-
-        metricsMap.put(SOURCE_RECEIVED_COUNT, sourceReadCount);
-        metricsMap.put(SINK_WRITE_COUNT, sinkWriteCount);
-        metricsMap.put(TABLE_SOURCE_RECEIVED_COUNT, tableSourceReceivedCount);
-        metricsMap.put(TABLE_SINK_WRITE_COUNT, tableSinkWriteCount);
-        return metricsMap;
     }
 
     private SeaTunnelServer getSeaTunnelServer(boolean shouldBeMaster) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index e1b2494789..7f2e34bdcb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.engine.server.task;
 
-import org.apache.seatunnel.api.common.metrics.Counter;
-import org.apache.seatunnel.api.common.metrics.Meter;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.TablePath;
@@ -30,13 +28,14 @@ import org.apache.seatunnel.api.table.type.Record;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.core.starter.flowcontrol.FlowControlGate;
 import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
 import org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle;
 
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -44,15 +43,8 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES;
-import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_BYTES_PER_SECONDS;
-import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_COUNT;
-import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SOURCE_RECEIVED_QPS;
-
 @Slf4j
 public class SeaTunnelSourceCollector<T> implements Collector<T> {
 
@@ -62,19 +54,12 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
 
     private final MetricsContext metricsContext;
 
+    private final TaskMetricsCalcContext taskMetricsCalcContext;
+
     private final AtomicBoolean schemaChangeBeforeCheckpointSignal = new 
AtomicBoolean(false);
 
     private final AtomicBoolean schemaChangeAfterCheckpointSignal = new 
AtomicBoolean(false);
 
-    private final Counter sourceReceivedCount;
-
-    private final Map<String, Counter> sourceReceivedCountPerTable = new 
ConcurrentHashMap<>();
-
-    private final Meter sourceReceivedQPS;
-    private final Counter sourceReceivedBytes;
-
-    private final Meter sourceReceivedBytesPerSeconds;
-
     private volatile boolean emptyThisPollNext;
     private final DataTypeChangeEventHandler dataTypeChangeEventHandler =
             new DataTypeChangeEventDispatcher();
@@ -98,20 +83,12 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
                     .iterator()
                     .forEachRemaining(type -> 
this.rowTypeMap.put(type.getKey(), type.getValue()));
         }
-        if (CollectionUtils.isNotEmpty(tablePaths)) {
-            tablePaths.forEach(
-                    tablePath ->
-                            sourceReceivedCountPerTable.put(
-                                    tablePath.getFullName(),
-                                    metricsContext.counter(
-                                            SOURCE_RECEIVED_COUNT
-                                                    + "#"
-                                                    + 
tablePath.getFullName())));
-        }
-        sourceReceivedCount = metricsContext.counter(SOURCE_RECEIVED_COUNT);
-        sourceReceivedQPS = metricsContext.meter(SOURCE_RECEIVED_QPS);
-        sourceReceivedBytes = metricsContext.counter(SOURCE_RECEIVED_BYTES);
-        sourceReceivedBytesPerSeconds = 
metricsContext.meter(SOURCE_RECEIVED_BYTES_PER_SECONDS);
+        this.taskMetricsCalcContext =
+                new TaskMetricsCalcContext(
+                        metricsContext,
+                        PluginType.SOURCE,
+                        CollectionUtils.isNotEmpty(tablePaths),
+                        tablePaths);
         flowControlGate = FlowControlGate.create(flowControlStrategy);
     }
 
@@ -129,26 +106,11 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
                     throw new SeaTunnelEngineException(
                             "Unsupported row type: " + 
rowType.getClass().getName());
                 }
-                sourceReceivedBytes.inc(size);
-                sourceReceivedBytesPerSeconds.markEvent(size);
                 flowControlGate.audit((SeaTunnelRow) row);
-                if (StringUtils.isNotEmpty(tableId)) {
-                    String tableName = TablePath.of(tableId).getFullName();
-                    Counter sourceTableCounter = 
sourceReceivedCountPerTable.get(tableName);
-                    if (Objects.nonNull(sourceTableCounter)) {
-                        sourceTableCounter.inc();
-                    } else {
-                        Counter counter =
-                                metricsContext.counter(SOURCE_RECEIVED_COUNT + 
"#" + tableName);
-                        counter.inc();
-                        sourceReceivedCountPerTable.put(tableName, counter);
-                    }
-                }
+                taskMetricsCalcContext.updateMetrics(row);
             }
             sendRecordToNext(new Record<>(row));
             emptyThisPollNext = false;
-            sourceReceivedCount.inc();
-            sourceReceivedQPS.markEvent();
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index de8257f1e9..cacaa75aae 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -17,8 +17,6 @@
 
 package org.apache.seatunnel.engine.server.task.flow;
 
-import org.apache.seatunnel.api.common.metrics.Counter;
-import org.apache.seatunnel.api.common.metrics.Meter;
 import org.apache.seatunnel.api.common.metrics.MetricsContext;
 import org.apache.seatunnel.api.event.EventListener;
 import org.apache.seatunnel.api.serialization.Serializer;
@@ -30,13 +28,14 @@ import 
org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.Record;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
 import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
 import org.apache.seatunnel.engine.server.event.JobEventListener;
 import org.apache.seatunnel.engine.server.execution.TaskLocation;
+import org.apache.seatunnel.engine.server.metrics.TaskMetricsCalcContext;
 import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
 import org.apache.seatunnel.engine.server.task.context.SinkWriterContext;
 import 
org.apache.seatunnel.engine.server.task.operation.GetTaskGroupAddressOperation;
@@ -45,8 +44,6 @@ import 
org.apache.seatunnel.engine.server.task.operation.sink.SinkPrepareCommitO
 import 
org.apache.seatunnel.engine.server.task.operation.sink.SinkRegisterOperation;
 import org.apache.seatunnel.engine.server.task.record.Barrier;
 
-import org.apache.commons.lang3.StringUtils;
-
 import com.hazelcast.cluster.Address;
 import lombok.extern.slf4j.Slf4j;
 
@@ -56,18 +53,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
-import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
-import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
-import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
-import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_QPS;
 import static org.apache.seatunnel.engine.common.utils.ExceptionUtil.sneaky;
 import static 
org.apache.seatunnel.engine.server.task.AbstractTask.serializeStates;
 
@@ -96,15 +87,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
 
     private MetricsContext metricsContext;
 
-    private Counter sinkWriteCount;
-
-    private Map<String, Counter> sinkWriteCountPerTable = new 
ConcurrentHashMap<>();
-
-    private Meter sinkWriteQPS;
-
-    private Counter sinkWriteBytes;
-
-    private Meter sinkWriteBytesPerSeconds;
+    private TaskMetricsCalcContext taskMetricsCalcContext;
 
     private final boolean containAggCommitter;
 
@@ -129,19 +112,13 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
         this.containAggCommitter = containAggCommitter;
         this.metricsContext = metricsContext;
         this.eventListener = new JobEventListener(taskLocation, 
runningTask.getExecutionContext());
-        sinkWriteCount = metricsContext.counter(SINK_WRITE_COUNT);
-        sinkWriteQPS = metricsContext.meter(SINK_WRITE_QPS);
-        sinkWriteBytes = metricsContext.counter(SINK_WRITE_BYTES);
-        sinkWriteBytesPerSeconds = 
metricsContext.meter(SINK_WRITE_BYTES_PER_SECONDS);
-        if (sinkAction.getSink() instanceof MultiTableSink) {
-            List<TablePath> sinkTables = ((MultiTableSink) 
sinkAction.getSink()).getSinkTables();
-            sinkTables.forEach(
-                    tablePath ->
-                            sinkWriteCountPerTable.put(
-                                    tablePath.getFullName(),
-                                    metricsContext.counter(
-                                            SINK_WRITE_COUNT + "#" + 
tablePath.getFullName())));
+        List<TablePath> sinkTables = new ArrayList<>();
+        boolean isMulti = sinkAction.getSink() instanceof MultiTableSink;
+        if (isMulti) {
+            sinkTables = ((MultiTableSink) 
sinkAction.getSink()).getSinkTables();
         }
+        this.taskMetricsCalcContext =
+                new TaskMetricsCalcContext(metricsContext, PluginType.SINK, 
isMulti, sinkTables);
     }
 
     @Override
@@ -267,26 +244,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                     return;
                 }
                 writer.write((T) record.getData());
-                sinkWriteCount.inc();
-                sinkWriteQPS.markEvent();
-                if (record.getData() instanceof SeaTunnelRow) {
-                    long size = ((SeaTunnelRow) 
record.getData()).getBytesSize();
-                    sinkWriteBytes.inc(size);
-                    sinkWriteBytesPerSeconds.markEvent(size);
-                    String tableId = ((SeaTunnelRow) 
record.getData()).getTableId();
-                    if (StringUtils.isNotBlank(tableId)) {
-                        String tableName = TablePath.of(tableId).getFullName();
-                        Counter sinkTableCounter = 
sinkWriteCountPerTable.get(tableName);
-                        if (Objects.nonNull(sinkTableCounter)) {
-                            sinkTableCounter.inc();
-                        } else {
-                            Counter counter =
-                                    metricsContext.counter(SINK_WRITE_COUNT + 
"#" + tableName);
-                            counter.inc();
-                            sinkWriteCountPerTable.put(tableName, counter);
-                        }
-                    }
-                }
+                taskMetricsCalcContext.updateMetrics(record.getData());
             }
         } catch (Exception e) {
             throw new RuntimeException(e);

Reply via email to