This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5089d8a2e4 [Feature][Zeta] Support delete logs regularly (#7787)
5089d8a2e4 is described below

commit 5089d8a2e4a9bbc5467071dbd911b8339e82b165
Author: corgy-w <[email protected]>
AuthorDate: Tue Nov 5 18:16:53 2024 +0800

    [Feature][Zeta] Support delete logs regularly (#7787)
---
 config/seatunnel.yaml                              |  2 +
 docs/en/seatunnel-engine/logging.md                | 18 +++++
 docs/zh/seatunnel-engine/logging.md                | 19 +++++
 .../seatunnel/engine/e2e/joblog/JobLogIT.java      | 45 ++++++++++-
 .../src/test/resources/cluster/seatunnel.yaml      |  7 +-
 .../src/test/resources/seatunnel.yaml              |  7 +-
 .../config/YamlSeaTunnelDomConfigProcessor.java    | 23 +++++-
 .../common/config/server/ServerConfigOptions.java  | 14 ++++
 .../common/config/server/TelemetryConfig.java      |  2 +
 ...lemetryConfig.java => TelemetryLogsConfig.java} |  5 +-
 .../seatunnel/engine/common/utils/LogUtil.java     | 93 ++++++++++++++++++++++
 .../engine/core/job/ExecutionAddress.java          | 27 +++++++
 .../seatunnel/engine/core/job/JobDAGInfo.java      |  2 +
 .../engine/server/CoordinatorService.java          |  1 +
 .../seatunnel/engine/server/SeaTunnelServer.java   | 18 +++++
 .../seatunnel/engine/server/dag/DAGUtils.java      | 11 ++-
 .../engine/server/master/JobHistoryService.java    | 40 ++++++++++
 .../seatunnel/engine/server/master/JobMaster.java  | 22 ++++-
 .../engine/server/rest/service/BaseLogService.java | 72 +----------------
 .../engine/server/rest/service/BaseService.java    |  4 +-
 .../serializable/TaskDataSerializerHook.java       |  5 ++
 .../telemetry/log/TaskLogManagerService.java       | 72 +++++++++++++++++
 .../telemetry/log/operation/CleanLogOperation.java | 61 ++++++++++++++
 23 files changed, 487 insertions(+), 83 deletions(-)

diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml
index f773204eba..79a713a71e 100644
--- a/config/seatunnel.yaml
+++ b/config/seatunnel.yaml
@@ -38,6 +38,8 @@ seatunnel:
     telemetry:
       metric:
         enabled: false
+      log:
+        scheduled-deletion-enable: true
     http:
       enable-http: true
       port: 8080
diff --git a/docs/en/seatunnel-engine/logging.md 
b/docs/en/seatunnel-engine/logging.md
index 094cb15feb..be0bc12f0a 100644
--- a/docs/en/seatunnel-engine/logging.md
+++ b/docs/en/seatunnel-engine/logging.md
@@ -92,6 +92,24 @@ SeaTunnel provides an API for querying logs.
 
 For more details, please refer to the [REST-API](rest-api-v2.md).
 
+## SeaTunnel Log Configuration
+
+### Scheduled deletion of old logs
+
+SeaTunnel supports scheduled deletion of old log files to prevent disk space 
exhaustion. You can add the following configuration in the `seatunnel.yml` file:
+
+```yaml
+seatunnel:
+  engine:
+    history-job-expire-minutes: 1440
+    telemetry:
+      logs:
+        scheduled-deletion-enable: true
+```
+
+- `history-job-expire-minutes`: Sets the retention time for historical job 
data and logs (in minutes). The system will automatically clear expired job 
information and log files after the specified period.
+- `scheduled-deletion-enable`: Enable scheduled cleanup, with default value of 
`true`. The system will automatically delete relevant log files when job 
expiration time, as defined by `history-job-expire-minutes`, is reached. If 
this feature is disabled, logs will remain permanently on disk, requiring 
manual management, which may affect disk space usage. It is recommended to 
configure this setting based on specific needs.
+
 ## Best practices for developers
 
 You can create an SLF4J logger by calling 
`org.slf4j.LoggerFactory#LoggerFactory.getLogger` with the Class of your class 
as an argument.
diff --git a/docs/zh/seatunnel-engine/logging.md 
b/docs/zh/seatunnel-engine/logging.md
index 7d4f4f1d62..f97ea572e8 100644
--- a/docs/zh/seatunnel-engine/logging.md
+++ b/docs/zh/seatunnel-engine/logging.md
@@ -92,6 +92,25 @@ SeaTunnel 提供了一个 API,用于查询日志。
 
 有关详细信息,请参阅 [REST-API](rest-api-v2.md)。
 
+## SeaTunnel 日志配置
+
+### 定时删除旧日志
+
+SeaTunnel 支持定时删除旧日志文件,以避免磁盘空间不足。您可以在 `seatunnel.yml` 文件中添加以下配置:
+
+```yaml
+seatunnel:
+  engine:
+    history-job-expire-minutes: 1440
+    telemetry:
+      logs:
+        scheduled-deletion-enable: true
+```
+
+- `history-job-expire-minutes`: 
设置历史作业和日志的保留时间(单位:分钟)。系统将在指定的时间后自动清除过期的作业信息和日志文件。
+- `scheduled-deletion-enable`: 启用定时清理功能,默认为 `true`。系统将在作业达到 
`history-job-expire-minutes` 
设置的过期时间后自动删除相关日志文件。关闭该功能后,日志将永久保留在磁盘上,需要用户自行管理,否则可能影响磁盘占用。建议根据需求合理配置。
+
+
 ## 开发人员最佳实践
 
 您可以通过调用 `org.slf4j.LoggerFactory#LoggerFactory.getLogger` 并以您的类的类作为参数来创建 SLF4J 
记录器。
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java
index 1aa52488f6..4afbfacb71 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.platform.commons.util.StringUtils;
 import org.testcontainers.containers.Container;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.Network;
@@ -35,12 +36,15 @@ import org.testcontainers.containers.wait.strategy.Wait;
 import org.testcontainers.utility.DockerLoggerFactory;
 import org.testcontainers.utility.MountableFile;
 
+import com.beust.jcommander.internal.Lists;
+import com.hazelcast.jet.datamodel.Tuple2;
 import io.restassured.response.Response;
 
 import java.io.IOException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
@@ -52,7 +56,11 @@ import static org.hamcrest.Matchers.equalTo;
 public class JobLogIT extends SeaTunnelContainer {
 
     private static final String CUSTOM_JOB_NAME = "test-job-log-file";
+    private static final String CUSTOM_JOB_NAME2 = "test-job-log-file2";
+    private static final String CUSTOM_JOB_NAME3 = "test-job-log-file3";
     private static final long CUSTOM_JOB_ID = 862969647010611201L;
+    private static final long CUSTOM_JOB_ID2 = 862969647010611202L;
+    private static final long CUSTOM_JOB_ID3 = 862969647010611203L;
 
     private static final String confFile = "/fakesource_to_console.conf";
     private static final Path BIN_PATH = Paths.get(SEATUNNEL_HOME, "bin", 
SERVER_SHELL);
@@ -99,10 +107,29 @@ public class JobLogIT extends SeaTunnelContainer {
     @Test
     public void testJobLogFile() throws Exception {
         submitJobAndAssertResponse(
-                server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME, 
CUSTOM_JOB_ID);
+                server, JobMode.BATCH.name(), false, CUSTOM_JOB_NAME, 
CUSTOM_JOB_ID);
+
+        submitJobAndAssertResponse(
+                server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME2, 
CUSTOM_JOB_ID2);
+
+        submitJobAndAssertResponse(
+                server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME3, 
CUSTOM_JOB_ID3);
 
         assertConsoleLog();
         assertFileLog();
+        List<Tuple2<Boolean, String>> before =
+                Lists.newArrayList(
+                        Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID + ".log"),
+                        Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID2 + ".log"),
+                        Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID3 + 
".log"));
+        assertFileLogClean(before);
+        Thread.sleep(90000);
+        List<Tuple2<Boolean, String>> after =
+                Lists.newArrayList(
+                        Tuple2.tuple2(true, "job-" + CUSTOM_JOB_ID + ".log"),
+                        Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID2 + ".log"),
+                        Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID3 + 
".log"));
+        assertFileLogClean(after);
     }
 
     private void assertConsoleLog() {
@@ -168,6 +195,22 @@ public class JobLogIT extends SeaTunnelContainer {
                         });
     }
 
