This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 84d8cc3fb1 [Feature][Zeta]The expiration time of a historical Job can
be config (#5180)
84d8cc3fb1 is described below
commit 84d8cc3fb12147df80322d3c169aeeb6b8335ec0
Author: wu-a-ge <[email protected]>
AuthorDate: Thu Aug 10 19:12:17 2023 +0800
[Feature][Zeta]The expiration time of a historical Job can be config (#5180)
* fix:hdfs Checkpoint Storage management fails to delete historical files
Co-authored-by: hailin0 <[email protected]>
---
config/seatunnel.yaml | 1 +
docs/en/seatunnel-engine/deployment.md | 12 ++++++++++
.../seatunnel/engine/e2e/JobExecutionIT.java | 26 ++++++++++++++++++++++
.../src/test/resources/seatunnel.yaml | 1 +
.../engine/common/config/EngineConfig.java | 10 +++++++++
.../config/YamlSeaTunnelDomConfigProcessor.java | 5 +++++
.../common/config/server/ServerConfigOptions.java | 5 +++++
.../engine/server/CoordinatorService.java | 3 ++-
.../engine/server/master/JobHistoryService.java | 13 ++++++-----
9 files changed, 70 insertions(+), 6 deletions(-)
diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml
index 7e496ca39a..7c689a328d 100644
--- a/config/seatunnel.yaml
+++ b/config/seatunnel.yaml
@@ -17,6 +17,7 @@
seatunnel:
engine:
+ history-job-expire-minutes: 1440
backup-count: 1
queue-type: blockingqueue
print-execution-info-interval: 60
diff --git a/docs/en/seatunnel-engine/deployment.md
b/docs/en/seatunnel-engine/deployment.md
index c07cd45d6b..1f8692530c 100644
--- a/docs/en/seatunnel-engine/deployment.md
+++ b/docs/en/seatunnel-engine/deployment.md
@@ -103,6 +103,18 @@ seatunnel:
About the checkpoint storage, you can see [checkpoint
storage](checkpoint-storage.md)
+### 4.4 Historical Job expiration Config
+
+The information about each completed Job, such as its status, counters, and
error logs, is stored in IMap objects. As more jobs complete, this history keeps
growing and can eventually exhaust memory. To bound it, set the
`history-job-expire-minutes` parameter: a completed Job's history entries expire
this many minutes after they are written. The unit is minutes, and the value must
be a positive integer. The default value is 1440 minutes, that is, one day.
+
+Example
+
+```
+seatunnel:
+ engine:
+ history-job-expire-minutes: 1440
+```
+
## 5. Config SeaTunnel Engine Server
All SeaTunnel Engine Server config in `hazelcast.yaml` file.
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 5fa521eb50..b666b994c7 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -182,6 +182,32 @@ public class JobExecutionIT {
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
}
+ @Test
+ public void testExpiredJobWasDeleted() throws Exception {
+ Common.setDeployMode(DeployMode.CLIENT);
+ String filePath =
TestUtils.getResource("batch_fakesource_to_file.conf");
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName("job_expire");
+
+ ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
+ engineClient.createExecutionContext(filePath, jobConfig);
+
+ final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+ JobResult result = clientJobProxy.doWaitForJobComplete().get();
+ Assertions.assertEquals(result.getStatus(), JobStatus.FINISHED);
+ Awaitility.await()
+ .atMost(65, TimeUnit.SECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertThrowsExactly(
+ NullPointerException.class,
+ () -> clientJobProxy.getJobStatus()));
+ }
+
@AfterAll
static void afterClass() {
if (hazelcastInstance != null) {
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
index 16b9f55c30..3897ae9503 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
@@ -17,6 +17,7 @@
seatunnel:
engine:
+ history-job-expire-minutes: 1
backup-count: 2
queue-type: blockingqueue
print-execution-info-interval: 10
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index edc18a0b15..e162b428bb 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -32,6 +32,7 @@ import static
com.hazelcast.internal.util.Preconditions.checkPositive;
@Data
@SuppressWarnings("checkstyle:MagicNumber")
public class EngineConfig {
+
private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue();
private int printExecutionInfoInterval =
ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue();
@@ -50,6 +51,8 @@ public class EngineConfig {
private CheckpointConfig checkpointConfig =
ServerConfigOptions.CHECKPOINT.defaultValue();
private QueueType queueType =
ServerConfigOptions.QUEUE_TYPE.defaultValue();
+ private int historyJobExpireMinutes =
+ ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.defaultValue();
public void setBackupCount(int newBackupCount) {
checkBackupCount(newBackupCount, 0);
@@ -82,6 +85,13 @@ public class EngineConfig {
this.taskExecutionThreadShareMode = taskExecutionThreadShareMode;
}
+ public void setHistoryJobExpireMinutes(int historyJobExpireMinutes) {
+ checkPositive(
+ historyJobExpireMinutes,
+ ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES + " must be >
0");
+ this.historyJobExpireMinutes = historyJobExpireMinutes;
+ }
+
public EngineConfig setQueueType(QueueType queueType) {
checkNotNull(queueType);
this.queueType = queueType;
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index a901fbb5e6..5b8bbf6976 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -131,6 +131,11 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node));
} else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) {
engineConfig.setCheckpointConfig(parseCheckpointConfig(node));
+ } else if
(ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.key().equals(name)) {
+ engineConfig.setHistoryJobExpireMinutes(
+ getIntegerValue(
+
ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.key(),
+ getTextContent(node)));
} else {
LOGGER.warning("Unrecognized element: " + name);
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 2de8acad01..2409e59ca2 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -145,4 +145,9 @@ public class ServerConfigOptions {
.type(new TypeReference<Map<String, String>>() {})
.noDefaultValue()
.withDescription("The checkpoint storage instance
configuration.");
+ public static final Option<Integer> HISTORY_JOB_EXPIRE_MINUTES =
+ Options.key("history-job-expire-minutes")
+ .intType()
+ .defaultValue(1440)
+ .withDescription("The expire time of history jobs.time
unit minute");
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index fde8b3744c..1f87218de0 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -219,7 +219,8 @@ public class CoordinatorService {
.getMap(Constant.IMAP_FINISHED_JOB_METRICS),
nodeEngine
.getHazelcastInstance()
-
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO));
+
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO),
+ engineConfig.getHistoryJobExpireMinutes());
List<CompletableFuture<Void>> collect =
runningJobInfoIMap.entrySet().stream()
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 20961adf70..686d2a04fe 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -76,20 +76,22 @@ public class JobHistoryService {
* finishedJobStateImap key is jobId and value is jobState(json)
JobStateData Indicates the
* status of the job, pipeline, and task
*/
- // TODO need to limit the amount of storage
private final IMap<Long, JobState> finishedJobStateImap;
private final IMap<Long, JobMetrics> finishedJobMetricsImap;
private final ObjectMapper objectMapper;
+ private final int finishedJobExpireTime;
+
public JobHistoryService(
IMap<Object, Object> runningJobStateIMap,
ILogger logger,
Map<Long, JobMaster> runningJobMasterMap,
IMap<Long, JobState> finishedJobStateImap,
IMap<Long, JobMetrics> finishedJobMetricsImap,
- IMap<Long, JobDAGInfo> finishedJobVertexInfoImap) {
+ IMap<Long, JobDAGInfo> finishedJobVertexInfoImap,
+ int finishedJobExpireTime) {
this.runningJobStateIMap = runningJobStateIMap;
this.logger = logger;
this.runningJobMasterMap = runningJobMasterMap;
@@ -98,6 +100,7 @@ public class JobHistoryService {
this.finishedJobDAGInfoImap = finishedJobVertexInfoImap;
this.objectMapper = new ObjectMapper();
this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS,
false);
+ this.finishedJobExpireTime = finishedJobExpireTime;
}
// Gets the status of a running and completed job
@@ -169,14 +172,14 @@ public class JobHistoryService {
JobState jobState = toJobStateMapper(jobMaster, false);
jobState.setFinishTime(System.currentTimeMillis());
jobState.setErrorMessage(jobMaster.getErrorMessage());
- finishedJobStateImap.put(jobState.jobId, jobState, 14, TimeUnit.DAYS);
+ finishedJobStateImap.put(jobState.jobId, jobState,
finishedJobExpireTime, TimeUnit.MINUTES);
}
@SuppressWarnings("checkstyle:MagicNumber")
public void storeFinishedPipelineMetrics(long jobId, JobMetrics metrics) {
finishedJobMetricsImap.computeIfAbsent(jobId, key -> JobMetrics.of(new
HashMap<>()));
JobMetrics newMetrics =
finishedJobMetricsImap.get(jobId).merge(metrics);
- finishedJobMetricsImap.put(jobId, newMetrics, 14, TimeUnit.DAYS);
+ finishedJobMetricsImap.put(jobId, newMetrics, finishedJobExpireTime,
TimeUnit.MINUTES);
}
private JobState toJobStateMapper(JobMaster jobMaster, boolean simple) {
@@ -236,7 +239,7 @@ public class JobHistoryService {
}
public void storeJobInfo(long jobId, JobDAGInfo jobInfo) {
- finishedJobDAGInfoImap.put(jobId, jobInfo);
+ finishedJobDAGInfoImap.put(jobId, jobInfo, finishedJobExpireTime,
TimeUnit.MINUTES);
}
@AllArgsConstructor