This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 20413fac5 [Feature][Zeta] Zeta support print metrics information
(#3913)
20413fac5 is described below
commit 20413fac59738da29884f7cf277bed1c568ee5fe
Author: Tyrantlucifer <[email protected]>
AuthorDate: Thu Jan 12 16:46:37 2023 +0800
[Feature][Zeta] Zeta support print metrics information (#3913)
---
config/seatunnel.yaml | 3 +-
.../seatunnel/common/utils/StringFormatUtils.java | 40 ++++++++++
.../common/utils/StringFormatUtilsTest.java | 56 ++++++++++++++
.../seatunnel/command/ClientExecuteCommand.java | 56 +++++++++++++-
.../seatunnel/engine/client/SeaTunnelClient.java | 26 +++++++
.../engine/client/job/JobMetricsRunner.java | 86 ++++++++++++++++++++++
.../engine/common/config/EngineConfig.java | 6 ++
.../config/YamlSeaTunnelDomConfigProcessor.java | 3 +
.../common/config/server/ServerConfigOptions.java | 2 +
.../engine/server/CoordinatorService.java | 9 +--
10 files changed, 275 insertions(+), 12 deletions(-)
diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml
index 5b7d7f572..3e576e3d6 100644
--- a/config/seatunnel.yaml
+++ b/config/seatunnel.yaml
@@ -18,7 +18,8 @@
seatunnel:
engine:
backup-count: 1
- print-execution-info-interval: 10
+ print-execution-info-interval: 60
+ print-job-metrics-info-interval: 60
slot-service:
dynamic-slot: true
checkpoint:
diff --git
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/StringFormatUtils.java
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/StringFormatUtils.java
new file mode 100644
index 000000000..cad9d667c
--- /dev/null
+++
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/StringFormatUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import java.util.Collections;
+
+public class StringFormatUtils {
+ private static final int NUM = 47;
+
+ private StringFormatUtils() {
+ // utility class can not be instantiated
+ }
+
+ public static String formatTable(Object... objects) {
+ String title = objects[0].toString();
+ int blankNum = (NUM - title.length()) / 2;
+ int kvNum = (objects.length - 1) / 2;
+ String template = "\n" +
"***********************************************" +
+ "\n" + String.join("", Collections.nCopies(blankNum, " ")) +
"%s" +
+ "\n" + "***********************************************" +
+ "\n" + String.join("", Collections.nCopies(kvNum, "%-26s:
%19s\n"))
+ + "***********************************************\n";
+ return String.format(template, objects);
+ }
+}
diff --git
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/StringFormatUtilsTest.java
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/StringFormatUtilsTest.java
new file mode 100644
index 000000000..ce1632ac5
--- /dev/null
+++
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/StringFormatUtilsTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class StringFormatUtilsTest {
+ @Test
+ public void testStringFormat() {
+ String s = StringFormatUtils.formatTable("Job Statistic Information",
+ "Start Time",
+ "2023-01-11 00:00:00",
+
+ "End Time",
+ "2023-01-11 00:00:00",
+
+ "Total Time(s)",
+ 0,
+
+ "Total Read Count",
+ 0,
+
+ "Total Write Count",
+ 0,
+
+ "Total Failed Count",
+ 0);
+ Assertions.assertEquals(s, "\n" +
+ "***********************************************\n" +
+ " Job Statistic Information\n" +
+ "***********************************************\n" +
+ "Start Time : 2023-01-11 00:00:00\n" +
+ "End Time : 2023-01-11 00:00:00\n" +
+ "Total Time(s) : 0\n" +
+ "Total Read Count : 0\n" +
+ "Total Write Count : 0\n" +
+ "Total Failed Count : 0\n" +
+ "***********************************************\n");
+ }
+}
diff --git
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index fd52bc2d0..0fbd3ee5e 100644
---
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.core.starter.seatunnel.command;
import static
org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.enums.MasterType;
import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
@@ -27,6 +29,7 @@ import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.apache.seatunnel.engine.client.SeaTunnelClient;
import org.apache.seatunnel.engine.client.job.ClientJobProxy;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
@@ -38,8 +41,13 @@ import com.hazelcast.instance.impl.HazelcastInstanceFactory;
import lombok.extern.slf4j.Slf4j;
import java.nio.file.Path;
+import java.time.Duration;
+import java.time.LocalDateTime;
import java.util.Random;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
/**
* This command is used to execute the SeaTunnel engine job by SeaTunnel API.
@@ -53,11 +61,15 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
this.clientCommandArgs = clientCommandArgs;
}
- @SuppressWarnings("checkstyle:RegexpSingleline")
+ @SuppressWarnings({"checkstyle:RegexpSingleline",
"checkstyle:MagicNumber"})
@Override
public void execute() throws CommandExecuteException {
HazelcastInstance instance = null;
SeaTunnelClient engineClient = null;
+ ScheduledExecutorService executorService = null;
+ JobMetricsRunner.JobMetricsSummary jobMetricsSummary = null;
+ LocalDateTime startTime = LocalDateTime.now();
+ LocalDateTime endTime = LocalDateTime.now();
SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
try {
String clusterName = clientCommandArgs.getClusterName();
@@ -87,11 +99,22 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
jobConfig.setName(clientCommandArgs.getJobName());
JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(configFile.toString(),
jobConfig);
-
+ // get job start time
+ startTime = LocalDateTime.now();
+ // create job proxy
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
- clientJobProxy.waitForJobComplete();
+ // get job id
long jobId = clientJobProxy.getJobId();
- System.out.println(engineClient.getJobMetrics(jobId));
+ JobMetricsRunner jobMetricsRunner = new
JobMetricsRunner(engineClient, jobId);
+ executorService = Executors.newSingleThreadScheduledExecutor();
+ executorService.scheduleAtFixedRate(jobMetricsRunner, 0,
+
seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(),
TimeUnit.SECONDS);
+ // wait for job complete
+ clientJobProxy.waitForJobComplete();
+ // get job end time
+ endTime = LocalDateTime.now();
+ // get job statistic information when job finished
+ jobMetricsSummary = engineClient.getJobMetricsSummary(jobId);
}
} catch (ExecutionException | InterruptedException e) {
throw new CommandExecuteException("SeaTunnel job executed failed",
e);
@@ -102,6 +125,31 @@ public class ClientExecuteCommand implements
Command<ClientCommandArgs> {
if (instance != null) {
instance.shutdown();
}
+ if (executorService != null) {
+ executorService.shutdown();
+ }
+ if (jobMetricsSummary != null) {
+ // print job statistics information when job finished
+ log.info(StringFormatUtils.formatTable(
+ "Job Statistic Information",
+ "Start Time",
+ DateTimeUtils.toString(startTime,
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+
+ "End Time",
+ DateTimeUtils.toString(endTime,
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+
+ "Total Time(s)",
+ Duration.between(startTime, endTime).getSeconds(),
+
+ "Total Read Count",
+ jobMetricsSummary.getSourceReadCount(),
+
+ "Total Write Count",
+ jobMetricsSummary.getSinkWriteCount(),
+
+ "Total Failed Count",
+ jobMetricsSummary.getSourceReadCount() -
jobMetricsSummary.getSinkWriteCount()));
+ }
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 691f25c6f..7ed4ab949 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.client;
import org.apache.seatunnel.engine.client.job.JobClient;
import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import
org.apache.seatunnel.engine.client.job.JobMetricsRunner.JobMetricsSummary;
import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
@@ -31,11 +32,15 @@ import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCode
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
import
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.logging.ILogger;
import lombok.NonNull;
public class SeaTunnelClient implements SeaTunnelClientInstance {
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private final SeaTunnelHazelcastClient hazelcastClient;
public SeaTunnelClient(@NonNull ClientConfig clientConfig) {
@@ -127,4 +132,25 @@ public class SeaTunnelClient implements
SeaTunnelClientInstance {
SeaTunnelGetJobInfoCodec::decodeResponse
));
}
+
+ public JobMetricsSummary getJobMetricsSummary(Long jobId) {
+ long sourceReadCount = 0L;
+ long sinkWriteCount = 0L;
+ String jobMetrics = getJobMetrics(jobId);
+ try {
+ JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics);
+ JsonNode sourceReaders = jsonNode.get("SourceReceivedCount");
+ JsonNode sinkWriters = jsonNode.get("SinkWriteCount");
+ for (int i = 0; i < sourceReaders.size(); i++) {
+ JsonNode sourceReader = sourceReaders.get(i);
+ JsonNode sinkWriter = sinkWriters.get(i);
+ sourceReadCount += sourceReader.get("value").asLong();
+ sinkWriteCount += sinkWriter.get("value").asLong();
+ }
+ return new JobMetricsSummary(sourceReadCount, sinkWriteCount);
+ // Add NullPointerException because of metrics information can be
empty like {}
+ } catch (JsonProcessingException | NullPointerException e) {
+ return new JobMetricsSummary(sourceReadCount, sinkWriteCount);
+ }
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
new file mode 100644
index 000000000..fde095334
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client.job;
+
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.StringFormatUtils;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+
+@Slf4j
+public class JobMetricsRunner implements Runnable {
+ private final SeaTunnelClient seaTunnelClient;
+ private final Long jobId;
+ private LocalDateTime lastRunTime = LocalDateTime.now();
+ private Long lastReadCount = 0L;
+ private Long lastWriteCount = 0L;
+
+ public JobMetricsRunner(SeaTunnelClient seaTunnelClient, Long jobId) {
+ this.seaTunnelClient = seaTunnelClient;
+ this.jobId = jobId;
+ }
+
+ @Override
+ public void run() {
+ Thread.currentThread().setName("job-metrics-runner-" + jobId);
+ try {
+ JobMetricsSummary jobMetricsSummary =
seaTunnelClient.getJobMetricsSummary(jobId);
+ LocalDateTime now = LocalDateTime.now();
+ long seconds = Duration.between(lastRunTime, now).getSeconds();
+ long averageRead = (jobMetricsSummary.getSourceReadCount() -
lastReadCount) / seconds;
+ long averageWrite = (jobMetricsSummary.getSinkWriteCount() -
lastWriteCount) / seconds;
+ log.info(StringFormatUtils.formatTable(
+ "Job Progress Information",
+ "Read Count So Far",
+ jobMetricsSummary.getSourceReadCount(),
+
+ "Write Count So Far",
+ jobMetricsSummary.getSinkWriteCount(),
+
+ "Average Read Count",
+ averageRead + "/s",
+
+ "Average Write Count",
+ averageWrite + "/s",
+
+ "Last Statistic Time",
+ DateTimeUtils.toString(lastRunTime,
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+
+ "Current Statistic Time",
+ DateTimeUtils.toString(now,
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)));
+ lastRunTime = now;
+ lastReadCount = jobMetricsSummary.getSourceReadCount();
+ lastWriteCount = jobMetricsSummary.getSinkWriteCount();
+ } catch (Exception e) {
+ log.warn("Failed to get job metrics summary, it maybe first-run");
+ }
+ }
+
+ @Data
+ @AllArgsConstructor
+ public static class JobMetricsSummary {
+ private long sourceReadCount;
+ private long sinkWriteCount;
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 365e4e887..0037d0df8 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -32,6 +32,8 @@ public class EngineConfig {
private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue();
private int printExecutionInfoInterval =
ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue();
+ private int printJobMetricsInfoInterval =
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.defaultValue();
+
private SlotServiceConfig slotServiceConfig =
ServerConfigOptions.SLOT_SERVICE.defaultValue();
private CheckpointConfig checkpointConfig =
ServerConfigOptions.CHECKPOINT.defaultValue();
@@ -46,4 +48,8 @@ public class EngineConfig {
this.printExecutionInfoInterval = printExecutionInfoInterval;
}
+ public void setPrintJobMetricsInfoInterval(int
printJobMetricsInfoInterval) {
+ checkPositive(printExecutionInfoInterval,
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0");
+ this.printJobMetricsInfoInterval = printJobMetricsInfoInterval;
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 22e137b09..229d40f1c 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -98,6 +98,9 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
} else if
(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key().equals(name)) {
engineConfig.setPrintExecutionInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key(),
getTextContent(node)));
+ } else if
(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key().equals(name)) {
+
engineConfig.setPrintJobMetricsInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key(),
+ getTextContent(node)));
} else if (ServerConfigOptions.SLOT_SERVICE.key().equals(name)) {
engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node));
} else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) {
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 5dc5e0080..6b782bc8b 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -31,6 +31,8 @@ public class ServerConfigOptions {
public static final Option<Integer> PRINT_EXECUTION_INFO_INTERVAL =
Options.key("print-execution-info-interval").intType().defaultValue(60).withDescription("The
interval (in seconds) between two consecutive executions of the print
execution info task.");
+ public static final Option<Integer> PRINT_JOB_METRICS_INFO_INTERVAL =
Options.key("print-job-metrics-info-interval").intType().defaultValue(60).withDescription("The
interval (in seconds) of job print metrics info");
+
public static final Option<Boolean> DYNAMIC_SLOT =
Options.key("dynamic-slot").booleanType().defaultValue(true).withDescription("Whether
to use dynamic slot.");
public static final Option<Integer> SLOT_NUM =
Options.key("slot-num").intType().defaultValue(2).withDescription("The number
of slots. Only valid when dynamic slot is disabled.");
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index d5d1fbf3c..dfc92ce38 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.StringFormatUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
import org.apache.seatunnel.engine.common.exception.JobException;
@@ -503,13 +504,7 @@ public class CoordinatorService {
int poolSize = threadPoolExecutor.getPoolSize();
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
long taskCount = threadPoolExecutor.getTaskCount();
- logger.info(String.format(
- "\n" + "***********************************************" +
- "\n" + " %s" +
- "\n" + "***********************************************" +
- "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
- + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
- + "***********************************************\n",
+ logger.info(StringFormatUtils.formatTable(
"CoordinatorService Thread Pool Status",
"activeCount",
activeCount,