This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 84d8cc3fb1 [Feature][Zeta]The expiration time of a historical Job can 
be config (#5180)
84d8cc3fb1 is described below

commit 84d8cc3fb12147df80322d3c169aeeb6b8335ec0
Author: wu-a-ge <[email protected]>
AuthorDate: Thu Aug 10 19:12:17 2023 +0800

    [Feature][Zeta]The expiration time of a historical Job can be config (#5180)
    
    * fix:hdfs Checkpoint Storage management fails to delete historical files
    Co-authored-by: hailin0 <[email protected]>
---
 config/seatunnel.yaml                              |  1 +
 docs/en/seatunnel-engine/deployment.md             | 12 ++++++++++
 .../seatunnel/engine/e2e/JobExecutionIT.java       | 26 ++++++++++++++++++++++
 .../src/test/resources/seatunnel.yaml              |  1 +
 .../engine/common/config/EngineConfig.java         | 10 +++++++++
 .../config/YamlSeaTunnelDomConfigProcessor.java    |  5 +++++
 .../common/config/server/ServerConfigOptions.java  |  5 +++++
 .../engine/server/CoordinatorService.java          |  3 ++-
 .../engine/server/master/JobHistoryService.java    | 13 ++++++-----
 9 files changed, 70 insertions(+), 6 deletions(-)

diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml
index 7e496ca39a..7c689a328d 100644
--- a/config/seatunnel.yaml
+++ b/config/seatunnel.yaml
@@ -17,6 +17,7 @@
 
 seatunnel:
   engine:
+    history-job-expire-minutes: 1440
     backup-count: 1
     queue-type: blockingqueue
     print-execution-info-interval: 60
diff --git a/docs/en/seatunnel-engine/deployment.md 
b/docs/en/seatunnel-engine/deployment.md
index c07cd45d6b..1f8692530c 100644
--- a/docs/en/seatunnel-engine/deployment.md
+++ b/docs/en/seatunnel-engine/deployment.md
@@ -103,6 +103,18 @@ seatunnel:
 
 About the checkpoint storage, you can see [checkpoint 
storage](checkpoint-storage.md)
 
+### 4.4 Historical Job Expiration Config
+
+The information about each completed job, such as its status, counters, and 
error logs, is stored in an IMap object. As more jobs are run, the memory used 
by this history grows and can eventually exhaust the available memory. To 
prevent this, set the history-job-expire-minutes parameter so that the 
information about a completed job is removed once it expires. The value is in 
minutes; the default is 1440 minutes, that is, one day.
+
+Example
+
+```
+seatunnel:
+  engine:
+    history-job-expire-minutes: 1440
+```
+
 ## 5. Config SeaTunnel Engine Server
 
 All SeaTunnel Engine Server config in `hazelcast.yaml` file.
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
index 5fa521eb50..b666b994c7 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobExecutionIT.java
@@ -182,6 +182,32 @@ public class JobExecutionIT {
         
Assertions.assertTrue(result.getError().startsWith("java.lang.NumberFormatException"));
     }
 
+    @Test
+    public void testExpiredJobWasDeleted() throws Exception {
+        Common.setDeployMode(DeployMode.CLIENT);
+        String filePath = 
TestUtils.getResource("batch_fakesource_to_file.conf");
+        JobConfig jobConfig = new JobConfig();
+        jobConfig.setName("job_expire");
+
+        ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+        
clientConfig.setClusterName(TestUtils.getClusterName("JobExecutionIT"));
+        SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+        JobExecutionEnvironment jobExecutionEnv =
+                engineClient.createExecutionContext(filePath, jobConfig);
+
+        final ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+        JobResult result = clientJobProxy.doWaitForJobComplete().get();
+        Assertions.assertEquals(result.getStatus(), JobStatus.FINISHED);
+        Awaitility.await()
+                .atMost(65, TimeUnit.SECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertThrowsExactly(
+                                        NullPointerException.class,
+                                        () -> clientJobProxy.getJobStatus()));
+    }
+
     @AfterAll
     static void afterClass() {
         if (hazelcastInstance != null) {
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
index 16b9f55c30..3897ae9503 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
@@ -17,6 +17,7 @@
 
 seatunnel:
   engine:
+    history-job-expire-minutes: 1
     backup-count: 2
     queue-type: blockingqueue
     print-execution-info-interval: 10
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
index edc18a0b15..e162b428bb 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/EngineConfig.java
@@ -32,6 +32,7 @@ import static 
com.hazelcast.internal.util.Preconditions.checkPositive;
 @Data
 @SuppressWarnings("checkstyle:MagicNumber")
 public class EngineConfig {
+
     private int backupCount = ServerConfigOptions.BACKUP_COUNT.defaultValue();
     private int printExecutionInfoInterval =
             ServerConfigOptions.PRINT_EXECUTION_INFO_INTERVAL.defaultValue();
@@ -50,6 +51,8 @@ public class EngineConfig {
     private CheckpointConfig checkpointConfig = 
ServerConfigOptions.CHECKPOINT.defaultValue();
 
     private QueueType queueType = 
ServerConfigOptions.QUEUE_TYPE.defaultValue();
+    private int historyJobExpireMinutes =
+            ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.defaultValue();
 
     public void setBackupCount(int newBackupCount) {
         checkBackupCount(newBackupCount, 0);
@@ -82,6 +85,13 @@ public class EngineConfig {
         this.taskExecutionThreadShareMode = taskExecutionThreadShareMode;
     }
 
+    public void setHistoryJobExpireMinutes(int historyJobExpireMinutes) {
+        checkPositive(
+                historyJobExpireMinutes,
+                ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES + " must be > 
0");
+        this.historyJobExpireMinutes = historyJobExpireMinutes;
+    }
+
     public EngineConfig setQueueType(QueueType queueType) {
         checkNotNull(queueType);
         this.queueType = queueType;
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index a901fbb5e6..5b8bbf6976 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -131,6 +131,11 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
                 
engineConfig.setSlotServiceConfig(parseSlotServiceConfig(node));
             } else if (ServerConfigOptions.CHECKPOINT.key().equals(name)) {
                 engineConfig.setCheckpointConfig(parseCheckpointConfig(node));
+            } else if 
(ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.key().equals(name)) {
+                engineConfig.setHistoryJobExpireMinutes(
+                        getIntegerValue(
+                                
ServerConfigOptions.HISTORY_JOB_EXPIRE_MINUTES.key(),
+                                getTextContent(node)));
             } else {
                 LOGGER.warning("Unrecognized element: " + name);
             }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 2de8acad01..2409e59ca2 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -145,4 +145,9 @@ public class ServerConfigOptions {
                     .type(new TypeReference<Map<String, String>>() {})
                     .noDefaultValue()
                     .withDescription("The checkpoint storage instance 
configuration.");
+    public static final Option<Integer> HISTORY_JOB_EXPIRE_MINUTES =
+            Options.key("history-job-expire-minutes")
+                    .intType()
+                    .defaultValue(1440)
+                    .withDescription("The expire time of history jobs.time 
unit minute");
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
index fde8b3744c..1f87218de0 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/CoordinatorService.java
@@ -219,7 +219,8 @@ public class CoordinatorService {
                                 .getMap(Constant.IMAP_FINISHED_JOB_METRICS),
                         nodeEngine
                                 .getHazelcastInstance()
-                                
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO));
+                                
.getMap(Constant.IMAP_FINISHED_JOB_VERTEX_INFO),
+                        engineConfig.getHistoryJobExpireMinutes());
 
         List<CompletableFuture<Void>> collect =
                 runningJobInfoIMap.entrySet().stream()
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
index 20961adf70..686d2a04fe 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobHistoryService.java
@@ -76,20 +76,22 @@ public class JobHistoryService {
      * finishedJobStateImap key is jobId and value is jobState(json) 
JobStateData Indicates the
      * status of the job, pipeline, and task
      */
-    // TODO need to limit the amount of storage
     private final IMap<Long, JobState> finishedJobStateImap;
 
     private final IMap<Long, JobMetrics> finishedJobMetricsImap;
 
     private final ObjectMapper objectMapper;
 
+    private final int finishedJobExpireTime;
+
     public JobHistoryService(
             IMap<Object, Object> runningJobStateIMap,
             ILogger logger,
             Map<Long, JobMaster> runningJobMasterMap,
             IMap<Long, JobState> finishedJobStateImap,
             IMap<Long, JobMetrics> finishedJobMetricsImap,
-            IMap<Long, JobDAGInfo> finishedJobVertexInfoImap) {
+            IMap<Long, JobDAGInfo> finishedJobVertexInfoImap,
+            int finishedJobExpireTime) {
         this.runningJobStateIMap = runningJobStateIMap;
         this.logger = logger;
         this.runningJobMasterMap = runningJobMasterMap;
@@ -98,6 +100,7 @@ public class JobHistoryService {
         this.finishedJobDAGInfoImap = finishedJobVertexInfoImap;
         this.objectMapper = new ObjectMapper();
         this.objectMapper.configure(SerializationFeature.FAIL_ON_EMPTY_BEANS, 
false);
+        this.finishedJobExpireTime = finishedJobExpireTime;
     }
 
     // Gets the status of a running and completed job
@@ -169,14 +172,14 @@ public class JobHistoryService {
         JobState jobState = toJobStateMapper(jobMaster, false);
         jobState.setFinishTime(System.currentTimeMillis());
         jobState.setErrorMessage(jobMaster.getErrorMessage());
-        finishedJobStateImap.put(jobState.jobId, jobState, 14, TimeUnit.DAYS);
+        finishedJobStateImap.put(jobState.jobId, jobState, 
finishedJobExpireTime, TimeUnit.MINUTES);
     }
 
     @SuppressWarnings("checkstyle:MagicNumber")
     public void storeFinishedPipelineMetrics(long jobId, JobMetrics metrics) {
         finishedJobMetricsImap.computeIfAbsent(jobId, key -> JobMetrics.of(new 
HashMap<>()));
         JobMetrics newMetrics = 
finishedJobMetricsImap.get(jobId).merge(metrics);
-        finishedJobMetricsImap.put(jobId, newMetrics, 14, TimeUnit.DAYS);
+        finishedJobMetricsImap.put(jobId, newMetrics, finishedJobExpireTime, 
TimeUnit.MINUTES);
     }
 
     private JobState toJobStateMapper(JobMaster jobMaster, boolean simple) {
@@ -236,7 +239,7 @@ public class JobHistoryService {
     }
 
     public void storeJobInfo(long jobId, JobDAGInfo jobInfo) {
-        finishedJobDAGInfoImap.put(jobId, jobInfo);
+        finishedJobDAGInfoImap.put(jobId, jobInfo, finishedJobExpireTime, 
TimeUnit.MINUTES);
     }
 
     @AllArgsConstructor

Reply via email to