This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5089d8a2e4 [Feature][Zeta] Support delete logs regularly (#7787)
5089d8a2e4 is described below
commit 5089d8a2e4a9bbc5467071dbd911b8339e82b165
Author: corgy-w <[email protected]>
AuthorDate: Tue Nov 5 18:16:53 2024 +0800
[Feature][Zeta] Support delete logs regularly (#7787)
---
config/seatunnel.yaml | 2 +
docs/en/seatunnel-engine/logging.md | 18 +++++
docs/zh/seatunnel-engine/logging.md | 19 +++++
.../seatunnel/engine/e2e/joblog/JobLogIT.java | 45 ++++++++++-
.../src/test/resources/cluster/seatunnel.yaml | 7 +-
.../src/test/resources/seatunnel.yaml | 7 +-
.../config/YamlSeaTunnelDomConfigProcessor.java | 23 +++++-
.../common/config/server/ServerConfigOptions.java | 14 ++++
.../common/config/server/TelemetryConfig.java | 2 +
...lemetryConfig.java => TelemetryLogsConfig.java} | 5 +-
.../seatunnel/engine/common/utils/LogUtil.java | 93 ++++++++++++++++++++++
.../engine/core/job/ExecutionAddress.java | 27 +++++++
.../seatunnel/engine/core/job/JobDAGInfo.java | 2 +
.../engine/server/CoordinatorService.java | 1 +
.../seatunnel/engine/server/SeaTunnelServer.java | 18 +++++
.../seatunnel/engine/server/dag/DAGUtils.java | 11 ++-
.../engine/server/master/JobHistoryService.java | 40 ++++++++++
.../seatunnel/engine/server/master/JobMaster.java | 22 ++++-
.../engine/server/rest/service/BaseLogService.java | 72 +----------------
.../engine/server/rest/service/BaseService.java | 4 +-
.../serializable/TaskDataSerializerHook.java | 5 ++
.../telemetry/log/TaskLogManagerService.java | 72 +++++++++++++++++
.../telemetry/log/operation/CleanLogOperation.java | 61 ++++++++++++++
23 files changed, 487 insertions(+), 83 deletions(-)
diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml
index f773204eba..79a713a71e 100644
--- a/config/seatunnel.yaml
+++ b/config/seatunnel.yaml
@@ -38,6 +38,8 @@ seatunnel:
telemetry:
metric:
enabled: false
+ log:
+ scheduled-deletion-enable: true
http:
enable-http: true
port: 8080
diff --git a/docs/en/seatunnel-engine/logging.md
b/docs/en/seatunnel-engine/logging.md
index 094cb15feb..be0bc12f0a 100644
--- a/docs/en/seatunnel-engine/logging.md
+++ b/docs/en/seatunnel-engine/logging.md
@@ -92,6 +92,24 @@ SeaTunnel provides an API for querying logs.
For more details, please refer to the [REST-API](rest-api-v2.md).
+## SeaTunnel Log Configuration
+
+### Scheduled deletion of old logs
+
+SeaTunnel supports scheduled deletion of old log files to prevent disk space
exhaustion. You can add the following configuration in the `seatunnel.yml` file:
+
+```yaml
+seatunnel:
+ engine:
+ history-job-expire-minutes: 1440
+ telemetry:
+ logs:
+ scheduled-deletion-enable: true
+```
+
+- `history-job-expire-minutes`: Sets the retention time for historical job
data and logs (in minutes). The system will automatically clear expired job
information and log files after the specified period.
+- `scheduled-deletion-enable`: Enable scheduled cleanup, with default value of
`true`. The system will automatically delete relevant log files when job
expiration time, as defined by `history-job-expire-minutes`, is reached. If
this feature is disabled, logs will remain permanently on disk, requiring
manual management, which may affect disk space usage. It is recommended to
configure this setting based on specific needs.
+
## Best practices for developers
You can create an SLF4J logger by calling
`org.slf4j.LoggerFactory#LoggerFactory.getLogger` with the Class of your class
as an argument.
diff --git a/docs/zh/seatunnel-engine/logging.md
b/docs/zh/seatunnel-engine/logging.md
index 7d4f4f1d62..f97ea572e8 100644
--- a/docs/zh/seatunnel-engine/logging.md
+++ b/docs/zh/seatunnel-engine/logging.md
@@ -92,6 +92,25 @@ SeaTunnel 提供了一个 API,用于查询日志。
有关详细信息,请参阅 [REST-API](rest-api-v2.md)。
+## SeaTunnel 日志配置
+
+### 定时删除旧日志
+
+SeaTunnel 支持定时删除旧日志文件,以避免磁盘空间不足。您可以在 `seatunnel.yml` 文件中添加以下配置:
+
+```yaml
+seatunnel:
+ engine:
+ history-job-expire-minutes: 1440
+ telemetry:
+ logs:
+ scheduled-deletion-enable: true
+```
+
+- `history-job-expire-minutes`:
设置历史作业和日志的保留时间(单位:分钟)。系统将在指定的时间后自动清除过期的作业信息和日志文件。
+- `scheduled-deletion-enable`: 启用定时清理功能,默认为 `true`。系统将在作业达到
`history-job-expire-minutes`
设置的过期时间后自动删除相关日志文件。关闭该功能后,日志将永久保留在磁盘上,需要用户自行管理,否则可能影响磁盘占用。建议根据需求合理配置。
+
+
## 开发人员最佳实践
您可以通过调用 `org.slf4j.LoggerFactory#LoggerFactory.getLogger` 并以您的类的类作为参数来创建 SLF4J
记录器。
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java
index 1aa52488f6..4afbfacb71 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/joblog/JobLogIT.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.platform.commons.util.StringUtils;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
@@ -35,12 +36,15 @@ import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.utility.DockerLoggerFactory;
import org.testcontainers.utility.MountableFile;
+import com.beust.jcommander.internal.Lists;
+import com.hazelcast.jet.datamodel.Tuple2;
import io.restassured.response.Response;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
+import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.Stream;
@@ -52,7 +56,11 @@ import static org.hamcrest.Matchers.equalTo;
public class JobLogIT extends SeaTunnelContainer {
private static final String CUSTOM_JOB_NAME = "test-job-log-file";
+ private static final String CUSTOM_JOB_NAME2 = "test-job-log-file2";
+ private static final String CUSTOM_JOB_NAME3 = "test-job-log-file3";
private static final long CUSTOM_JOB_ID = 862969647010611201L;
+ private static final long CUSTOM_JOB_ID2 = 862969647010611202L;
+ private static final long CUSTOM_JOB_ID3 = 862969647010611203L;
private static final String confFile = "/fakesource_to_console.conf";
private static final Path BIN_PATH = Paths.get(SEATUNNEL_HOME, "bin",
SERVER_SHELL);
@@ -99,10 +107,29 @@ public class JobLogIT extends SeaTunnelContainer {
@Test
public void testJobLogFile() throws Exception {
submitJobAndAssertResponse(
- server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME,
CUSTOM_JOB_ID);
+ server, JobMode.BATCH.name(), false, CUSTOM_JOB_NAME,
CUSTOM_JOB_ID);
+
+ submitJobAndAssertResponse(
+ server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME2,
CUSTOM_JOB_ID2);
+
+ submitJobAndAssertResponse(
+ server, JobMode.STREAMING.name(), false, CUSTOM_JOB_NAME3,
CUSTOM_JOB_ID3);
assertConsoleLog();
assertFileLog();
+ List<Tuple2<Boolean, String>> before =
+ Lists.newArrayList(
+ Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID + ".log"),
+ Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID2 + ".log"),
+ Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID3 +
".log"));
+ assertFileLogClean(before);
+ Thread.sleep(90000);
+ List<Tuple2<Boolean, String>> after =
+ Lists.newArrayList(
+ Tuple2.tuple2(true, "job-" + CUSTOM_JOB_ID + ".log"),
+ Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID2 + ".log"),
+ Tuple2.tuple2(false, "job-" + CUSTOM_JOB_ID3 +
".log"));
+ assertFileLogClean(after);
}
private void assertConsoleLog() {
@@ -168,6 +195,22 @@ public class JobLogIT extends SeaTunnelContainer {
});
}
+ private void assertFileLogClean(List<Tuple2<Boolean, String>> tuple2s)
+ throws IOException, InterruptedException {
+ for (Tuple2<Boolean, String> tuple2 : tuple2s) {
+ Container.ExecResult execResult =
+ server.execInContainer(
+ "sh", "-c", "find /tmp/seatunnel/logs -name " +
tuple2.f1() + "\n");
+ String file = execResult.getStdout();
+ execResult =
+ secondServer.execInContainer(
+ "sh", "-c", "find /tmp/seatunnel/logs -name " +
tuple2.f1() + "\n");
+ String file1 = execResult.getStdout();
+ Assertions.assertEquals(
+ tuple2.f0(), StringUtils.isBlank(file) &&
StringUtils.isBlank(file1));
+ }
+ }
+
private Response submitJob(
GenericContainer<?> container,
String jobMode,
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
index 5878531b2e..6b0387caa0 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster/seatunnel.yaml
@@ -17,7 +17,7 @@
seatunnel:
engine:
- history-job-expire-minutes: 1440
+ history-job-expire-minutes: 1
backup-count: 2
queue-type: blockingqueue
print-execution-info-interval: 10
@@ -35,3 +35,8 @@ seatunnel:
enable-http: true
port: 8080
enable-dynamic-port: false
+ telemetry:
+ metric:
+ enabled: false
+ logs:
+ scheduled-deletion-enable: true
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
index 84fef4251e..80b928fcdc 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
@@ -34,4 +34,9 @@ seatunnel:
namespace: /tmp/seatunnel/checkpoint_snapshot/
http:
enable-http: false
- port: 8080
\ No newline at end of file
+ port: 8080
+ telemetry:
+ metric:
+ enabled: false
+ logs:
+ scheduled-deletion-enable: true
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index ffa984de1f..c0f6b6ad6c 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.engine.common.config.server.ScheduleStrategy;
import org.apache.seatunnel.engine.common.config.server.ServerConfigOptions;
import org.apache.seatunnel.engine.common.config.server.SlotServiceConfig;
import org.apache.seatunnel.engine.common.config.server.TelemetryConfig;
+import org.apache.seatunnel.engine.common.config.server.TelemetryLogsConfig;
import org.apache.seatunnel.engine.common.config.server.TelemetryMetricConfig;
import org.apache.seatunnel.engine.common.config.server.ThreadShareMode;
@@ -330,17 +331,19 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
}
private TelemetryConfig parseTelemetryConfig(Node telemetryNode) {
- TelemetryConfig metricConfig = new TelemetryConfig();
+ TelemetryConfig telemetryConfig = new TelemetryConfig();
for (Node node : childElements(telemetryNode)) {
String name = cleanNodeName(node);
if (ServerConfigOptions.TELEMETRY_METRIC.key().equals(name)) {
- metricConfig.setMetric(parseTelemetryMetricConfig(node));
+ telemetryConfig.setMetric(parseTelemetryMetricConfig(node));
+ } else if (ServerConfigOptions.TELEMETRY_LOGS.key().equals(name)) {
+ telemetryConfig.setLogs(parseTelemetryLogsConfig(node));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
}
- return metricConfig;
+ return telemetryConfig;
}
private TelemetryMetricConfig parseTelemetryMetricConfig(Node metricNode) {
@@ -357,6 +360,20 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
return metricConfig;
}
+ private TelemetryLogsConfig parseTelemetryLogsConfig(Node logsNode) {
+ TelemetryLogsConfig logsConfig = new TelemetryLogsConfig();
+ for (Node node : childElements(logsNode)) {
+ String name = cleanNodeName(node);
+ if
(ServerConfigOptions.TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE.key().equals(name))
{
+ logsConfig.setEnabled(getBooleanValue(getTextContent(node)));
+ } else {
+ LOGGER.warning("Unrecognized element: " + name);
+ }
+ }
+
+ return logsConfig;
+ }
+
private HttpConfig parseHttpConfig(Node httpNode) {
HttpConfig httpConfig = new HttpConfig();
for (Node node : childElements(httpNode)) {
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index d9e9662a99..7153cccd0f 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -211,6 +211,20 @@ public class ServerConfigOptions {
.withDescription(
"Whether to use classloader cache mode. With cache
mode, all jobs share the same classloader if the jars are the same");
+ public static final Option<Boolean>
TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE =
+ Options.key("scheduled-deletion-enable")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Enable scheduled cleanup, with default value of
true. The system will automatically delete relevant log files when job
expiration time, as defined by `history-job-expire-minutes`, is reached. "
+ + "If this feature is disabled, logs will
remain permanently on disk, requiring manual management, which may affect disk
space usage. It is recommended to configure this setting based on specific
needs.");
+
+ public static final Option<TelemetryLogsConfig> TELEMETRY_LOGS =
+ Options.key("logs")
+ .type(new TypeReference<TelemetryLogsConfig>() {})
+ .defaultValue(new TelemetryLogsConfig())
+ .withDescription("The telemetry logs configuration.");
+
public static final Option<Boolean> TELEMETRY_METRIC_ENABLED =
Options.key("enabled")
.booleanType()
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
index c3e603eea4..7652832d46 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
@@ -25,4 +25,6 @@ import java.io.Serializable;
public class TelemetryConfig implements Serializable {
private TelemetryMetricConfig metric =
ServerConfigOptions.TELEMETRY_METRIC.defaultValue();
+
+ private TelemetryLogsConfig logs =
ServerConfigOptions.TELEMETRY_LOGS.defaultValue();
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryLogsConfig.java
similarity index 83%
copy from
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
copy to
seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryLogsConfig.java
index c3e603eea4..9eb8b32975 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/TelemetryLogsConfig.java
@@ -22,7 +22,8 @@ import lombok.Data;
import java.io.Serializable;
@Data
-public class TelemetryConfig implements Serializable {
+public class TelemetryLogsConfig implements Serializable {
- private TelemetryMetricConfig metric =
ServerConfigOptions.TELEMETRY_METRIC.defaultValue();
+ private boolean enabled =
+
ServerConfigOptions.TELEMETRY_LOGS_SCHEDULED_DELETION_ENABLE.defaultValue();
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/LogUtil.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/LogUtil.java
new file mode 100644
index 0000000000..7d92d7dfbc
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/utils/LogUtil.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.common.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.core.LoggerContext;
+import org.apache.logging.log4j.core.config.builder.api.Component;
+import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
+import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration;
+import org.apache.logging.log4j.core.lookup.StrSubstitutor;
+
+import java.lang.reflect.Field;
+
+public class LogUtil {
+
+ /** Get configuration log path by log4j */
+ public static String getLogPath() throws NoSuchFieldException,
IllegalAccessException {
+ String routingAppender = "routingAppender";
+ String fileAppender = "fileAppender";
+ PropertiesConfiguration config = getLogConfiguration();
+ // Get routingAppender log file path
+ String routingLogFilePath = getRoutingLogFilePath(config);
+
+ // Get fileAppender log file path
+ String fileLogPath = getFileLogPath(config);
+ String logRef =
+
config.getLoggerConfig(StringUtils.EMPTY).getAppenderRefs().stream()
+ .map(Object::toString)
+ .filter(ref -> ref.contains(routingAppender) ||
ref.contains(fileAppender))
+ .findFirst()
+ .orElse(StringUtils.EMPTY);
+ if (logRef.equals(routingAppender)) {
+ return routingLogFilePath.substring(0,
routingLogFilePath.lastIndexOf("/"));
+ } else if (logRef.equals(fileAppender)) {
+ return fileLogPath.substring(0,
routingLogFilePath.lastIndexOf("/"));
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Log file path is empty, get logRef : %s",
logRef));
+ }
+ }
+
+ private static PropertiesConfiguration getLogConfiguration() {
+ LoggerContext context = (LoggerContext) LogManager.getContext(false);
+ return (PropertiesConfiguration) context.getConfiguration();
+ }
+
+ private static String getRoutingLogFilePath(PropertiesConfiguration config)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field propertiesField =
BuiltConfiguration.class.getDeclaredField("appendersComponent");
+ propertiesField.setAccessible(true);
+ Component propertiesComponent = (Component)
propertiesField.get(config);
+ StrSubstitutor substitutor = config.getStrSubstitutor();
+ return propertiesComponent.getComponents().stream()
+ .filter(
+ component ->
+
"routingAppender".equals(component.getAttributes().get("name")))
+ .flatMap(component -> component.getComponents().stream())
+ .flatMap(component -> component.getComponents().stream())
+ .flatMap(component -> component.getComponents().stream())
+ .map(component ->
substitutor.replace(component.getAttributes().get("fileName")))
+ .findFirst()
+ .orElse(null);
+ }
+
+ private static String getFileLogPath(PropertiesConfiguration config)
+ throws NoSuchFieldException, IllegalAccessException {
+ Field propertiesField =
BuiltConfiguration.class.getDeclaredField("appendersComponent");
+ propertiesField.setAccessible(true);
+ Component propertiesComponent = (Component)
propertiesField.get(config);
+ StrSubstitutor substitutor = config.getStrSubstitutor();
+ return propertiesComponent.getComponents().stream()
+ .filter(component ->
"fileAppender".equals(component.getAttributes().get("name")))
+ .map(component ->
substitutor.replace(component.getAttributes().get("fileName")))
+ .findFirst()
+ .orElse(null);
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ExecutionAddress.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ExecutionAddress.java
new file mode 100644
index 0000000000..ef445b50cb
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/ExecutionAddress.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding
copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.seatunnel.engine.core.job;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class ExecutionAddress implements Serializable {
+ String hostname;
+ int port;
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
index aea57beef3..f0ef24bad6 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/job/JobDAGInfo.java
@@ -29,6 +29,7 @@ import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.util.List;
import java.util.Map;
+import java.util.Set;
@AllArgsConstructor
@NoArgsConstructor
@@ -38,6 +39,7 @@ public class JobDAGInfo implements Serializable {
Map<String, Object> envOptions;
Map<Integer, List<Edge>> pipelineEdges;
Map<Long, VertexInfo> vertexInfoMap;
+ Set<ExecutionAddress> historyExecutionPlan;
public JsonObject toJsonObject() {
JsonObject pipelineEdgesJsonObject = new JsonObject();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index 02fdc98b1e..eae0627343 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -398,6 +398,7 @@ public class CoordinatorService {
jobHistoryService =
new JobHistoryService(
+ nodeEngine,
runningJobStateIMap,
logger,
pendingJobMasterMap,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index fb53fb4459..5721a7db30 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -29,6 +29,7 @@ import
org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
import org.apache.seatunnel.engine.server.service.jar.ConnectorPackageService;
import org.apache.seatunnel.engine.server.service.slot.DefaultSlotService;
import org.apache.seatunnel.engine.server.service.slot.SlotService;
+import org.apache.seatunnel.engine.server.telemetry.log.TaskLogManagerService;
import
org.apache.seatunnel.engine.server.telemetry.metrics.entity.ThreadPoolStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -46,6 +47,7 @@ import com.hazelcast.spi.impl.operationservice.LiveOperations;
import com.hazelcast.spi.impl.operationservice.LiveOperationsTracker;
import lombok.Getter;
import lombok.NonNull;
+import lombok.extern.slf4j.Slf4j;
import java.util.Properties;
import java.util.concurrent.Executors;
@@ -55,6 +57,7 @@ import java.util.concurrent.TimeUnit;
import static
com.hazelcast.spi.properties.ClusterProperty.INVOCATION_MAX_RETRY_COUNT;
import static
com.hazelcast.spi.properties.ClusterProperty.INVOCATION_RETRY_PAUSE;
+@Slf4j
public class SeaTunnelServer
implements ManagedService, MembershipAwareService,
LiveOperationsTracker {
@@ -71,6 +74,7 @@ public class SeaTunnelServer
private CoordinatorService coordinatorService;
private ScheduledExecutorService monitorService;
private JettyService jettyService;
+ private TaskLogManagerService taskLogManagerService;
@Getter private SeaTunnelHealthMonitor seaTunnelHealthMonitor;
@@ -136,6 +140,16 @@ public class SeaTunnelServer
seaTunnelHealthMonitor = new SeaTunnelHealthMonitor(((NodeEngineImpl)
engine).getNode());
+ // task log manager service
+ if (seaTunnelConfig.getEngineConfig().getTelemetryConfig() != null
+ &&
seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs() != null
+ &&
seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs().isEnabled()) {
+ taskLogManagerService =
+ new TaskLogManagerService(
+
seaTunnelConfig.getEngineConfig().getTelemetryConfig().getLogs());
+ taskLogManagerService.initClean();
+ }
+
// Start Jetty server
if (seaTunnelConfig.getEngineConfig().getHttpConfig().isEnabled()) {
jettyService = new JettyService(nodeEngine, seaTunnelConfig);
@@ -337,6 +351,10 @@ public class SeaTunnelServer
return getCoordinatorService().getConnectorPackageService();
}
+ public TaskLogManagerService getTaskLogManagerService() {
+ return taskLogManagerService;
+ }
+
public ThreadPoolStatus getThreadPoolStatusMetrics() {
return coordinatorService.getThreadPoolStatusMetrics();
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
index bbe6d77e00..62df65c917 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/DAGUtils.java
@@ -29,6 +29,7 @@ import
org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.job.Edge;
+import org.apache.seatunnel.engine.core.job.ExecutionAddress;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.VertexInfo;
@@ -42,6 +43,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -52,7 +54,8 @@ public class DAGUtils {
LogicalDag logicalDag,
JobImmutableInformation jobImmutableInformation,
EngineConfig engineConfig,
- boolean isPhysicalDAGIInfo) {
+ boolean isPhysicalDAGIInfo,
+ Set<ExecutionAddress> historyExecutionAddress) {
List<Pipeline> pipelines =
new ExecutionPlanGenerator(logicalDag,
jobImmutableInformation, engineConfig)
.generate()
@@ -89,7 +92,8 @@ public class DAGUtils {
jobImmutableInformation.getJobId(),
logicalDag.getJobConfig().getEnvOptions(),
pipelineWithEdges,
- vertexInfoMap);
+ vertexInfoMap,
+ historyExecutionAddress);
} else {
// Generate LogicalPlan DAG
List<Edge> edges =
@@ -136,7 +140,8 @@ public class DAGUtils {
jobImmutableInformation.getJobId(),
logicalDag.getJobConfig().getEnvOptions(),
pipelineWithEdges,
- vertexInfoMap);
+ vertexInfoMap,
+ historyExecutionAddress);
}
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index a73708a548..7b0d83ef03 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -24,6 +24,7 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.seatunnel.api.common.metrics.JobMetrics;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
+import org.apache.seatunnel.engine.core.job.ExecutionAddress;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
@@ -33,15 +34,22 @@ import
org.apache.seatunnel.engine.server.dag.physical.PipelineLocation;
import org.apache.seatunnel.engine.server.execution.ExecutionState;
import org.apache.seatunnel.engine.server.execution.PendingSourceState;
import org.apache.seatunnel.engine.server.execution.TaskGroupLocation;
+import
org.apache.seatunnel.engine.server.telemetry.log.operation.CleanLogOperation;
+import org.apache.seatunnel.engine.server.utils.NodeEngineUtil;
+import com.hazelcast.cluster.Address;
+import com.hazelcast.core.EntryEvent;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.IMap;
+import com.hazelcast.map.listener.EntryExpiredListener;
+import com.hazelcast.spi.impl.NodeEngine;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.Getter;
import scala.Tuple2;
import java.io.Serializable;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -52,6 +60,9 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
public class JobHistoryService {
+
+ private final NodeEngine nodeEngine;
+
/**
* IMap key is one of jobId {@link
* org.apache.seatunnel.engine.server.dag.physical.PipelineLocation} and
{@link
@@ -91,6 +102,7 @@ public class JobHistoryService {
private final int finishedJobExpireTime;
public JobHistoryService(
+ NodeEngine nodeEngine,
IMap<Object, Object> runningJobStateIMap,
ILogger logger,
Map<Long, Tuple2<PendingSourceState, JobMaster>>
pendingJobMasterMap,
@@ -99,6 +111,7 @@ public class JobHistoryService {
IMap<Long, JobMetrics> finishedJobMetricsImap,
IMap<Long, JobDAGInfo> finishedJobVertexInfoImap,
int finishedJobExpireTime) {
+ this.nodeEngine = nodeEngine;
this.runningJobStateIMap = runningJobStateIMap;
this.logger = logger;
this.pendingJobMasterMap = pendingJobMasterMap;
@@ -106,6 +119,7 @@ public class JobHistoryService {
this.finishedJobStateImap = finishedJobStateImap;
this.finishedJobMetricsImap = finishedJobMetricsImap;
this.finishedJobDAGInfoImap = finishedJobVertexInfoImap;
+ this.finishedJobDAGInfoImap.addEntryListener(new
JobInfoExpiredListener(), true);
this.objectMapper = new ObjectMapper();
this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,
false);
this.finishedJobExpireTime = finishedJobExpireTime;
@@ -287,4 +301,30 @@ public class JobHistoryService {
private PipelineStatus pipelineStatus;
private Map<TaskGroupLocation, ExecutionState> executionStateMap;
}
+
+ private class JobInfoExpiredListener implements EntryExpiredListener<Long,
JobDAGInfo> {
+ @Override
+ public void entryExpired(EntryEvent<Long, JobDAGInfo> event) {
+ Long jobId = event.getKey();
+ JobDAGInfo jobDagInfo = event.getOldValue();
+ try {
+ Set<ExecutionAddress> historyExecutionPlan =
jobDagInfo.getHistoryExecutionPlan();
+
+ historyExecutionPlan.forEach(
+ address -> {
+ logger.info("clean job log, jobId: " + jobId + ",
address: " + address);
+ try {
+ NodeEngineUtil.sendOperationToMemberNode(
+ nodeEngine,
+ new CleanLogOperation(jobId),
+ new Address(address.getHostname(),
address.getPort()));
+ } catch (UnknownHostException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } catch (Exception e) {
+ logger.warning("clean job log err", e);
+ }
+ }
+ }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 04812e078f..49b676c97e 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -44,6 +44,7 @@ import
org.apache.seatunnel.engine.core.dag.actions.SinkAction;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalVertex;
import org.apache.seatunnel.engine.core.job.ConnectorJarIdentifier;
+import org.apache.seatunnel.engine.core.job.ExecutionAddress;
import org.apache.seatunnel.engine.core.job.JobDAGInfo;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobInfo;
@@ -89,10 +90,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -157,6 +160,8 @@ public class JobMaster {
private final IMap<Long, JobInfo> runningJobInfoIMap;
+ @Getter private final Set<ExecutionAddress> historyExecutionAddress = new
HashSet<>();
+
private final IMap<Long, HashMap<TaskLocation, SeaTunnelMetricsContext>>
metricsImap;
/** If the job or pipeline cancel by user, needRestore will be false */
@@ -394,6 +399,17 @@ public class JobMaster {
== preApplyResourceFutures.size();
if (enoughResource) {
+ for (Map.Entry<TaskGroupLocation, CompletableFuture<SlotProfile>>
entry :
+ preApplyResourceFutures.entrySet()) {
+ try {
+ Address worker = entry.getValue().get().getWorker();
+ historyExecutionAddress.add(
+ new ExecutionAddress(worker.getHost(),
worker.getPort()));
+
+ } catch (Exception e) {
+ LOGGER.warning("history execution plan add worker failed",
e);
+ }
+ }
if (isSubPlan) {
// SubPlan applies for resources separately and needs to be
merged into the entire
// job's resources
@@ -569,7 +585,11 @@ public class JobMaster {
if (jobDAGInfo == null) {
jobDAGInfo =
DAGUtils.getJobDAGInfo(
- logicalDag, jobImmutableInformation, engineConfig,
isPhysicalDAGIInfo);
+ logicalDag,
+ jobImmutableInformation,
+ engineConfig,
+ isPhysicalDAGIInfo,
+ historyExecutionAddress);
}
return jobDAGInfo;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
index 7d582bf589..a1e84f9ec7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseLogService.java
@@ -18,14 +18,7 @@
package org.apache.seatunnel.engine.server.rest.service;
import org.apache.seatunnel.common.utils.ExceptionUtils;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.core.LoggerContext;
-import org.apache.logging.log4j.core.config.builder.api.Component;
-import org.apache.logging.log4j.core.config.builder.impl.BuiltConfiguration;
-import org.apache.logging.log4j.core.config.properties.PropertiesConfiguration;
-import org.apache.logging.log4j.core.lookup.StrSubstitutor;
+import org.apache.seatunnel.engine.common.utils.LogUtil;
import com.hazelcast.internal.util.StringUtil;
import com.hazelcast.spi.impl.NodeEngineImpl;
@@ -34,7 +27,6 @@ import lombok.extern.slf4j.Slf4j;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.lang.reflect.Field;
import java.net.HttpURLConnection;
import java.net.URL;
@@ -47,73 +39,13 @@ public class BaseLogService extends BaseService {
/** Get configuration log path */
public String getLogPath() {
try {
- String routingAppender = "routingAppender";
- String fileAppender = "fileAppender";
- PropertiesConfiguration config = getLogConfiguration();
- // Get routingAppender log file path
- String routingLogFilePath = getRoutingLogFilePath(config);
-
- // Get fileAppender log file path
- String fileLogPath = getFileLogPath(config);
- String logRef =
-
config.getLoggerConfig(StringUtils.EMPTY).getAppenderRefs().stream()
- .map(Object::toString)
- .filter(
- ref ->
- ref.contains(routingAppender)
- ||
ref.contains(fileAppender))
- .findFirst()
- .orElse(StringUtils.EMPTY);
- if (logRef.equals(routingAppender)) {
- return routingLogFilePath.substring(0,
routingLogFilePath.lastIndexOf("/"));
- } else if (logRef.equals(fileAppender)) {
- return fileLogPath.substring(0,
routingLogFilePath.lastIndexOf("/"));
- } else {
- log.warn(String.format("Log file path is empty, get logRef :
%s", logRef));
- return null;
- }
+ return LogUtil.getLogPath();
} catch (NoSuchFieldException | IllegalAccessException e) {
log.error("Get log path error,{}", ExceptionUtils.getMessage(e));
return null;
}
}
- private String getFileLogPath(PropertiesConfiguration config)
- throws NoSuchFieldException, IllegalAccessException {
- Field propertiesField =
BuiltConfiguration.class.getDeclaredField("appendersComponent");
- propertiesField.setAccessible(true);
- Component propertiesComponent = (Component)
propertiesField.get(config);
- StrSubstitutor substitutor = config.getStrSubstitutor();
- return propertiesComponent.getComponents().stream()
- .filter(component ->
"fileAppender".equals(component.getAttributes().get("name")))
- .map(component ->
substitutor.replace(component.getAttributes().get("fileName")))
- .findFirst()
- .orElse(null);
- }
-
- private String getRoutingLogFilePath(PropertiesConfiguration config)
- throws NoSuchFieldException, IllegalAccessException {
- Field propertiesField =
BuiltConfiguration.class.getDeclaredField("appendersComponent");
- propertiesField.setAccessible(true);
- Component propertiesComponent = (Component)
propertiesField.get(config);
- StrSubstitutor substitutor = config.getStrSubstitutor();
- return propertiesComponent.getComponents().stream()
- .filter(
- component ->
-
"routingAppender".equals(component.getAttributes().get("name")))
- .flatMap(component -> component.getComponents().stream())
- .flatMap(component -> component.getComponents().stream())
- .flatMap(component -> component.getComponents().stream())
- .map(component ->
substitutor.replace(component.getAttributes().get("fileName")))
- .findFirst()
- .orElse(null);
- }
-
- private PropertiesConfiguration getLogConfiguration() {
- LoggerContext context = (LoggerContext) LogManager.getContext(false);
- return (PropertiesConfiguration) context.getConfiguration();
- }
-
protected String sendGet(String urlString) {
try {
HttpURLConnection connection = (HttpURLConnection) new
URL(urlString).openConnection();
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
index b304f7fdde..f342aaba45 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/rest/service/BaseService.java
@@ -69,6 +69,7 @@ import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -164,7 +165,8 @@ public abstract class BaseService {
logicalDag,
jobImmutableInformation,
getSeaTunnelServer(false).getSeaTunnelConfig().getEngineConfig(),
- true);
+ true,
+ new HashSet<>());
jobInfoJson
.add(RestConstant.JOB_ID, String.valueOf(jobId))
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
index a926767576..47525de0a5 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/serializable/TaskDataSerializerHook.java
@@ -44,6 +44,7 @@ import
org.apache.seatunnel.engine.server.task.operation.source.RestoredSplitOpe
import
org.apache.seatunnel.engine.server.task.operation.source.SourceNoMoreElementOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.SourceReaderEventOperation;
import
org.apache.seatunnel.engine.server.task.operation.source.SourceRegisterOperation;
+import
org.apache.seatunnel.engine.server.telemetry.log.operation.CleanLogOperation;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
@@ -104,6 +105,8 @@ public class TaskDataSerializerHook implements
DataSerializerHook {
public static final int CLOSE_READER_OPERATION = 26;
+ public static final int CLEAN_LOG_OPERATION = 27;
+
public static final int FACTORY_ID =
FactoryIdHelper.getFactoryId(
SeaTunnelFactoryIdConstant.SEATUNNEL_TASK_DATA_SERIALIZER_FACTORY,
@@ -176,6 +179,8 @@ public class TaskDataSerializerHook implements
DataSerializerHook {
return new JobEventReportOperation();
case CLOSE_READER_OPERATION:
return new CloseIdleReaderOperation();
+ case CLEAN_LOG_OPERATION:
+ return new CleanLogOperation();
default:
throw new IllegalArgumentException("Unknown type id " +
typeId);
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/TaskLogManagerService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/TaskLogManagerService.java
new file mode 100644
index 0000000000..f116ada66c
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/TaskLogManagerService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.log;
+
+import org.apache.seatunnel.engine.common.config.server.TelemetryLogsConfig;
+import org.apache.seatunnel.engine.common.utils.LogUtil;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+@Slf4j
+public class TaskLogManagerService {
+ private String path;
+
+ public TaskLogManagerService(TelemetryLogsConfig log) {}
+
+ public void initClean() {
+ try {
+ path = LogUtil.getLogPath();
+ } catch (Exception e) {
+ log.warn(
+ "The corresponding log file path is not properly
configured, please check the log configuration file.",
+ e);
+ }
+ }
+
+ public void clean(long jobId) {
+ log.info("Cleaning logs for jobId: {} , path : {}", jobId, path);
+ if (path == null) {
+ return;
+ }
+ String[] logFiles = getLogFiles(jobId, path);
+ for (String logFile : logFiles) {
+ try {
+ Files.delete(Paths.get(path + "/" + logFile));
+ } catch (IOException e) {
+ log.warn("Failed to delete log file: {}", logFile, e);
+ }
+ }
+ }
+
+ private String[] getLogFiles(long jobId, String path) {
+ File logDir = new File(path);
+ if (!logDir.exists() || !logDir.isDirectory()) {
+ log.warn(
+ "Skipping deletion: Log directory '{}' either does not
exist or is not a valid directory. Please verify the path and ensure the logs
are being written correctly.",
+ path);
+ return new String[0];
+ }
+
+ return logDir.list((dir, name) ->
name.contains(String.valueOf(jobId)));
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/operation/CleanLogOperation.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/operation/CleanLogOperation.java
new file mode 100644
index 0000000000..2f5f215d80
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/telemetry/log/operation/CleanLogOperation.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.telemetry.log.operation;
+
+import org.apache.seatunnel.engine.server.SeaTunnelServer;
+import org.apache.seatunnel.engine.server.serializable.TaskDataSerializerHook;
+import org.apache.seatunnel.engine.server.task.operation.TracingOperation;
+import org.apache.seatunnel.engine.server.telemetry.log.TaskLogManagerService;
+
+import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
+
+public class CleanLogOperation extends TracingOperation implements
IdentifiedDataSerializable {
+
+ private long jobId;
+
+ public CleanLogOperation(long jobId) {
+ super();
+ this.jobId = jobId;
+ }
+
+ public CleanLogOperation() {}
+
+ @Override
+ public void runInternal() throws Exception {
+ SeaTunnelServer service = getService();
+ TaskLogManagerService taskLogManagerService =
service.getTaskLogManagerService();
+ if (taskLogManagerService != null) {
+ taskLogManagerService.clean(jobId);
+ }
+ }
+
+ @Override
+ public int getFactoryId() {
+ return TaskDataSerializerHook.FACTORY_ID;
+ }
+
+ @Override
+ public int getClassId() {
+ return TaskDataSerializerHook.CLEAN_LOG_OPERATION;
+ }
+
+ @Override
+ public String getServiceName() {
+ return SeaTunnelServer.SERVICE_NAME;
+ }
+}