This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 8871f52fde [bugfix][core] Fix the problem of incorrect association
between metrics and nodes (#7799)
8871f52fde is described below
commit 8871f52fdec82ed1845f27b56fda0e08a62cd7f1
Author: Guangdong Liu <[email protected]>
AuthorDate: Tue Oct 15 19:50:03 2024 +0800
[bugfix][core] Fix the problem of incorrect association between metrics and
nodes (#7799)
---
docs/en/seatunnel-engine/rest-api-v1.md | 18 ++-
docs/en/seatunnel-engine/rest-api-v2.md | 18 ++-
docs/zh/seatunnel-engine/rest-api-v1.md | 21 ++-
docs/zh/seatunnel-engine/rest-api-v2.md | 20 ++-
.../org/apache/seatunnel/engine/e2e/RestApiIT.java | 160 +++++++++++++++++++++
.../seatunnel/engine/core/job/JobDAGInfo.java | 4 +
.../seatunnel/engine/server/dag/DAGUtils.java | 10 +-
.../server/metrics/TaskMetricsCalcContext.java | 3 +-
.../server/rest/RestHttpGetCommandProcessor.java | 13 +-
.../engine/server/rest/servlet/BaseServlet.java | 31 +++-
.../engine/server/rest/servlet/JobInfoServlet.java | 14 +-
.../server/task/SeaTunnelSourceCollector.java | 2 +-
.../engine/server/task/flow/SinkFlowLifeCycle.java | 58 +++++++-
13 files changed, 334 insertions(+), 38 deletions(-)
diff --git a/docs/en/seatunnel-engine/rest-api-v1.md
b/docs/en/seatunnel-engine/rest-api-v1.md
index ec9d8f13b9..aaefcbc5fa 100644
--- a/docs/en/seatunnel-engine/rest-api-v1.md
+++ b/docs/en/seatunnel-engine/rest-api-v1.md
@@ -121,10 +121,19 @@ network:
},
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "envOptions": {},
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"pluginJarsUrls": [
],
@@ -162,6 +171,7 @@ network:
"createTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
@@ -227,6 +237,7 @@ This API has been deprecated, please use
/hazelcast/rest/maps/job-info/:jobId in
"createTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
@@ -307,6 +318,7 @@ When we can't get the job info, the response will be:
"finishTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
diff --git a/docs/en/seatunnel-engine/rest-api-v2.md
b/docs/en/seatunnel-engine/rest-api-v2.md
index e5b9d5d718..1e7cf10d4e 100644
--- a/docs/en/seatunnel-engine/rest-api-v2.md
+++ b/docs/en/seatunnel-engine/rest-api-v2.md
@@ -88,10 +88,19 @@ seatunnel:
},
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "envOptions": {},
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"pluginJarsUrls": [
],
@@ -129,6 +138,7 @@ seatunnel:
"createTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
@@ -194,6 +204,7 @@ This API has been deprecated, please use /job-info/:jobId
instead
"createTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
@@ -274,6 +285,7 @@ When we can't get the job info, the response will be:
"finishTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
diff --git a/docs/zh/seatunnel-engine/rest-api-v1.md
b/docs/zh/seatunnel-engine/rest-api-v1.md
index 5154922ec0..a59c6bbde5 100644
--- a/docs/zh/seatunnel-engine/rest-api-v1.md
+++ b/docs/zh/seatunnel-engine/rest-api-v1.md
@@ -119,10 +119,19 @@ network:
},
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "envOptions": {},
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"pluginJarsUrls": [
],
@@ -160,6 +169,7 @@ network:
"createTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
@@ -239,6 +249,7 @@ network:
"createTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
@@ -305,6 +316,7 @@ network:
"finishTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
@@ -316,7 +328,8 @@ network:
}
],
"pipelineEdges": {}
- }, "metrics": ""
+ },
+ "metrics": ""
}
]
```
diff --git a/docs/zh/seatunnel-engine/rest-api-v2.md
b/docs/zh/seatunnel-engine/rest-api-v2.md
index df884fa18e..75a03f93fd 100644
--- a/docs/zh/seatunnel-engine/rest-api-v2.md
+++ b/docs/zh/seatunnel-engine/rest-api-v2.md
@@ -80,14 +80,21 @@ seatunnel:
"jobId": "",
"jobName": "",
"jobStatus": "",
- "envOptions": {
- },
"createTime": "",
"jobDag": {
- "vertices": [
+ "jobId": "",
+ "envOptions": {},
+ "vertexInfoMap": [
+ {
+ "vertexId": 1,
+ "type": "",
+ "vertexName": "",
+ "tablePaths": [
+ ""
+ ]
+ }
],
- "edges": [
- ]
+ "pipelineEdges": {}
},
"pluginJarsUrls": [
],
@@ -125,6 +132,7 @@ seatunnel:
"createTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
@@ -204,6 +212,7 @@ seatunnel:
"createTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
@@ -270,6 +279,7 @@ seatunnel:
"finishTime": "",
"jobDag": {
"jobId": "",
+ "envOptions": {},
"vertexInfoMap": [
{
"vertexId": 1,
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
index 8e5b15cc3d..0dd90eddda 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/RestApiIT.java
@@ -280,6 +280,55 @@ public class RestApiIT {
+
RestConstant.RUNNING_JOBS_URL)
.then()
.statusCode(200)
+ .body(
+ "[0].jobDag.jobId",
+ equalTo(
+
Long.toString(
+
clientJobProxy
+
.getJobId())))
+
.body("[0].jobDag.pipelineEdges", hasKey("1"))
+ .body(
+
"[0].jobDag.pipelineEdges['1']",
+ hasSize(1))
+ .body(
+
"[0].jobDag.pipelineEdges['1'][0].inputVertexId",
+ equalTo("1"))
+ .body(
+
"[0].jobDag.pipelineEdges['1'][0].targetVertexId",
+ equalTo("2"))
+
.body("[0].jobDag.vertexInfoMap", hasSize(2))
+ .body(
+
"[0].jobDag.vertexInfoMap[0].vertexId",
+ equalTo(1))
+ .body(
+
"[0].jobDag.vertexInfoMap[0].type",
+ equalTo("source"))
+ .body(
+
"[0].jobDag.vertexInfoMap[0].vertexName",
+ equalTo(
+
"pipeline-1 [Source[0]-FakeSource]"))
+ .body(
+
"[0].jobDag.vertexInfoMap[0].tablePaths[0]",
+ equalTo("fake"))
+ .body(
+
"[0].jobDag.vertexInfoMap[1].vertexId",
+ equalTo(2))
+ .body(
+
"[0].jobDag.vertexInfoMap[1].type",
+ equalTo("sink"))
+ .body(
+
"[0].jobDag.vertexInfoMap[1].vertexName",
+ equalTo(
+
"pipeline-1 [Sink[0]-LocalFile-MultiTableSink]"))
+ .body(
+
"[0].jobDag.vertexInfoMap[1].tablePaths[0]",
+ equalTo("fake"))
+ .body(
+
"[0].jobDag.envOptions.'job.mode'",
+
equalTo("STREAMING"))
+ .body(
+
"[0].jobDag.envOptions.'checkpoint.interval'",
+ equalTo("5000"))
.body("[0].jobName",
equalTo("fake_to_file"))
.body("[0].jobStatus",
equalTo("RUNNING"));
@@ -293,6 +342,55 @@ public class RestApiIT {
+
RestConstant.RUNNING_JOBS_URL)
.then()
.statusCode(200)
+ .body(
+ "[0].jobDag.jobId",
+ equalTo(
+
Long.toString(
+
clientJobProxy
+
.getJobId())))
+
.body("[0].jobDag.pipelineEdges", hasKey("1"))
+ .body(
+
"[0].jobDag.pipelineEdges['1']",
+ hasSize(1))
+ .body(
+
"[0].jobDag.pipelineEdges['1'][0].inputVertexId",
+ equalTo("1"))
+ .body(
+
"[0].jobDag.pipelineEdges['1'][0].targetVertexId",
+ equalTo("2"))
+
.body("[0].jobDag.vertexInfoMap", hasSize(2))
+ .body(
+
"[0].jobDag.vertexInfoMap[0].vertexId",
+ equalTo(1))
+ .body(
+
"[0].jobDag.vertexInfoMap[0].type",
+ equalTo("source"))
+ .body(
+
"[0].jobDag.vertexInfoMap[0].vertexName",
+ equalTo(
+
"pipeline-1 [Source[0]-FakeSource]"))
+ .body(
+
"[0].jobDag.vertexInfoMap[0].tablePaths[0]",
+ equalTo("fake"))
+ .body(
+
"[0].jobDag.vertexInfoMap[1].vertexId",
+ equalTo(2))
+ .body(
+
"[0].jobDag.vertexInfoMap[1].type",
+ equalTo("sink"))
+ .body(
+
"[0].jobDag.vertexInfoMap[1].vertexName",
+ equalTo(
+
"pipeline-1 [Sink[0]-LocalFile-MultiTableSink]"))
+ .body(
+
"[0].jobDag.vertexInfoMap[1].tablePaths[0]",
+ equalTo("fake"))
+ .body(
+
"[0].jobDag.envOptions.'job.mode'",
+
equalTo("STREAMING"))
+ .body(
+
"[0].jobDag.envOptions.'checkpoint.interval'",
+ equalTo("5000"))
.body("[0].jobName",
equalTo("fake_to_file"))
.body("[0].jobStatus",
equalTo("RUNNING"));
}));
@@ -314,6 +412,57 @@ public class RestApiIT {
+
batchJobProxy.getJobId())
.then()
.statusCode(200)
+ .body(
+ "jobDag.jobId",
+ equalTo(
+ Long.toString(
+
batchJobProxy.getJobId())))
+ .body("jobDag.pipelineEdges",
hasKey("1"))
+
.body("jobDag.pipelineEdges['1']", hasSize(1))
+ .body(
+
"jobDag.pipelineEdges['1'][0].inputVertexId",
+ equalTo("1"))
+ .body(
+
"jobDag.pipelineEdges['1'][0].targetVertexId",
+ equalTo("2"))
+ .body("jobDag.vertexInfoMap",
hasSize(2))
+ .body(
+
"jobDag.vertexInfoMap[0].vertexId",
+ equalTo(1))
+ .body(
+
"jobDag.vertexInfoMap[0].type",
+ equalTo("source"))
+ .body(
+
"jobDag.vertexInfoMap[0].vertexName",
+ equalTo(
+ "pipeline-1 [Source[0]-FakeSource]"))
+ .body(
+
"jobDag.vertexInfoMap[0].tablePaths[0]",
+ equalTo("fake"))
+ .body(
+
"jobDag.vertexInfoMap[1].vertexId",
+ equalTo(2))
+ .body(
+
"jobDag.vertexInfoMap[1].type",
+ equalTo("sink"))
+ .body(
+
"jobDag.vertexInfoMap[1].vertexName",
+ equalTo(
+ "pipeline-1 [Sink[0]-console-MultiTableSink]"))
+ .body(
+
"jobDag.vertexInfoMap[1].tablePaths[0]",
+ equalTo("fake"))
+ .body(
+
"metrics.TableSourceReceivedCount.fake",
+ equalTo("5"))
+ .body(
+
"metrics.TableSinkWriteCount.fake",
+ equalTo("5"))
+
.body("metrics.SinkWriteCount", equalTo("5"))
+
.body("metrics.SourceReceivedCount", equalTo("5"))
+ .body(
+
"jobDag.envOptions.'job.mode'",
+ equalTo("BATCH"))
.body("jobName",
equalTo("fake_to_console"))
.body("jobStatus",
equalTo("FINISHED"));
@@ -369,6 +518,17 @@ public class RestApiIT {
.body(
"jobDag.vertexInfoMap[1].tablePaths[0]",
equalTo("fake"))
+ .body(
+
"metrics.TableSourceReceivedCount.fake",
+ equalTo("5"))
+ .body(
+
"metrics.TableSinkWriteCount.fake",
+ equalTo("5"))
+
.body("metrics.SinkWriteCount", equalTo("5"))
+
.body("metrics.SourceReceivedCount", equalTo("5"))
+ .body(
+
"jobDag.envOptions.'job.mode'",
+ equalTo("BATCH"))
.body("jobName",
equalTo("fake_to_console"))
.body("jobStatus",
equalTo("FINISHED"));
});
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
index ee6326acbd..aea57beef3 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath;
import com.hazelcast.internal.json.JsonArray;
import com.hazelcast.internal.json.JsonObject;
+import com.hazelcast.internal.util.JsonUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@@ -34,6 +35,7 @@ import java.util.Map;
@Data
public class JobDAGInfo implements Serializable {
Long jobId;
+ Map<String, Object> envOptions;
Map<Integer, List<Edge>> pipelineEdges;
Map<Long, VertexInfo> vertexInfoMap;
@@ -54,6 +56,8 @@ public class JobDAGInfo implements Serializable {
JsonObject jsonObject = new JsonObject();
jsonObject.add("jobId", jobId.toString());
jsonObject.add("pipelineEdges", pipelineEdgesJsonObject);
+ jsonObject.add("envOptions", JsonUtil.toJsonObject(envOptions));
+
JsonArray vertexInfoMapString = new JsonArray();
for (Map.Entry<Long, VertexInfo> entry : vertexInfoMap.entrySet()) {
JsonObject vertexInfoJsonObj = new JsonObject();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
index e7b41de73d..bbe6d77e00 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
@@ -86,7 +86,10 @@ public class DAGUtils {
});
});
return new JobDAGInfo(
- jobImmutableInformation.getJobId(), pipelineWithEdges,
vertexInfoMap);
+ jobImmutableInformation.getJobId(),
+ logicalDag.getJobConfig().getEnvOptions(),
+ pipelineWithEdges,
+ vertexInfoMap);
} else {
// Generate LogicalPlan DAG
List<Edge> edges =
@@ -130,7 +133,10 @@ public class DAGUtils {
},
Collectors.toList()));
return new JobDAGInfo(
- jobImmutableInformation.getJobId(), pipelineWithEdges,
vertexInfoMap);
+ jobImmutableInformation.getJobId(),
+ logicalDag.getJobConfig().getEnvOptions(),
+ pipelineWithEdges,
+ vertexInfoMap);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
index eab9ecbd34..6890421f9f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/metrics/TaskMetricsCalcContext.java
@@ -122,14 +122,13 @@ public class TaskMetricsCalcContext {
}
}
- public void updateMetrics(Object data) {
+ public void updateMetrics(Object data, String tableId) {
count.inc();
QPS.markEvent();
if (data instanceof SeaTunnelRow) {
SeaTunnelRow row = (SeaTunnelRow) data;
bytes.inc(row.getBytesSize());
bytesPerSeconds.markEvent(row.getBytesSize());
- String tableId = row.getTableId();
if (StringUtils.isNotBlank(tableId)) {
String tableName = TablePath.of(tableId).getFullName();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
index d052629f2e..b860dbc1c7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/RestHttpGetCommandProcessor.java
@@ -35,6 +35,7 @@ import
org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.dag.DAGUtils;
import org.apache.seatunnel.engine.server.log.Log4j2HttpGetCommandProcessor;
import org.apache.seatunnel.engine.server.master.JobHistoryService.JobState;
import
org.apache.seatunnel.engine.server.operation.GetClusterHealthMetricsOperation;
@@ -692,19 +693,23 @@ public class RestHttpGetCommandProcessor extends
HttpCommandProcessor<HttpGetCom
jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(jobId);
}
+ JobDAGInfo jobDAGInfo =
+ DAGUtils.getJobDAGInfo(
+ logicalDag,
+ jobImmutableInformation,
+
getSeaTunnelServer(false).getSeaTunnelConfig().getEngineConfig(),
+ true);
+
jobInfoJson
.add(RestConstant.JOB_ID, String.valueOf(jobId))
.add(RestConstant.JOB_NAME,
logicalDag.getJobConfig().getName())
.add(RestConstant.JOB_STATUS, jobStatus.toString())
- .add(
- RestConstant.ENV_OPTIONS,
-
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
.add(
RestConstant.CREATE_TIME,
DateTimeUtils.toString(
jobImmutableInformation.getCreateTime(),
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
- .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
+ .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
.add(
RestConstant.PLUGIN_JARS_URLS,
(JsonValue)
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
index ce5fc74d3c..5553e2c85e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/BaseServlet.java
@@ -37,6 +37,7 @@ import org.apache.seatunnel.engine.core.job.JobInfo;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.CoordinatorService;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.dag.DAGUtils;
import org.apache.seatunnel.engine.server.master.JobHistoryService;
import org.apache.seatunnel.engine.server.operation.CancelJobOperation;
import org.apache.seatunnel.engine.server.operation.GetJobMetricsOperation;
@@ -57,6 +58,7 @@ import com.hazelcast.internal.json.JsonArray;
import com.hazelcast.internal.json.JsonObject;
import com.hazelcast.internal.json.JsonValue;
import com.hazelcast.internal.serialization.Data;
+import com.hazelcast.internal.util.JsonUtil;
import com.hazelcast.jet.impl.execution.init.CustomClassLoadedObject;
import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -74,7 +76,6 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
-import static com.hazelcast.internal.util.JsonUtil.toJsonObject;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_BYTES_PER_SECONDS;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
@@ -182,19 +183,26 @@ public class BaseServlet extends HttpServlet {
jobStatus =
seaTunnelServer.getCoordinatorService().getJobStatus(jobId);
}
+ JobDAGInfo jobDAGInfo =
+ DAGUtils.getJobDAGInfo(
+ logicalDag,
+ jobImmutableInformation,
+
getSeaTunnelServer(false).getSeaTunnelConfig().getEngineConfig(),
+ true);
+
jobInfoJson
.add(RestConstant.JOB_ID, String.valueOf(jobId))
.add(RestConstant.JOB_NAME,
logicalDag.getJobConfig().getName())
.add(RestConstant.JOB_STATUS, jobStatus.toString())
.add(
RestConstant.ENV_OPTIONS,
-
toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
+
JsonUtil.toJsonObject(logicalDag.getJobConfig().getEnvOptions()))
.add(
RestConstant.CREATE_TIME,
DateTimeUtils.toString(
jobImmutableInformation.getCreateTime(),
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
- .add(RestConstant.JOB_DAG, logicalDag.getLogicalDagAsJson())
+ .add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
.add(
RestConstant.PLUGIN_JARS_URLS,
(JsonValue)
@@ -210,7 +218,7 @@ public class BaseServlet extends HttpServlet {
.add(
RestConstant.IS_START_WITH_SAVE_POINT,
jobImmutableInformation.isStartWithSavePoint())
- .add(RestConstant.METRICS,
toJsonObject(getJobMetrics(jobMetrics)));
+ .add(RestConstant.METRICS,
metricsToJsonObject(getJobMetrics(jobMetrics)));
return jobInfoJson;
}
@@ -288,7 +296,7 @@ public class BaseServlet extends HttpServlet {
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS))
.add(RestConstant.JOB_DAG, jobDAGInfo.toJsonObject())
.add(RestConstant.PLUGIN_JARS_URLS, new JsonArray())
- .add(RestConstant.METRICS,
toJsonObject(getJobMetrics(jobMetrics)));
+ .add(RestConstant.METRICS,
metricsToJsonObject(getJobMetrics(jobMetrics)));
}
private Map<String, Object> getJobMetrics(String jobMetrics) {
@@ -575,4 +583,17 @@ public class BaseServlet extends HttpServlet {
jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
}
+
+ private JsonObject metricsToJsonObject(Map<String, Object> jobMetrics) {
+ JsonObject members = new JsonObject();
+ jobMetrics.forEach(
+ (key, value) -> {
+ if (value instanceof Map) {
+ members.add(key, metricsToJsonObject((Map<String,
Object>) value));
+ } else {
+ members.add(key, value.toString());
+ }
+ });
+ return members;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java
index 16683e6826..d41635a9eb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/servlet/JobInfoServlet.java
@@ -48,18 +48,18 @@ public class JobInfoServlet extends BaseServlet {
if (jobId != null && jobId.length() > 1) {
jobId = jobId.substring(1);
} else {
- jobId = "";
+ throw new IllegalArgumentException("The jobId must not be empty.");
}
IMap<Object, Object> jobInfoMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_RUNNING_JOB_INFO);
- JobInfo jobInfo = (JobInfo) jobInfoMap.get(Long.valueOf(jobId));
+ Object jobInfo = jobInfoMap.get(Long.valueOf(jobId));
IMap<Object, Object> finishedJobStateMap =
nodeEngine.getHazelcastInstance().getMap(Constant.IMAP_FINISHED_JOB_STATE);
- JobState finishedJobState = (JobState)
finishedJobStateMap.get(Long.valueOf(jobId));
- if (!jobId.isEmpty() && jobInfo != null) {
- writeJson(resp, convertToJson(jobInfo, Long.parseLong(jobId)));
- } else if (!jobId.isEmpty() && finishedJobState != null) {
+ Object finishedJobState = finishedJobStateMap.get(Long.valueOf(jobId));
+ if (jobInfo != null) {
+ writeJson(resp, convertToJson((JobInfo) jobInfo,
Long.parseLong(jobId)));
+ } else if (finishedJobState != null) {
JobMetrics finishedJobMetrics =
(JobMetrics)
nodeEngine
@@ -75,7 +75,7 @@ public class JobInfoServlet extends BaseServlet {
writeJson(
resp,
getJobInfoJson(
- finishedJobState,
+ (JobState) finishedJobState,
finishedJobMetrics.toJsonString(),
finishedJobDAGInfo));
} else {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
index 7f2e34bdcb..53d206874a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SeaTunnelSourceCollector.java
@@ -107,7 +107,7 @@ public class SeaTunnelSourceCollector<T> implements
Collector<T> {
"Unsupported row type: " +
rowType.getClass().getName());
}
flowControlGate.audit((SeaTunnelRow) row);
- taskMetricsCalcContext.updateMetrics(row);
+ taskMetricsCalcContext.updateMetrics(row, tableId);
}
sendRecordToNext(new Record<>(row));
emptyThisPollNext = false;
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
index 5970d9a745..295328d821 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/flow/SinkFlowLifeCycle.java
@@ -23,12 +23,15 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SinkWriter.Context;
import org.apache.seatunnel.api.sink.SupportResourceShare;
import org.apache.seatunnel.api.sink.event.WriterCloseEvent;
import org.apache.seatunnel.api.sink.multitablesink.MultiTableSink;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
import org.apache.seatunnel.api.table.type.Record;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.engine.core.checkpoint.InternalCheckpointListener;
import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
@@ -53,7 +56,9 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -70,7 +75,7 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
private final SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT>
sinkAction;
private SinkWriter<T, CommitInfoT, StateT> writer;
- private SinkWriter.Context writerContext;
+ private Context writerContext;
private transient Optional<Serializer<CommitInfoT>> commitInfoSerializer;
private transient Optional<Serializer<StateT>> writerStateSerializer;
@@ -97,6 +102,9 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
private EventListener eventListener;
+ /** Maps each upstream table path to the corresponding downstream (sink) table path. */
+ private final Map<TablePath, TablePath> tablesMaps = new HashMap<>();
+
public SinkFlowLifeCycle(
SinkAction<T, StateT, CommitInfoT, AggregatedCommitInfoT>
sinkAction,
TaskLocation taskLocation,
@@ -118,6 +126,21 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
boolean isMulti = sinkAction.getSink() instanceof MultiTableSink;
if (isMulti) {
sinkTables = ((MultiTableSink)
sinkAction.getSink()).getSinkTables();
+ String[] upstreamTablePaths =
+ ((MultiTableSink) sinkAction.getSink())
+ .getSinks()
+ .keySet()
+ .toArray(new String[0]);
+ for (int i = 0; i < ((MultiTableSink)
sinkAction.getSink()).getSinks().size(); i++) {
+ tablesMaps.put(TablePath.of(upstreamTablePaths[i]),
sinkTables.get(i));
+ }
+ } else {
+ Optional<CatalogTable> catalogTable =
sinkAction.getSink().getWriteCatalogTable();
+ if (catalogTable.isPresent()) {
+ sinkTables.add(catalogTable.get().getTablePath());
+ } else {
+ sinkTables.add(TablePath.DEFAULT);
+ }
}
this.taskMetricsCalcContext =
new TaskMetricsCalcContext(metricsContext, PluginType.SINK,
isMulti, sinkTables);
@@ -246,8 +269,39 @@ public class SinkFlowLifeCycle<T, CommitInfoT extends
Serializable, AggregatedCo
if (prepareClose) {
return;
}
+ String tableId = "";
writer.write((T) record.getData());
- taskMetricsCalcContext.updateMetrics(record.getData());
+ if (record.getData() instanceof SeaTunnelRow) {
+ if (this.sinkAction.getSink() instanceof MultiTableSink) {
+ if (((SeaTunnelRow) record.getData()).getTableId() ==
null
+ || ((SeaTunnelRow)
record.getData()).getTableId().isEmpty()) {
+ tableId = ((SeaTunnelRow)
record.getData()).getTableId();
+ } else {
+
+ TablePath tablePath =
+ tablesMaps.get(
+ TablePath.of(
+ ((SeaTunnelRow)
record.getData())
+ .getTableId()));
+ tableId =
+ tablePath != null
+ ? tablePath.getFullName()
+ : TablePath.DEFAULT.getFullName();
+ }
+
+ } else {
+ Optional<CatalogTable> writeCatalogTable =
+
this.sinkAction.getSink().getWriteCatalogTable();
+ tableId =
+ writeCatalogTable
+ .map(
+ catalogTable ->
+
catalogTable.getTablePath().getFullName())
+
.orElseGet(TablePath.DEFAULT::getFullName);
+ }
+
+ taskMetricsCalcContext.updateMetrics(record.getData(),
tableId);
+ }
}
} catch (Exception e) {
throw new RuntimeException(e);