+    private void assertFileLogClean(List<Tuple2<Boolean, String>> tuple2s)
+            throws IOException, InterruptedException {
+        for (Tuple2<Boolean, String> tuple2 : tuple2s) {
+            Container.ExecResult execResult =
+                    server.execInContainer(
+                            "sh", "-c", "find /tmp/seatunnel/logs -name " + 
tuple2.f1() + "\n");
+            String file = execResult.getStdout();
+            execResult =
+                    secondServer.execInContainer(
+                            "sh", "-c", "find /tmp/seatunnel/logs -name " + 
tuple2.f1() + "\n");
+            String file1 = execResult.getStdout();
+            Assertions.assertEquals(
+                    tuple2.f0(), StringUtils.isBlank(file) && 
StringUtils.isBlank(file1));
+        }
+    }
+
     private Response submitJob(
             GenericContainer<?> container,
             String jobMode,
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
index 5878531b2e..6b0387caa0 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
@@ -17,7 +17,7 @@
 
 seatunnel:
   engine:
-    history-job-expire-minutes: 1440
+    history-job-expire-minutes: 1
     backup-count: 2
     queue-type: blockingqueue
     print-execution-info-interval: 10
@@ -35,3 +35,8 @@ seatunnel:
         enable-http: true
         port: 8080
         enable-dynamic-port: false
+    telemetry:
+          metric:
+             enabled: false
+          logs:
+             scheduled-deletion-enable: true
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
index 84fef4251e..80b928fcdc 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
@@ -34,4 +34,9 @@ seatunnel:
           namespace: /tmp/seatunnel/checkpoint_snapshot/
     http:
         enable-http: false
-        port: 8080
\ No newline at end of file
+        port: 8080
+    telemetry:
+      metric:
+         enabled: false
+      logs:
+         scheduled-deletion-enable: true
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index ffa984de1f..c0f6b6ad6c 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -28,6 +28,7 @@ import 
org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
 import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
 import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
 import org.apache.seatunnel.engine.common.config.server.TelemetryConfig;
+import org.apache.seatunnel.engine.common.config.server.TelemetryLogsConfig;
 import org.apache.seatunnel.engine.common.config.server.TelemetryMetricConfig;
 import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;
 
@@ -330,17 +331,19 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
     }
 
     private TelemetryConfig parseTelemetryConfig(Node telemetryNode) {
-        TelemetryConfig metricConfig = new TelemetryConfig();
+        TelemetryConfig telemetryConfig = new TelemetryConfig();
         for (Node node : childElements(telemetryNode)) {
             String name = cleanNodeName(node);
             if (ServerConfigOptions.TELEMETRY_METRIC.key().equals(name)) {
-                metricConfig.setMetric(parseTelemetryMetricConfig(node));
+                telemetryConfig.setMetric(parseTelemetryMetricConfig(node));
+            } else if (ServerConfigOptions.TELEMETRY_LOGS.key().equals(name)) {
+                telemetryConfig.setLogs(parseTelemetryLogsConfig(node));
             } else {
                 LOGGER.warning("Unrecognized element: " + name);
             }
         }
 
-        return metricConfig;
+        return telemetryConfig;
     }
 
     private TelemetryMetricConfig parseTelemetryMetricConfig(Node metricNode) {
@@ -357,6 +360,20 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
         return metricConfig;
     }
 
