This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 8871f52fde [bugfix][core] Fix the problem of incorrect association between metrics and nodes (#7799)
8871f52fde is described below

commit 8871f52fdec82ed1845f27b56fda0e08a62cd7f1
Author: Guangdong Liu <[email protected]>
AuthorDate: Tue Oct 15 19:50:03 2024 +0800

    [bugfix][core] Fix the problem of incorrect association between metrics and nodes (#7799)
---
 docs/en/seatunnel-engine/rest-api-v1.md            |  18 ++-
 docs/en/seatunnel-engine/rest-api-v2.md            |  18 ++-
 docs/zh/seatunnel-engine/rest-api-v1.md            |  21 ++-
 docs/zh/seatunnel-engine/rest-api-v2.md            |  20 ++-
 .../org/apache/seatunnel/engine/e2e/RestApiIT.java | 160 +++++++++++++++++++++
 .../seatunnel/engine/core/job/JobDAGInfo.java      |   4 +
 .../seatunnel/engine/server/dag/DAGUtils.java      |  10 +-
 .../server/metrics/TaskMetricsCalcContext.java     |   3 +-
 .../server/rest/RestHttpGetCommandProcessor.java   |  13 +-
 .../engine/server/rest/servlet/BaseServlet.java    |  31 +++-
 .../engine/server/rest/servlet/JobInfoServlet.java |  14 +-
 .../server/task/SeaTunnelSourceCollector.java      |   2 +-
 .../engine/server/task/flow/SinkFlowLifeCycle.java |  58 +++++++-
 13 files changed, 334 insertions(+), 38 deletions(-)

diff --git a/docs/en/seatunnel-engine/rest-api-v1.md b/docs/en/seatunnel-engine/rest-api-v1.md
index ec9d8f13b9..aaefcbc5fa 100644
--- a/docs/en/seatunnel-engine/rest-api-v1.md
+++ b/docs/en/seatunnel-engine/rest-api-v1.md
@@ -121,10 +121,19 @@ network:
     },
     "createTime": "",
     "jobDag": {
-      "vertices": [
+      "jobId": "",
+      "envOptions": [],
+      "vertexInfoMap": [
+        {
+          "vertexId": 1,
+          "type": "",
+          "vertexName": "",
+          "tablePaths": [
+            ""
+          ]
+        }
       ],
-      "edges": [
-      ]
+      "pipelineEdges": {}
     },
     "pluginJarsUrls": [
     ],
@@ -162,6 +171,7 @@ network:
   "createTime": "",
   "jobDag": {
     "jobId": "",
+    "envOptions": [],
     "vertexInfoMap": [
       {
         "vertexId": 1,
@@ -227,6 +237,7 @@ This API has been deprecated, please use /hazelcast/rest/maps/job-info/:jobId in
   "createTime": "",
   "jobDag": {
     "jobId": "",
+    "envOptions": [],
     "vertexInfoMap": [
       {
         "vertexId": 1,
@@ -307,6 +318,7 @@ When we can't get the job info, the response will be:
     "finishTime": "",
     "jobDag": {
       "jobId": "",
+      "envOptions": [],
       "vertexInfoMap": [
         {
           "vertexId": 1,
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md b/docs/en/seatunnel-engine/rest-api-v2.md
index e5b9d5d718..1e7cf10d4e 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -88,10 +88,19 @@ seatunnel:
     },
     "createTime": "",
     "jobDag": {
-      "vertices": [
+      "jobId": "",
+      "envOptions": [],
+      "vertexInfoMap": [
+        {
+          "vertexId": 1,
+          "type": "",
+          "vertexName": "",
+          "tablePaths": [
+            ""
+          ]
+        }
       ],
-      "edges": [
-      ]
+      "pipelineEdges": {}
     },
     "pluginJarsUrls": [
     ],
@@ -129,6 +138,7 @@ seatunnel:
   "createTime": "",
   "jobDag": {
     "jobId": "",
+    "envOptions": [],
     "vertexInfoMap": [
       {
         "vertexId": 1,
@@ -194,6 +204,7 @@ This API has been deprecated, please use /job-info/:jobId instead
   "createTime": "",
   "jobDag": {
     "jobId": "",
+    "envOptions": [],
     "vertexInfoMap": [
       {
         "vertexId": 1,
@@ -274,6 +285,7 @@ When we can't get the job info, the response will be:
     "finishTime": "",
     "jobDag": {
       "jobId": "",
+      "envOptions": [],
       "vertexInfoMap": [
         {
           "vertexId": 1,
diff --git a/docs/zh/seatunnel-engine/rest-api-v1.md b/docs/zh/seatunnel-engine/rest-api-v1.md
index 5154922ec0..a59c6bbde5 100644
--- a/docs/zh/seatunnel-engine/rest-api-v1.md
+++ b/docs/zh/seatunnel-engine/rest-api-v1.md
@@ -119,10 +119,19 @@ network:
     },
     "createTime": "",
     "jobDag": {
-      "vertices": [
+      "jobId": "",
+      "envOptions": [],
+      "vertexInfoMap": [
+        {
+          "vertexId": 1,
+          "type": "",
+          "vertexName": "",
+          "tablePaths": [
+            ""
+          ]
+        }
       ],
-      "edges": [
-      ]
+      "pipelineEdges": {}
     },
     "pluginJarsUrls": [
     ],
@@ -160,6 +169,7 @@ network:
   "createTime": "",
   "jobDag": {
     "jobId": "",
+    "envOptions": [],
     "vertexInfoMap": [
       {
         "vertexId": 1,
@@ -239,6 +249,7 @@ network:
   "createTime": "",
   "jobDag": {
     "jobId": "",
+    "envOptions": [],
     "vertexInfoMap": [
       {
         "vertexId": 1,
@@ -305,6 +316,7 @@ network:
     "finishTime": "",
     "jobDag": {
       "jobId": "",
+      "envOptions": [],
       "vertexInfoMap": [
         {
           "vertexId": 1,
@@ -316,7 +328,8 @@ network:
         }
       ],
       "pipelineEdges": {}
-    },    "metrics": ""
+    },
+    "metrics": ""
   }
 ]
 ```
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md b/docs/zh/seatunnel-engine/rest-api-v2.md
index df884fa18e..75a03f93fd 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -80,14 +80,21 @@ seatunnel:
     "jobId": "",
     "jobName": "",
     "jobStatus": "",
-    "envOptions": {
-    },
     "createTime": "",
     "jobDag": {
-      "vertices": [
+      "jobId": "",
+      "envOptions": [],
+      "vertexInfoMap": [
+        {
+          "vertexId": 1,
+          "type": "",
+          "vertexName": "",
+          "tablePaths": [
+            ""
+          ]
+        }
       ],
-      "edges": [
-      ]
+      "pipelineEdges": {}
     },
     "pluginJarsUrls": [
     ],
@@ -125,6 +132,7 @@ seatunnel:
   "createTime": "",
   "jobDag": {
     "jobId": "",
+    "envOptions": [],
     "vertexInfoMap": [
       {
         "vertexId": 1,
@@ -204,6 +212,7 @@ seatunnel:
   "createTime": "",
   "jobDag": {
     "jobId": "",
+    "envOptions": [],
     "vertexInfoMap": [
       {
         "vertexId": 1,
@@ -270,6 +279,7 @@ seatunnel:
     "finishTime": "",
     "jobDag": {
       "jobId": "",
+      "envOptions": [],
       "vertexInfoMap": [
         {
           "vertexId": 1,
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 8e5b15cc3d..0dd90eddda 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -280,6 +280,55 @@ public class RestApiIT {
                                                                     + 
RestConstant.RUNNING_JOBS_URL)
                                                     .then()
                                                     .statusCode(200)
+                                                    .body(
+                                                            "[0].jobDag.jobId",
+                                                            equalTo(
+                                                                    
Long.toString(
+                                                                            
clientJobProxy
+                                                                               
     .getJobId())))
+                                                    
.body("[0].jobDag.pipelineEdges", hasKey("1"))
+                                                    .body(
+                                                            
"[0].jobDag.pipelineEdges['1']",
+                                                            hasSize(1))
+                                                    .body(
+                                                            
"[0].jobDag.pipelineEdges['1'][0].inputVertexId",
+                                                            equalTo("1"))
+                                                    .body(
+                                                            
"[0].jobDag.pipelineEdges['1'][0].targetVertexId",
+                                                            equalTo("2"))
+                                                    
.body("[0].jobDag.vertexInfoMap", hasSize(2))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[0].vertexId",
+                                                            equalTo(1))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[0].type",
+                                                            equalTo("source"))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[0].vertexName",
+                                                            equalTo(
+                                                                    
"pipeline-1 [Source[0]-FakeSource]"))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[0].tablePaths[0]",
+                                                            equalTo("fake"))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[1].vertexId",
+                                                            equalTo(2))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[1].type",
+                                                            equalTo("sink"))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[1].vertexName",
+                                                            equalTo(
+                                                                    
"pipeline-1 [Sink[0]-LocalFile-MultiTableSink]"))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[1].tablePaths[0]",
+                                                            equalTo("fake"))
+                                                    .body(
+                                                            
"[0].jobDag.envOptions.'job.mode'",
+                                                            
equalTo("STREAMING"))
+                                                    .body(
+                                                            
"[0].jobDag.envOptions.'checkpoint.interval'",
+                                                            equalTo("5000"))
                                                     .body("[0].jobName", 
equalTo("fake_to_file"))
                                                     .body("[0].jobStatus", 
equalTo("RUNNING"));
 
@@ -293,6 +342,55 @@ public class RestApiIT {
                                                                     + 
RestConstant.RUNNING_JOBS_URL)
                                                     .then()
                                                     .statusCode(200)
+                                                    .body(
+                                                            "[0].jobDag.jobId",
+                                                            equalTo(
+                                                                    
Long.toString(
+                                                                            
clientJobProxy
+                                                                               
     .getJobId())))
+                                                    
.body("[0].jobDag.pipelineEdges", hasKey("1"))
+                                                    .body(
+                                                            
"[0].jobDag.pipelineEdges['1']",
+                                                            hasSize(1))
+                                                    .body(
+                                                            
"[0].jobDag.pipelineEdges['1'][0].inputVertexId",
+                                                            equalTo("1"))
+                                                    .body(
+                                                            
"[0].jobDag.pipelineEdges['1'][0].targetVertexId",
+                                                            equalTo("2"))
+                                                    
.body("[0].jobDag.vertexInfoMap", hasSize(2))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[0].vertexId",
+                                                            equalTo(1))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[0].type",
+                                                            equalTo("source"))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[0].vertexName",
+                                                            equalTo(
+                                                                    
"pipeline-1 [Source[0]-FakeSource]"))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[0].tablePaths[0]",
+                                                            equalTo("fake"))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[1].vertexId",
+                                                            equalTo(2))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[1].type",
+                                                            equalTo("sink"))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[1].vertexName",
+                                                            equalTo(
+                                                                    
"pipeline-1 [Sink[0]-LocalFile-MultiTableSink]"))
+                                                    .body(
+                                                            
"[0].jobDag.vertexInfoMap[1].tablePaths[0]",
+                                                            equalTo("fake"))
+                                                    .body(
+                                                            
"[0].jobDag.envOptions.'job.mode'",
+                                                            
equalTo("STREAMING"))
+                                                    .body(
+                                                            
"[0].jobDag.envOptions.'checkpoint.interval'",
+                                                            equalTo("5000"))
                                                     .body("[0].jobName", 
equalTo("fake_to_file"))
                                                     .body("[0].jobStatus", 
equalTo("RUNNING"));
                                         }));
@@ -314,6 +412,57 @@ public class RestApiIT {
                                                                 + 
batchJobProxy.getJobId())
                                                 .then()
                                                 .statusCode(200)
+                                                .body(
+                                                        "jobDag.jobId",
+                                                        equalTo(
+                                                                Long.toString(
+                                                                        
batchJobProxy.getJobId())))
+                                                .body("jobDag.pipelineEdges", 
hasKey("1"))
+                                                
.body("jobDag.pipelineEdges['1']", hasSize(1))
+                                                .body(
+                                                        
"jobDag.pipelineEdges['1'][0].inputVertexId",
+                                                        equalTo("1"))
+                                                .body(
+                                                        
"jobDag.pipelineEdges['1'][0].targetVertexId",
+                                                        equalTo("2"))
+                                                .body("jobDag.vertexInfoMap", 
hasSize(2))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[0].vertexId",
+                                                        equalTo(1))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[0].type",
+                                                        equalTo("source"))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[0].vertexName",
+                                                        equalTo(
+                                                                "pipeline-1 
[Source[0]-FakeSource]"))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[0].tablePaths[0]",
+                                                        equalTo("fake"))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[1].vertexId",
+                                                        equalTo(2))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[1].type",
+                                                        equalTo("sink"))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[1].vertexName",
+                                                        equalTo(
+                                                                "pipeline-1 
[Sink[0]-console-MultiTableSink]"))
+                                                .body(
+                                                        
"jobDag.vertexInfoMap[1].tablePaths[0]",
+                                                        equalTo("fake"))
+                                                .body(
+                                                        
"metrics.TableSourceReceivedCount.fake",
+                                                        equalTo("5"))
+                                                .body(
+                                                        
"metrics.TableSinkWriteCount.fake",
+                                                        equalTo("5"))
+                                                
.body("metrics.SinkWriteCount", equalTo("5"))
+                                                
.body("metrics.SourceReceivedCount", equalTo("5"))
+                                                .body(
+                                                        
"jobDag.envOptions.'job.mode'",
+                                                        equalTo("BATCH"))
                                                 .body("jobName", 
equalTo("fake_to_console"))
                                                 .body("jobStatus", 
equalTo("FINISHED"));
 
@@ -369,6 +518,17 @@ public class RestApiIT {
                                                 .body(
                                                         
"jobDag.vertexInfoMap[1].tablePaths[0]",
                                                         equalTo("fake"))
+                                                .body(
+                                                        
"metrics.TableSourceReceivedCount.fake",
+                                                        equalTo("5"))
+                                                .body(
+                                                        
"metrics.TableSinkWriteCount.fake",
+                                                        equalTo("5"))
+                                                
.body("metrics.SinkWriteCount", equalTo("5"))
+                                                
.body("metrics.SourceReceivedCount", equalTo("5"))
+                                                .body(
+                                                        
"jobDag.envOptions.'job.mode'",
+                                                        equalTo("BATCH"))
                                                 .body("jobName", 
equalTo("fake_to_console"))
                                                 .body("jobStatus", 
equalTo("FINISHED"));
                                     });
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
index ee6326acbd..aea57beef3 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
 
 import com.hazelcast.internal.json.JsonArray;
 import com.hazelcast.internal.json.JsonObject;
+import com.hazelcast.internal.util.JsonUtil;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -34,6 +35,7 @@ import java.util.Map;
 @Data
 public class JobDAGInfo implements Serializable {
     Long jobId;
+    Map<String, Object> envOptions;
     Map<Integer, List<Edge>> pipelineEdges;
     Map<Long, VertexInfo> vertexInfoMap;
 
@@ -54,6 +56,8 @@ public class JobDAGInfo implements Serializable {
         JsonObject jsonObject = new JsonObject();
         jsonObject.add("jobId", jobId.toString());
         jsonObject.add("pipelineEdges", pipelineEdgesJsonObject);
+        jsonObject.add("envOptions", JsonUtil.toJsonObject(envOptions));
+
         JsonArray vertexInfoMapString = new JsonArray();
         for (Map.Entry<Long, VertexInfo> entry : vertexInfoMap.entrySet()) {
             JsonObject vertexInfoJsonObj = new JsonObject();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
index e7b41de73d..bbe6d77e00 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
@@ -86,7 +86,10 @@ public class DAGUtils {
                                         });
                     });
             return new JobDAGInfo(
-                    jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap);
+                    jobImmutableInformation.getJobId(),
+                    logicalDag.getJobConfig().getEnvOptions(),
+                    pipelineWithEdges,
+                    vertexInfoMap);
         } else {
             // Generate LogicalPlan DAG
             List<Edge> edges =
@@ -130,7 +133,10 @@ public class DAGUtils {
                                             },
                                             Collectors.toList()));
             return new JobDAGInfo(
-                    jobImmutableInformation.getJobId(), pipelineWithEdges, vertexInfoMap);
+                    jobImmutableInformation.getJobId(),
+                    logicalDag.getJobConfig().getEnvOptions(),
+                    pipelineWithEdges,
+                    vertexInfoMap);
         }
     }
 
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
index eab9ecbd34..6890421f9f 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
@@ -122,14 +122,13 @@ public class TaskMetricsCalcContext {
         }
     }
 
-    public void updateMetrics(Object data) {
+    public void updateMetrics(Object data, String tableId) {
         count.inc();
         QPS.markEvent();
         if (data instanceof SeaTunnelRow) {
             SeaTunnelRow row = (SeaTunnelRow) data;
             bytes.inc(row.getBytesSize());
             bytesPerSeconds.markEvent(row.getBytesSize());
-            String tableId = row.getTableId();
 
             if (StringUtils.isNotBlank(tableId)) {
                 String tableName = TablePath.of(tableId).getFullName();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index d052629f2e..b860dbc1c7 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobInfo;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.dag.DAGUtils;
 import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
 import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
 import org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
@@ -692,19 +693,23 @@ public class RestHttpGetCommandProcessor extends HttpCommandProcessor<HttpGetCom
             jobStatus = 
seaTunnelServer.getCoordinatorService().getJobStatus(jobId);
         }
 
+        JobDAGInfo jobDAGInfo =
+                DAGUtils.getJobDAGInfo(
+                        logicalDag,
+                        jobImmutableInformation,
+                        
getSeaTunnelServer(false).getSeaTunnelConfig().getEngineConfig(),
+                        true);
+
         jobInfoJson
                 .add(RestConstant.JOB_ID, String.valueOf(jobId))
                 .add(RestConstant.JOB_NAME, 
logicalDag.getJobConfig().getName())
                 .add(RestConstant.JOB_STATUS, jobStatus.toString())
-                .add(
-                        RestConstant.ENV_OPTIONS,
-                        
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
                 .add(
                         RestConstant.CREATE_TIME,
                         DateTimeUtils.toString(
                                 jobImmutableInformation.getCreateTime(),
                                 DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
-                .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
+                .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
                 .add(
                         RestConstant.PLUGIN_JARS_URLS,
                         (JsonValue)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
index ce5fc74d3c..5553e2c85e 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.engine.core.job.JobInfo;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.CoordinatorService;
 import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.dag.DAGUtils;
 import org.apache.seatunnel.engine.server.master.JobHistoryService;
 import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
 import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
@@ -57,6 +58,7 @@ import com.hazelcast.internal.json.JsonArray;
 import com.hazelcast.internal.json.JsonObject;
 import com.hazelcast.internal.json.JsonValue;
 import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.internal.util.JsonUtil;
 import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
 import com.hazelcast.spi.impl.NodeEngineImpl;
 
@@ -74,7 +76,6 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
-import static com.hazelcast.internal.util.JsonUtil.toJsonObject;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
@@ -182,19 +183,26 @@ public class BaseServlet extends HttpServlet {
             jobStatus = 
seaTunnelServer.getCoordinatorService().getJobStatus(jobId);
         }
 
+        JobDAGInfo jobDAGInfo =
+                DAGUtils.getJobDAGInfo(
+                        logicalDag,
+                        jobImmutableInformation,
+                        
getSeaTunnelServer(false).getSeaTunnelConfig().getEngineConfig(),
+                        true);
+
         jobInfoJson
                 .add(RestConstant.JOB_ID, String.valueOf(jobId))
                 .add(RestConstant.JOB_NAME, 
logicalDag.getJobConfig().getName())
                 .add(RestConstant.JOB_STATUS, jobStatus.toString())
                 .add(
                         RestConstant.ENV_OPTIONS,
-                        
toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
+                        
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
                 .add(
                         RestConstant.CREATE_TIME,
                         DateTimeUtils.toString(
                                 jobImmutableInformation.getCreateTime(),
                                 DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
-                .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
+                .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
                 .add(
                         RestConstant.PLUGIN_JARS_URLS,
                         (JsonValue)
@@ -210,7 +218,7 @@ public class BaseServlet extends HttpServlet {
                 .add(
                         RestConstant.IS_START_WITH_SAVE_POINT,
                         jobImmutableInformation.isStartWithSavePoint())
-                .add(RestConstant.METRICS, 
toJsonObject(getJobMetrics(jobMetrics)));
+                .add(RestConstant.METRICS, 
metricsToJsonObject(getJobMetrics(jobMetrics)));
 
         return jobInfoJson;
     }
@@ -288,7 +296,7 @@ public class BaseServlet extends HttpServlet {
                                 DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
                 .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
                 .add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
-                .add(RestConstant.METRICS, 
toJsonObject(getJobMetrics(jobMetrics)));
+                .add(RestConstant.METRICS, 
metricsToJsonObject(getJobMetrics(jobMetrics)));
     }
 
     private Map<String, Object> getJobMetrics(String jobMetrics) {
@@ -575,4 +583,17 @@ public class BaseServlet extends HttpServlet {
                         jobImmutableInformation.isStartWithSavePoint());
         voidPassiveCompletableFuture.join();
     }
+
+    /** Converts a metrics map to JSON: nested maps recurse, other values become strings. */
+    private JsonObject metricsToJsonObject(Map<String, Object> jobMetrics) {
+        JsonObject members = new JsonObject();
+        for (Map.Entry<String, Object> e : jobMetrics.entrySet()) {
+            if (e.getValue() instanceof Map) {
+                members.add(e.getKey(), metricsToJsonObject((Map<String, Object>) e.getValue()));
+            } else { // String.valueOf: a null metric becomes "null" rather than an NPE
+                members.add(e.getKey(), String.valueOf(e.getValue()));
+            }
+        }
+        return members;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java
index 16683e6826..d41635a9eb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java
@@ -48,18 +48,18 @@ public class JobInfoServlet extends BaseServlet {
         if (jobId != null && jobId.length() > 1) {
             jobId = jobId.substring(1);
         } else {
-            jobId = "";
+            throw new IllegalArgumentException("The jobId must not be empty.");
         }
 
         IMap<Object, Object> jobInfoMap =
                 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
-        JobInfo jobInfo = (JobInfo) jobInfoMap.get(Long.valueOf(jobId));
+        Object jobInfo = jobInfoMap.get(Long.valueOf(jobId));
         IMap<Object, Object> finishedJobStateMap =
                 
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE);
-        JobState finishedJobState = (JobState) 
finishedJobStateMap.get(Long.valueOf(jobId));
-        if (!jobId.isEmpty() && jobInfo != null) {
-            writeJson(resp, convertToJson(jobInfo, Long.parseLong(jobId)));
-        } else if (!jobId.isEmpty() && finishedJobState != null) {
+        Object finishedJobState = finishedJobStateMap.get(Long.valueOf(jobId));
+        if (jobInfo != null) {
+            writeJson(resp, convertToJson((JobInfo) jobInfo, 
Long.parseLong(jobId)));
+        } else if (finishedJobState != null) {
             JobMetrics finishedJobMetrics =
                     (JobMetrics)
                             nodeEngine
@@ -75,7 +75,7 @@ public class JobInfoServlet extends BaseServlet {
             writeJson(
                     resp,
                     getJobInfoJson(
-                            finishedJobState,
+                            (JobState) finishedJobState,
                             finishedJobMetrics.toJsonString(),
                             finishedJobDAGInfo));
         } else {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 7f2e34bdcb..53d206874a 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -107,7 +107,7 @@ public class SeaTunnelSourceCollector<T> implements 
Collector<T> {
                             "Unsupported row type: " + 
rowType.getClass().getName());
                 }
                 flowControlGate.audit((SeaTunnelRow) row);
-                taskMetricsCalcContext.updateMetrics(row);
+                taskMetricsCalcContext.updateMetrics(row, tableId);
             }
             sendRecordToNext(new Record<>(row));
             emptyThisPollNext = false;
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 5970d9a745..295328d821 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -23,12 +23,15 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SinkWriter.Context;
 import org.apache.seatunnel.api.sink.SupportResourceShare;
 import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
 import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
 import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
 import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
@@ -53,7 +56,9 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -70,7 +75,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
 
     private final SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT> 
sinkAction;
     private SinkWriter<T, CommitInfoT, StateT> writer;
-    private SinkWriter.Context writerContext;
+    private Context writerContext;
 
     private transient Optional<Serializer<CommitInfoT>> commitInfoSerializer;
     private transient Optional<Serializer<StateT>> writerStateSerializer;
@@ -97,6 +102,9 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
 
     private EventListener eventListener;
 
+    /** Maps each upstream table path to its downstream (sink) table path. */
+    private final Map<TablePath, TablePath> tablesMaps = new HashMap<>();
+
     public SinkFlowLifeCycle(
             SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT> 
sinkAction,
             TaskLocation taskLocation,
@@ -118,6 +126,21 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
         boolean isMulti = sinkAction.getSink() instanceof MultiTableSink;
         if (isMulti) {
             sinkTables = ((MultiTableSink) 
sinkAction.getSink()).getSinkTables();
+            String[] upstreamTablePaths =
+                    ((MultiTableSink) sinkAction.getSink())
+                            .getSinks()
+                            .keySet()
+                            .toArray(new String[0]);
+            for (int i = 0; i < ((MultiTableSink) 
sinkAction.getSink()).getSinks().size(); i++) {
+                tablesMaps.put(TablePath.of(upstreamTablePaths[i]), 
sinkTables.get(i));
+            }
+        } else {
+            Optional<CatalogTable> catalogTable = 
sinkAction.getSink().getWriteCatalogTable();
+            if (catalogTable.isPresent()) {
+                sinkTables.add(catalogTable.get().getTablePath());
+            } else {
+                sinkTables.add(TablePath.DEFAULT);
+            }
         }
         this.taskMetricsCalcContext =
                 new TaskMetricsCalcContext(metricsContext, PluginType.SINK, 
isMulti, sinkTables);
@@ -246,8 +269,39 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends 
Serializable, AggregatedCo
                 if (prepareClose) {
                     return;
                 }
+                String tableId = "";
                 writer.write((T) record.getData());
-                taskMetricsCalcContext.updateMetrics(record.getData());
+                if (record.getData() instanceof SeaTunnelRow) {
+                    if (this.sinkAction.getSink() instanceof MultiTableSink) {
+                        if (((SeaTunnelRow) record.getData()).getTableId() == 
null
+                                || ((SeaTunnelRow) 
record.getData()).getTableId().isEmpty()) {
+                            tableId = ((SeaTunnelRow) 
record.getData()).getTableId();
+                        } else {
+
+                            TablePath tablePath =
+                                    tablesMaps.get(
+                                            TablePath.of(
+                                                    ((SeaTunnelRow) 
record.getData())
+                                                            .getTableId()));
+                            tableId =
+                                    tablePath != null
+                                            ? tablePath.getFullName()
+                                            : TablePath.DEFAULT.getFullName();
+                        }
+
+                    } else {
+                        Optional<CatalogTable> writeCatalogTable =
+                                
this.sinkAction.getSink().getWriteCatalogTable();
+                        tableId =
+                                writeCatalogTable
+                                        .map(
+                                                catalogTable ->
+                                                        
catalogTable.getTablePath().getFullName())
+                                        
.orElseGet(TablePath.DEFAULT::getFullName);
+                    }
+
+                    taskMetricsCalcContext.updateMetrics(record.getData(), 
tableId);
+                }
             }
         } catch (Exception e) {
             throw new RuntimeException(e);


Reply via email to