This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 20413fac5 [Feature][Zeta] Zeta support print metrics information 
(#3913)
20413fac5 is described below

commit 20413fac59738da29884f7cf277bed1c568ee5fe
Author: Tyrantlucifer <[email protected]>
AuthorDate: Thu Jan 12 16:46:37 2023 +0800

    [Feature][Zeta] Zeta support print metrics information (#3913)
---
 config/seatunnel.yaml                              |  3 +-
 .../seatunnel/common/utils/StringFormatUtils.java  | 40 ++++++++++
 .../common/utils/StringFormatUtilsTest.java        | 56 ++++++++++++++
 .../seatunnel/command/ClientExecuteCommand.java    | 56 +++++++++++++-
 .../seatunnel/engine/client/SeaTunnelClient.java   | 26 +++++++
 .../engine/client/job/JobMetricsRunner.java        | 86 ++++++++++++++++++++++
 .../engine/common/config/EngineConfig.java         |  6 ++
 .../config/YamlSeaTunnelDomConfigProcessor.java    |  3 +
 .../common/config/server/ServerConfigOptions.java  |  2 +
 .../engine/server/CoordinatorService.java          |  9 +--
 10 files changed, 275 insertions(+), 12 deletions(-)

diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml
index 5b7d7f572..3e576e3d6 100644
--- a/config/seatunnel.yaml
+++ b/config/seatunnel.yaml
@@ -18,7 +18,8 @@
 seatunnel:
   engine:
     backup-count: 1
-    print-execution-info-interval: 10
+    print-execution-info-interval: 60
+    print-job-metrics-info-interval: 60
     slot-service:
       dynamic-slot: true
     checkpoint:
diff --git 
a/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/StringFormatUtils.java
 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/StringFormatUtils.java
new file mode 100644
index 000000000..cad9d667c
--- /dev/null
+++ 
b/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/StringFormatUtils.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import java.util.Collections;
+
+public class StringFormatUtils {
+    private static final int NUM = 47;
+
+    private StringFormatUtils() {
+        // utility class can not be instantiated
+    }
+
+    public static String formatTable(Object... objects) {
+        String title = objects[0].toString();
+        int blankNum = (NUM - title.length()) / 2;
+        int kvNum = (objects.length - 1) / 2;
+        String template = "\n" + 
"***********************************************" +
+                "\n" + String.join("", Collections.nCopies(blankNum, " ")) + 
"%s" +
+                "\n" + "***********************************************" +
+                "\n" + String.join("", Collections.nCopies(kvNum, "%-26s: 
%19s\n"))
+                + "***********************************************\n";
+        return String.format(template, objects);
+    }
+}
diff --git 
a/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/StringFormatUtilsTest.java
 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/StringFormatUtilsTest.java
new file mode 100644
index 000000000..ce1632ac5
--- /dev/null
+++ 
b/seatunnel-common/src/test/java/org/apache/seatunnel/common/utils/StringFormatUtilsTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.common.utils;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class StringFormatUtilsTest {
+    @Test
+    public void testStringFormat() {
+        String s = StringFormatUtils.formatTable("Job Statistic Information",
+                "Start Time",
+                "2023-01-11 00:00:00",
+
+                "End Time",
+                "2023-01-11 00:00:00",
+
+                "Total Time(s)",
+                0,
+
+                "Total Read Count",
+                0,
+
+                "Total Write Count",
+                0,
+
+                "Total Failed Count",
+                0);
+        Assertions.assertEquals(s, "\n" +
+                "***********************************************\n" +
+                "           Job Statistic Information\n" +
+                "***********************************************\n" +
+                "Start Time                : 2023-01-11 00:00:00\n" +
+                "End Time                  : 2023-01-11 00:00:00\n" +
+                "Total Time(s)             :                   0\n" +
+                "Total Read Count          :                   0\n" +
+                "Total Write Count         :                   0\n" +
+                "Total Failed Count        :                   0\n" +
+                "***********************************************\n");
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
index fd52bc2d0..0fbd3ee5e 100644
--- 
a/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.core.starter.seatunnel.command;
 
 import static 
org.apache.seatunnel.core.starter.utils.FileUtils.checkConfigExist;
 
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.StringFormatUtils;
 import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.enums.MasterType;
 import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
@@ -27,6 +29,7 @@ import org.apache.seatunnel.core.starter.utils.FileUtils;
 import org.apache.seatunnel.engine.client.SeaTunnelClient;
 import org.apache.seatunnel.engine.client.job.ClientJobProxy;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.client.job.JobMetricsRunner;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
@@ -38,8 +41,13 @@ import com.hazelcast.instance.impl.HazelcastInstanceFactory;
 import lombok.extern.slf4j.Slf4j;
 
 import java.nio.file.Path;
+import java.time.Duration;
+import java.time.LocalDateTime;
 import java.util.Random;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This command is used to execute the SeaTunnel engine job by SeaTunnel API.
@@ -53,11 +61,15 @@ public class ClientExecuteCommand implements 
Command<ClientCommandArgs> {
         this.clientCommandArgs = clientCommandArgs;
     }
 
-    @SuppressWarnings("checkstyle:RegexpSingleline")
+    @SuppressWarnings({"checkstyle:RegexpSingleline", 
"checkstyle:MagicNumber"})
     @Override
     public void execute() throws CommandExecuteException {
         HazelcastInstance instance = null;
         SeaTunnelClient engineClient = null;
+        ScheduledExecutorService executorService = null;
+        JobMetricsRunner.JobMetricsSummary jobMetricsSummary = null;
+        LocalDateTime startTime = LocalDateTime.now();
+        LocalDateTime endTime = LocalDateTime.now();
         SeaTunnelConfig seaTunnelConfig = 
ConfigProvider.locateAndGetSeaTunnelConfig();
         try {
             String clusterName = clientCommandArgs.getClusterName();
@@ -87,11 +99,22 @@ public class ClientExecuteCommand implements 
Command<ClientCommandArgs> {
                 jobConfig.setName(clientCommandArgs.getJobName());
                 JobExecutionEnvironment jobExecutionEnv =
                     engineClient.createExecutionContext(configFile.toString(), 
jobConfig);
-
+                // get job start time
+                startTime = LocalDateTime.now();
+                // create job proxy
                 ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
-                clientJobProxy.waitForJobComplete();
+                // get job id
                 long jobId = clientJobProxy.getJobId();
-                System.out.println(engineClient.getJobMetrics(jobId));
+                JobMetricsRunner jobMetricsRunner = new 
JobMetricsRunner(engineClient, jobId);
+                executorService = Executors.newSingleThreadScheduledExecutor();
+                executorService.scheduleAtFixedRate(jobMetricsRunner, 0,
+                        
seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(), 
TimeUnit.SECONDS);
+                // wait for job complete
+                clientJobProxy.waitForJobComplete();
+                // get job end time
+                endTime = LocalDateTime.now();
+                // get job statistic information when job finished
+                jobMetricsSummary = engineClient.getJobMetricsSummary(jobId);
             }
         } catch (ExecutionException | InterruptedException e) {
             throw new CommandExecuteException("SeaTunnel job executed failed", 
e);
@@ -102,6 +125,31 @@ public class ClientExecuteCommand implements 
Command<ClientCommandArgs> {
             if (instance != null) {
                 instance.shutdown();
             }
+            if (executorService != null) {
+                executorService.shutdown();
+            }
+            if (jobMetricsSummary != null) {
+                // print job statistics information when job finished
+                log.info(StringFormatUtils.formatTable(
+                        "Job Statistic Information",
+                        "Start Time",
+                        DateTimeUtils.toString(startTime, 
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+
+                        "End Time",
+                        DateTimeUtils.toString(endTime, 
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+
+                        "Total Time(s)",
+                        Duration.between(startTime, endTime).getSeconds(),
+
+                        "Total Read Count",
+                        jobMetricsSummary.getSourceReadCount(),
+
+                        "Total Write Count",
+                        jobMetricsSummary.getSinkWriteCount(),
+
+                        "Total Failed Count",
+                        jobMetricsSummary.getSourceReadCount() - 
jobMetricsSummary.getSinkWriteCount()));
+            }
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
index 691f25c6f..7ed4ab949 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/SeaTunnelClient.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.client;
 
 import org.apache.seatunnel.engine.client.job.JobClient;
 import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import 
org.apache.seatunnel.engine.client.job.JobMetricsRunner.JobMetricsSummary;
 import org.apache.seatunnel.engine.common.config.JobConfig;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
@@ -31,11 +32,15 @@ import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelGetJobStatusCode
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelListJobStatusCodec;
 import 
org.apache.seatunnel.engine.core.protocol.codec.SeaTunnelPrintMessageCodec;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.hazelcast.client.config.ClientConfig;
 import com.hazelcast.logging.ILogger;
 import lombok.NonNull;
 
 public class SeaTunnelClient implements SeaTunnelClientInstance {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     private final SeaTunnelHazelcastClient hazelcastClient;
 
     public SeaTunnelClient(@NonNull ClientConfig clientConfig) {
@@ -127,4 +132,25 @@ public class SeaTunnelClient implements 
SeaTunnelClientInstance {
             SeaTunnelGetJobInfoCodec::decodeResponse
         ));
     }
+
+    public JobMetricsSummary getJobMetricsSummary(Long jobId) {
+        long sourceReadCount = 0L;
+        long sinkWriteCount = 0L;
+        String jobMetrics = getJobMetrics(jobId);
+        try {
+            JsonNode jsonNode = OBJECT_MAPPER.readTree(jobMetrics);
+            JsonNode sourceReaders = jsonNode.get("SourceReceivedCount");
+            JsonNode sinkWriters = jsonNode.get("SinkWriteCount");
+            for (int i = 0; i < sourceReaders.size(); i++) {
+                JsonNode sourceReader = sourceReaders.get(i);
+                JsonNode sinkWriter = sinkWriters.get(i);
+                sourceReadCount += sourceReader.get("value").asLong();
+                sinkWriteCount += sinkWriter.get("value").asLong();
+            }
+            return new JobMetricsSummary(sourceReadCount, sinkWriteCount);
+            // Add NullPointerException because of metrics information can be 
empty like {}
+        } catch (JsonProcessingException | NullPointerException e) {
+            return new JobMetricsSummary(sourceReadCount, sinkWriteCount);
+        }
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
new file mode 100644
index 000000000..fde095334
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobMetricsRunner.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.client.job;
+
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.StringFormatUtils;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Duration;
+import java.time.LocalDateTime;
+
+@Slf4j
+public class JobMetricsRunner implements Runnable {
+    private final SeaTunnelClient seaTunnelClient;
+    private final Long jobId;
+    private LocalDateTime lastRunTime = LocalDateTime.now();
+    private Long lastReadCount = 0L;
+    private Long lastWriteCount = 0L;
+
+    public JobMetricsRunner(SeaTunnelClient seaTunnelClient, Long jobId) {
+        this.seaTunnelClient = seaTunnelClient;
+        this.jobId = jobId;
+    }
+
+    @Override
+    public void run() {
+        Thread.currentThread().setName("job-metrics-runner-" + jobId);
+        try {
+            JobMetricsSummary jobMetricsSummary = 
seaTunnelClient.getJobMetricsSummary(jobId);
+            LocalDateTime now = LocalDateTime.now();
+            long seconds = Duration.between(lastRunTime, now).getSeconds();
+            long averageRead = (jobMetricsSummary.getSourceReadCount() - 
lastReadCount) / seconds;
+            long averageWrite = (jobMetricsSummary.getSinkWriteCount() - 
lastWriteCount) / seconds;
+            log.info(StringFormatUtils.formatTable(
+                    "Job Progress Information",
+                    "Read Count So Far",
+                    jobMetricsSummary.getSourceReadCount(),
+
+                    "Write Count So Far",
+                    jobMetricsSummary.getSinkWriteCount(),
+
+                    "Average Read Count",
+                    averageRead + "/s",
+
+                    "Average Write Count",
+                    averageWrite + "/s",
+
+                    "Last Statistic Time",
+                    DateTimeUtils.toString(lastRunTime, 
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
+
+                    "Current Statistic Time",
+                    DateTimeUtils.toString(now, 
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS)));
+            lastRunTime = now;
+            lastReadCount = jobMetricsSummary.getSourceReadCount();
+            lastWriteCount = jobMetricsSummary.getSinkWriteCount();
+        } catch (Exception e) {
+            log.warn("Failed to get job metrics summary, it maybe first-run");
+        }
+    }
+
+    @Data
+    @AllArgsConstructor
+    public static class JobMetricsSummary {
+        private long sourceReadCount;
+        private long sinkWriteCount;
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index 365e4e887..0037d0df8 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -32,6 +32,8 @@ public class EngineConfig {
     private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue();
     private int printExecutionInfoInterval = 
ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue();
 
+    private int printJobMetricsInfoInterval = 
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.defaultValue();
+
     private SlotServiceConfig slotServiceConfig = 
ServerConfigOptions.SLOT_SERVICE.defaultValue();
 
     private CheckpointConfig checkpointConfig = 
ServerConfigOptions.CHECKPOINT.defaultValue();
@@ -46,4 +48,8 @@ public class EngineConfig {
         this.printExecutionInfoInterval = printExecutionInfoInterval;
     }
 
+    public void setPrintJobMetricsInfoInterval(int 
printJobMetricsInfoInterval) {
+        checkPositive(printExecutionInfoInterval, 
ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL + " must be > 0");
+        this.printJobMetricsInfoInterval = printJobMetricsInfoInterval;
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 22e137b09..229d40f1c 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -98,6 +98,9 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
             } else if 
(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key().equals(name)) {
                 
engineConfig.setPrintExecutionInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.key(),
                     getTextContent(node)));
+            } else if 
(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key().equals(name)) {
+                
engineConfig.setPrintJobMetricsInfoInterval(getIntegerValue(ServerConfigOptions.PRINT_JOB_METRICS_INFO_INTERVAL.key(),
+                    getTextContent(node)));
             } else if (ServerConfigOptions.SLOT_SERVICE.key().equals(name)) {
                 
engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node));
             } else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) {
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 5dc5e0080..6b782bc8b 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -31,6 +31,8 @@ public class ServerConfigOptions {
 
     public static final Option<Integer> PRINT_EXECUTION_INFO_INTERVAL = 
Options.key("print-execution-info-interval").intType().defaultValue(60).withDescription("The
 interval (in seconds) between two consecutive executions of the print 
execution info task.");
 
+    public static final Option<Integer> PRINT_JOB_METRICS_INFO_INTERVAL = 
Options.key("print-job-metrics-info-interval").intType().defaultValue(60).withDescription("The
 interval (in seconds) of job print metrics info");
+
     public static final Option<Boolean> DYNAMIC_SLOT = 
Options.key("dynamic-slot").booleanType().defaultValue(true).withDescription("Whether
 to use dynamic slot.");
 
     public static final Option<Integer> SLOT_NUM = 
Options.key("slot-num").intType().defaultValue(2).withDescription("The number 
of slots. Only valid when dynamic slot is disabled.");
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index d5d1fbf3c..dfc92ce38 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.engine.server;
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import org.apache.seatunnel.common.utils.StringFormatUtils;
 import org.apache.seatunnel.engine.common.Constant;
 import org.apache.seatunnel.engine.common.config.EngineConfig;
 import org.apache.seatunnel.engine.common.exception.JobException;
@@ -503,13 +504,7 @@ public class CoordinatorService {
         int poolSize = threadPoolExecutor.getPoolSize();
         long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
         long taskCount = threadPoolExecutor.getTaskCount();
-        logger.info(String.format(
-                "\n" + "***********************************************" +
-                "\n" + "     %s" +
-                "\n" + "***********************************************" +
-                "\n" + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
-                        + "%-26s: %19s\n" + "%-26s: %19s\n" + "%-26s: %19s\n"
-                + "***********************************************\n",
+        logger.info(StringFormatUtils.formatTable(
                 "CoordinatorService Thread Pool Status",
                 "activeCount",
                 activeCount,

Reply via email to