+    private TelemetryLogsConfig parseTelemetryLogsConfig(Node logsNode) {
+        TelemetryLogsConfig logsConfig = new TelemetryLogsConfig();
+        for (Node node : childElements(logsNode)) {
+            String name = cleanNodeName(node);
+            if 
(ServerConfigOptions.TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE.key().equals(name))
 {
+                logsConfig.setEnabled(getBooleanValue(getTextContent(node)));
+            } else {
+                LOGGER.warning("Unrecognized element: " + name);
+            }
+        }
+
+        return logsConfig;
+    }
+
     private HttpConfig parseHttpConfig(Node httpNode) {
         HttpConfig httpConfig = new HttpConfig();
         for (Node node : childElements(httpNode)) {
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index d9e9662a99..7153cccd0f 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -211,6 +211,20 @@ public class ServerConfigOptions {
                     .withDescription(
                             "Whether to use classloader cache mode. With cache 
mode, all jobs share the same classloader if the jars are the same");
 
+    public static final Option<Boolean> 
TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE =
+            Options.key("scheduled-deletion-enable")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Enable scheduled cleanup, with default value of 
true. The system will automatically delete relevant log files when job 
expiration time, as defined by `history-job-expire-minutes`, is reached. "
+                                    + "If this feature is disabled, logs will 
remain permanently on disk, requiring manual management, which may affect disk 
space usage. It is recommended to configure this setting based on specific 
needs.");
+
+    public static final Option<TelemetryLogsConfig> TELEMETRY_LOGS =
+            Options.key("logs")
+                    .type(new TypeReference<TelemetryLogsConfig>() {})
+                    .defaultValue(new TelemetryLogsConfig())
+                    .withDescription("The telemetry logs configuration.");
+
     public static final Option<Boolean> TELEMETRY_METRIC_ENABLED =
             Options.key("enabled")
                     .booleanType()
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
index c3e603eea4..7652832d46 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
@@ -25,4 +25,6 @@ import java.io.Serializable;
 public class TelemetryConfig implements Serializable {
 
     private TelemetryMetricConfig metric = 
ServerConfigOptions.TELEMETRY_METRIC.defaultValue();
+
+    private TelemetryLogsConfig logs = 
ServerConfigOptions.TELEMETRY_LOGS.defaultValue();
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryLogsConfig.java
similarity index 83%
copy from 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
copy to 
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryLogsConfig.java
index c3e603eea4..9eb8b32975 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryLogsConfig.java
@@ -22,7 +22,8 @@ import lombok.Data;
 import java.io.Serializable;
 
 @Data
-public class TelemetryConfig implements Serializable {
+public class TelemetryLogsConfig implements Serializable {
 
-    private TelemetryMetricConfig metric = 
ServerConfigOptions.TELEMETRY_METRIC.defaultValue();
+    private boolean enabled =
+            
ServerConfigOptions.TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE.defaultValue();
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/LogUtil.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/LogUtil.java
new file mode 100644
index 0000000000..7d92d7dfbc
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/LogUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.builder.api.Component;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration;
+import org.apache.logging.log4j.core.lookup.StrSubstitutor;
+
+import java.lang.reflect.Field;
+
+public class LogUtil {
+
+    /** Get configuration log path by log4j */
+    public static String getLogPath() throws NoSuchFieldException, 
IllegalAccessException {
+        String routingAppender = "routingAppender";
+        String fileAppender = "fileAppender";
+        PropertiesConfiguration config = getLogConfiguration();
+        // Get routingAppender log file path
+        String routingLogFilePath = getRoutingLogFilePath(config);
+
+        // Get fileAppender log file path
+        String fileLogPath = getFileLogPath(config);
+        String logRef =
+                
config.getLoggerConfig(StringUtils.EMPTY).getAppenderRefs().stream()
+                        .map(Object::toString)
+                        .filter(ref -> ref.contains(routingAppender) || 
ref.contains(fileAppender))
+                        .findFirst()
+                        .orElse(StringUtils.EMPTY);
+        if (logRef.equals(routingAppender)) {
+            return routingLogFilePath.substring(0, 
routingLogFilePath.lastIndexOf("/"));
+        } else if (logRef.equals(fileAppender)) {
+            return fileLogPath.substring(0, 
routingLogFilePath.lastIndexOf("/"));
+        } else {
+            throw new IllegalArgumentException(
+                    String.format("Log file path is empty, get logRef : %s", 
logRef));
+        }
+    }
+
+    private static PropertiesConfiguration getLogConfiguration() {
+        LoggerContext context = (LoggerContext) LogManager.getContext(false);
+        return (PropertiesConfiguration) context.getConfiguration();
+    }
+
+    private static String getRoutingLogFilePath(PropertiesConfiguration config)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field propertiesField = 
BuiltConfiguration.class.getDeclaredField("appendersComponent");
+        propertiesField.setAccessible(true);
+        Component propertiesComponent = (Component) 
propertiesField.get(config);
+        StrSubstitutor substitutor = config.getStrSubstitutor();
+        return propertiesComponent.getComponents().stream()
+                .filter(
+                        component ->
+                                
"routingAppender".equals(component.getAttributes().get("name")))
+                .flatMap(component -> component.getComponents().stream())
+                .flatMap(component -> component.getComponents().stream())
+                .flatMap(component -> component.getComponents().stream())
+                .map(component -> 
substitutor.replace(component.getAttributes().get("fileName")))
+                .findFirst()
+                .orElse(null);
+    }
+
+    private static String getFileLogPath(PropertiesConfiguration config)
+            throws NoSuchFieldException, IllegalAccessException {
+        Field propertiesField = 
BuiltConfiguration.class.getDeclaredField("appendersComponent");
+        propertiesField.setAccessible(true);
+        Component propertiesComponent = (Component) 
propertiesField.get(config);
+        StrSubstitutor substitutor = config.getStrSubstitutor();
+        return propertiesComponent.getComponents().stream()
+                .filter(component -> 
"fileAppender".equals(component.getAttributes().get("name")))
+                .map(component -> 
substitutor.replace(component.getAttributes().get("fileName")))
+                .findFirst()
+                .orElse(null);
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ExecutionAddress.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ExecutionAddress.java
new file mode 100644
index 0000000000..ef445b50cb
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ExecutionAddress.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding 
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not 
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either 
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.seatunnel.engine.core.job;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ExecutionAddress implements Serializable {
+    String hostname;
+    int port;
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
index aea57beef3..f0ef24bad6 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
@@ -29,6 +29,7 @@ import lombok.NoArgsConstructor;
 import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 @AllArgsConstructor
 @NoArgsConstructor
@@ -38,6 +39,7 @@ public class JobDAGInfo implements Serializable {
     Map<String, Object> envOptions;
     Map<Integer, List<Edge>> pipelineEdges;
     Map<Long, VertexInfo> vertexInfoMap;
+    Set<ExecutionAddress> historyExecutionPlan;
 
     public JsonObject toJsonObject() {
         JsonObject pipelineEdgesJsonObject = new JsonObject();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 02fdc98b1e..eae0627343 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -398,6 +398,7 @@ public class CoordinatorService {
 
         jobHistoryService =
                 new JobHistoryService(
+                        nodeEngine,
                         runningJobStateIMap,
                         logger,
                         pendingJobMasterMap,
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index fb53fb4459..5721a7db30 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -29,6 +29,7 @@ import 
org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
 import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
 import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
 import org.apache.seatunnel.engine.server.service.slot.SlotService;
+import org.apache.seatunnel.engine.server.telemetry.log.TaskLogManagerService;
 import 
org.apache.seatunnel.engine.server.telemetry.metrics.entity.ThreadPoolStatus;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -46,6 +47,7 @@ import com.hazelcast.spi.impl.operationservice.LiveOperations;
 import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
 import lombok.Getter;
 import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
 
 import java.util.Properties;
 import java.util.concurrent.Executors;
@@ -55,6 +57,7 @@ import java.util.concurrent.TimeUnit;
 import static 
com.hazelcast.spi.properties.ClusterProperty.INVOCATION_MAX_RETRY_COUNT;
 import static 
com.hazelcast.spi.properties.ClusterProperty.INVOCATION_RETRY_PAUSE;
 
+@Slf4j
 public class SeaTunnelServer
         implements ManagedService, MembershipAwareService, 
LiveOperationsTracker {
 
@@ -71,6 +74,7 @@ public class SeaTunnelServer
     private CoordinatorService coordinatorService;
     private ScheduledExecutorService monitorService;
     private JettyService jettyService;
+    private TaskLogManagerService taskLogManagerService;
 
     @Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor;
 
@@ -136,6 +140,16 @@ public class SeaTunnelServer
 
         seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl) 
engine).getNode());
 
+        // task log manager service
+        if (seaTunnelConfig.getEngineConfig().getTelemetryConfig() != null
+                && 
seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs() != null
+                && 
seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs().isEnabled()) {
+            taskLogManagerService =
+                    new TaskLogManagerService(
+                            
seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs());
+            taskLogManagerService.initClean();
+        }
+
         // Start Jetty server
         if (seaTunnelConfig.getEngineConfig().getHttpConfig().isEnabled()) {
             jettyService = new JettyService(nodeEngine, seaTunnelConfig);
@@ -337,6 +351,10 @@ public class SeaTunnelServer
         return getCoordinatorService().getConnectorPackageService();
     }
 
+    public TaskLogManagerService getTaskLogManagerService() {
+        return taskLogManagerService;
+    }
+
     public ThreadPoolStatus getThreadPoolStatusMetrics() {
         return coordinatorService.getThreadPoolStatusMetrics();
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
index bbe6d77e00..62df65c917 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
@@ -29,6 +29,7 @@ import 
org.apache.seatunnel.engine.core.dag.actions.SourceAction;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
 import org.apache.seatunnel.engine.core.job.Edge;
+import org.apache.seatunnel.engine.core.job.ExecutionAddress;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.VertexInfo;
@@ -42,6 +43,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -52,7 +54,8 @@ public class DAGUtils {
             LogicalDag logicalDag,
             JobImmutableInformation jobImmutableInformation,
             EngineConfig engineConfig,
-            boolean isPhysicalDAGIInfo) {
+            boolean isPhysicalDAGIInfo,
+            Set<ExecutionAddress> historyExecutionAddress) {
         List<Pipeline> pipelines =
                 new ExecutionPlanGenerator(logicalDag, 
jobImmutableInformation, engineConfig)
                         .generate()
@@ -89,7 +92,8 @@ public class DAGUtils {
                     jobImmutableInformation.getJobId(),
                     logicalDag.getJobConfig().getEnvOptions(),
                     pipelineWithEdges,
-                    vertexInfoMap);
+                    vertexInfoMap,
+                    historyExecutionAddress);
         } else {
             // Generate LogicalPlan DAG
             List<Edge> edges =
@@ -136,7 +140,8 @@ public class DAGUtils {
                     jobImmutableInformation.getJobId(),
                     logicalDag.getJobConfig().getEnvOptions(),
                     pipelineWithEdges,
-                    vertexInfoMap);
+                    vertexInfoMap,
+                    historyExecutionAddress);
         }
     }
 
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index a73708a548..7b0d83ef03 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -24,6 +24,7 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
 import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.core.job.ExecutionAddress;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -33,15 +34,22 @@ import 
org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
 import org.apache.seatunnel.engine.server.execution.ExecutionState;
 import org.apache.seatunnel.engine.server.execution.PendingSourceState;
 import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import 
org.apache.seatunnel.engine.server.telemetry.log.operation.CleanLogOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
 
+import com.hazelcast.cluster.Address;
+import com.hazelcast.core.EntryEvent;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.map.IMap;
+import com.hazelcast.map.listener.EntryExpiredListener;
+import com.hazelcast.spi.impl.NodeEngine;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.Getter;
 import scala.Tuple2;
 
 import java.io.Serializable;
+import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -52,6 +60,9 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 public class JobHistoryService {
+
+    private final NodeEngine nodeEngine;
+
     /**
      * IMap key is one of jobId {@link
      * org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and 
{@link
@@ -91,6 +102,7 @@ public class JobHistoryService {
     private final int finishedJobExpireTime;
 
     public JobHistoryService(
+            NodeEngine nodeEngine,
             IMap<Object, Object> runningJobStateIMap,
             ILogger logger,
             Map<Long, Tuple2<PendingSourceState, JobMaster>> 
pendingJobMasterMap,
@@ -99,6 +111,7 @@ public class JobHistoryService {
             IMap<Long, JobMetrics> finishedJobMetricsImap,
             IMap<Long, JobDAGInfo> finishedJobVertexInfoImap,
             int finishedJobExpireTime) {
+        this.nodeEngine = nodeEngine;
         this.runningJobStateIMap = runningJobStateIMap;
         this.logger = logger;
         this.pendingJobMasterMap = pendingJobMasterMap;
@@ -106,6 +119,7 @@ public class JobHistoryService {
         this.finishedJobStateImap = finishedJobStateImap;
         this.finishedJobMetricsImap = finishedJobMetricsImap;
         this.finishedJobDAGInfoImap = finishedJobVertexInfoImap;
+        this.finishedJobDAGInfoImap.addEntryListener(new 
JobInfoExpiredListener(), true);
         this.objectMapper = new ObjectMapper();
         this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, 
false);
         this.finishedJobExpireTime = finishedJobExpireTime;
@@ -287,4 +301,30 @@ public class JobHistoryService {
         private PipelineStatus pipelineStatus;
         private Map<TaskGroupLocation, ExecutionState> executionStateMap;
     }
+
+    private class JobInfoExpiredListener implements EntryExpiredListener<Long, 
JobDAGInfo> {
+        @Override
+        public void entryExpired(EntryEvent<Long, JobDAGInfo> event) {
+            Long jobId = event.getKey();
+            JobDAGInfo jobDagInfo = event.getOldValue();
+            try {
+                Set<ExecutionAddress> historyExecutionPlan = 
jobDagInfo.getHistoryExecutionPlan();
+
+                historyExecutionPlan.forEach(
+                        address -> {
+                            logger.info("clean job log, jobId: " + jobId + ", 
address: " + address);
+                            try {
+                                NodeEngineUtil.sendOperationToMemberNode(
+                                        nodeEngine,
+                                        new CleanLogOperation(jobId),
+                                        new Address(address.getHostname(), 
address.getPort()));
+                            } catch (UnknownHostException e) {
+                                throw new RuntimeException(e);
+                            }
+                        });
+            } catch (Exception e) {
+                logger.warning("clean job log err", e);
+            }
+        }
+    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 04812e078f..49b676c97e 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -44,6 +44,7 @@ import 
org.apache.seatunnel.engine.core.dag.actions.SinkAction;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
 import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ExecutionAddress;
 import org.apache.seatunnel.engine.core.job.JobDAGInfo;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobInfo;
@@ -89,10 +90,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
@@ -157,6 +160,8 @@ public class JobMaster {
 
     private final IMap<Long, JobInfo> runningJobInfoIMap;
 
+    @Getter private final Set<ExecutionAddress> historyExecutionAddress = new 
HashSet<>();
+
     private final IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>> 
metricsImap;
 
     /** If the job or pipeline cancel by user, needRestore will be false */
@@ -394,6 +399,17 @@ public class JobMaster {
                         == preApplyResourceFutures.size();
 
         if (enoughResource) {
+            for (Map.Entry<TaskGroupLocation, CompletableFuture<SlotProfile>> 
entry :
+                    preApplyResourceFutures.entrySet()) {
+                try {
+                    Address worker = entry.getValue().get().getWorker();
+                    historyExecutionAddress.add(
+                            new ExecutionAddress(worker.getHost(), 
worker.getPort()));
+
+                } catch (Exception e) {
+                    LOGGER.warning("history execution plan add worker failed", 
e);
+                }
+            }
             if (isSubPlan) {
                 // SubPlan applies for resources separately and needs to be 
merged into the entire
                 // job's resources
@@ -569,7 +585,11 @@ public class JobMaster {
         if (jobDAGInfo == null) {
             jobDAGInfo =
                     DAGUtils.getJobDAGInfo(
-                            logicalDag, jobImmutableInformation, engineConfig, 
isPhysicalDAGIInfo);
+                            logicalDag,
+                            jobImmutableInformation,
+                            engineConfig,
+                            isPhysicalDAGIInfo,
+                            historyExecutionAddress);
         }
         return jobDAGInfo;
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
index 7d582bf589..a1e84f9ec7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
@@ -18,14 +18,7 @@
 package org.apache.seatunnel.engine.server.rest.service;
 
 import org.apache.seatunnel.common.utils.ExceptionUtils;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.config.builder.api.Component;
-import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
-import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration;
-import org.apache.logging.log4j.core.lookup.StrSubstitutor;
+import org.apache.seatunnel.engine.common.utils.LogUtil;
 
 import com.hazelcast.internal.util.StringUtil;
 import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -34,7 +27,6 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.lang.reflect.Field;
 import java.net.HttpURLConnection;
 import java.net.URL;
 
@@ -47,73 +39,13 @@ public class BaseLogService extends BaseService {
     /** Get configuration log path */
     public String getLogPath() {
         try {
-            String routingAppender = "routingAppender";
-            String fileAppender = "fileAppender";
-            PropertiesConfiguration config = getLogConfiguration();
-            // Get routingAppender log file path
-            String routingLogFilePath = getRoutingLogFilePath(config);
-
-            // Get fileAppender log file path
-            String fileLogPath = getFileLogPath(config);
-            String logRef =
-                    
config.getLoggerConfig(StringUtils.EMPTY).getAppenderRefs().stream()
-                            .map(Object::toString)
-                            .filter(
-                                    ref ->
-                                            ref.contains(routingAppender)
-                                                    || 
ref.contains(fileAppender))
-                            .findFirst()
-                            .orElse(StringUtils.EMPTY);
-            if (logRef.equals(routingAppender)) {
-                return routingLogFilePath.substring(0, 
routingLogFilePath.lastIndexOf("/"));
-            } else if (logRef.equals(fileAppender)) {
-                return fileLogPath.substring(0, 
routingLogFilePath.lastIndexOf("/"));
-            } else {
-                log.warn(String.format("Log file path is empty, get logRef : 
%s", logRef));
-                return null;
-            }
+            return LogUtil.getLogPath();
         } catch (NoSuchFieldException | IllegalAccessException e) {
             log.error("Get log path error,{}", ExceptionUtils.getMessage(e));
             return null;
         }
     }
 
-    private String getFileLogPath(PropertiesConfiguration config)
-            throws NoSuchFieldException, IllegalAccessException {
-        Field propertiesField = 
BuiltConfiguration.class.getDeclaredField("appendersComponent");
-        propertiesField.setAccessible(true);
-        Component propertiesComponent = (Component) 
propertiesField.get(config);
-        StrSubstitutor substitutor = config.getStrSubstitutor();
-        return propertiesComponent.getComponents().stream()
-                .filter(component -> 
"fileAppender".equals(component.getAttributes().get("name")))
-                .map(component -> 
substitutor.replace(component.getAttributes().get("fileName")))
-                .findFirst()
-                .orElse(null);
-    }
-
-    private String getRoutingLogFilePath(PropertiesConfiguration config)
-            throws NoSuchFieldException, IllegalAccessException {
-        Field propertiesField = 
BuiltConfiguration.class.getDeclaredField("appendersComponent");
-        propertiesField.setAccessible(true);
-        Component propertiesComponent = (Component) 
propertiesField.get(config);
-        StrSubstitutor substitutor = config.getStrSubstitutor();
-        return propertiesComponent.getComponents().stream()
-                .filter(
-                        component ->
-                                
"routingAppender".equals(component.getAttributes().get("name")))
-                .flatMap(component -> component.getComponents().stream())
-                .flatMap(component -> component.getComponents().stream())
-                .flatMap(component -> component.getComponents().stream())
-                .map(component -> 
substitutor.replace(component.getAttributes().get("fileName")))
-                .findFirst()
-                .orElse(null);
-    }
-
-    private PropertiesConfiguration getLogConfiguration() {
-        LoggerContext context = (LoggerContext) LogManager.getContext(false);
-        return (PropertiesConfiguration) context.getConfiguration();
-    }
-
     protected String sendGet(String urlString) {
         try {
             HttpURLConnection connection = (HttpURLConnection) new 
URL(urlString).openConnection();
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index b304f7fdde..f342aaba45 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -69,6 +69,7 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -164,7 +165,8 @@ public abstract class BaseService {
                         logicalDag,
                         jobImmutableInformation,
                         
getSeaTunnelServer(false).getSeaTunnelConfig().getEngineConfig(),
-                        true);
+                        true,
+                        new HashSet<>());
 
         jobInfoJson
                 .add(RestConstant.JOB_ID, String.valueOf(jobId))
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index a926767576..47525de0a5 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -44,6 +44,7 @@ import 
org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOpe
 import 
org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.SourceReaderEventOperation;
 import 
org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
+import 
org.apache.seatunnel.engine.server.telemetry.log.operation.CleanLogOperation;
 
 import com.hazelcast.internal.serialization.DataSerializerHook;
 import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -104,6 +105,8 @@ public class TaskDataSerializerHook implements 
DataSerializerHook {
 
     public static final int CLOSE_READER_OPERATION = 26;
 
+    public static final int CLEAN_LOG_OPERATION = 27;
+
     public static final int FACTORY_ID =
             FactoryIdHelper.getFactoryId(
                     
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
@@ -176,6 +179,8 @@ public class TaskDataSerializerHook implements 
DataSerializerHook {
                     return new JobEventReportOperation();
                 case CLOSE_READER_OPERATION:
                     return new CloseIdleReaderOperation();
+                case CLEAN_LOG_OPERATION:
+                    return new CleanLogOperation();
                 default:
                     throw new IllegalArgumentException("Unknown type id " + 
typeId);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/TaskLogManagerService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/TaskLogManagerService.java
new file mode 100644
index 0000000000..f116ada66c
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/TaskLogManagerService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.log;
+
+import org.apache.seatunnel.engine.common.config.server.TelemetryLogsConfig;
+import org.apache.seatunnel.engine.common.utils.LogUtil;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+@Slf4j
+public class TaskLogManagerService {
+    private String path;
+
+    public TaskLogManagerService(TelemetryLogsConfig log) {}
+
+    public void initClean() {
+        try {
+            path = LogUtil.getLogPath();
+        } catch (Exception e) {
+            log.warn(
+                    "The corresponding log file path is not properly 
configured, please check the log configuration file.",
+                    e);
+        }
+    }
+
+    public void clean(long jobId) {
+        log.info("Cleaning logs for jobId: {} , path : {}", jobId, path);
+        if (path == null) {
+            return;
+        }
+        String[] logFiles = getLogFiles(jobId, path);
+        for (String logFile : logFiles) {
+            try {
+                Files.delete(Paths.get(path + "/" + logFile));
+            } catch (IOException e) {
+                log.warn("Failed to delete log file: {}", logFile, e);
+            }
+        }
+    }
+
+    private String[] getLogFiles(long jobId, String path) {
+        File logDir = new File(path);
+        if (!logDir.exists() || !logDir.isDirectory()) {
+            log.warn(
+                    "Skipping deletion: Log directory '{}' either does not 
exist or is not a valid directory. Please verify the path and ensure the logs 
are being written correctly.",
+                    path);
+            return new String[0];
+        }
+
+        return logDir.list((dir, name) -> 
name.contains(String.valueOf(jobId)));
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/operation/CleanLogOperation.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/operation/CleanLogOperation.java
new file mode 100644
index 0000000000..2f5f215d80
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/operation/CleanLogOperation.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.log.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+import org.apache.seatunnel.engine.server.task.operation.TracingOperation;
+import org.apache.seatunnel.engine.server.telemetry.log.TaskLogManagerService;
+
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+public class CleanLogOperation extends TracingOperation implements 
IdentifiedDataSerializable {
+
+    private long jobId;
+
+    public CleanLogOperation(long jobId) {
+        super();
+        this.jobId = jobId;
+    }
+
+    public CleanLogOperation() {}
+
+    @Override
+    public void runInternal() throws Exception {
+        SeaTunnelServer service = getService();
+        TaskLogManagerService taskLogManagerService = 
service.getTaskLogManagerService();
+        if (taskLogManagerService != null) {
+            taskLogManagerService.clean(jobId);
+        }
+    }
+
+    @Override
+    public int getFactoryId() {
+        return TaskDataSerializerHook.FACTORY_ID;
+    }
+
+    @Override
+    public int getClassId() {
+        return TaskDataSerializerHook.CLEAN_LOG_OPERATION;
+    }
+
+    @Override
+    public String getServiceName() {
+        return SeaTunnelServer.SERVICE_NAME;
+    }
+}

Reply via email to