This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cb26360f5 [Feature][ST-Engine] ST-Engine add print jobMetrics on job
finished (#3691)
cb26360f5 is described below
commit cb26360f5871e08e93f2d6c8514640afe8a2967a
Author: ic4y <[email protected]>
AuthorDate: Sat Dec 10 14:55:18 2022 +0800
[Feature][ST-Engine] ST-Engine add print jobMetrics on job finished (#3691)
---
.../java/org/apache/seatunnel/api/common/metrics/JobMetrics.java | 2 +-
.../seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java | 8 ++++++++
.../core/starter/seatunnel/command/ClientExecuteCommand.java | 5 +++++
3 files changed, 14 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
index 7c52013a6..a9f8c8054 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/metrics/JobMetrics.java
@@ -149,7 +149,7 @@ public final class JobMetrics implements Serializable {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,
false);
try {
- return objectMapper.writeValueAsString(this.metrics);
+ return
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(this.metrics);
} catch (JsonProcessingException e) {
ObjectNode objectNode = objectMapper.createObjectNode();
objectNode.put("err", "serialize JobMetrics err");
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
index 920e0162e..eb87196d6 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ClientCommandArgs.java
@@ -53,6 +53,10 @@ public class ClientCommandArgs extends AbstractCommandArgs {
description = "Cancel job by JobId")
private String cancelJobId;
+ @Parameter(names = {"-m", "--metrics"},
+ description = "Get job metrics by JobId")
+ private String metricsJobId;
+
@Parameter(names = {"-l", "--list"},
description = "list job status")
private boolean listJob = false;
@@ -89,6 +93,10 @@ public class ClientCommandArgs extends AbstractCommandArgs {
return cancelJobId;
}
+ public String getMetricsJobId() {
+ return metricsJobId;
+ }
+
public boolean isListJob(){
return listJob;
}
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index c511b0514..08a98289d 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -77,6 +77,9 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
System.out.println(jobState);
} else if (null != clientCommandArgs.getCancelJobId()) {
engineClient.cancelJob(Long.parseLong(clientCommandArgs.getCancelJobId()));
+ } else if (null != clientCommandArgs.getMetricsJobId()) {
+ String jobMetrics =
engineClient.getJobMetrics(Long.parseLong(clientCommandArgs.getMetricsJobId()));
+ System.out.println(jobMetrics);
} else {
Path configFile = FileUtils.getConfigPath(clientCommandArgs);
checkConfigExist(configFile);
@@ -87,6 +90,8 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
clientJobProxy.waitForJobComplete();
+ long jobId = clientJobProxy.getJobId();
+ System.out.println(engineClient.getJobMetrics(jobId));
}
} catch (ExecutionException | InterruptedException e) {
throw new CommandExecuteException("SeaTunnel job executed failed",
e);