This is an automated email from the ASF dual-hosted git repository.

ic4y pushed a commit to branch dev-addTimeout
in repository https://gitbox.apache.org/repos/asf/seatunnel.git

commit 29505ed98a7f6fe5c72ab45fbde783d698c08753
Author: liuli <[email protected]>
AuthorDate: Thu Aug 3 21:19:58 2023 +0800

    [Feature] [api env] Add job-level configuration for checkpoint timeout.
---
 .../main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java    | 6 ++++++
 .../src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java   | 1 +
 .../core/starter/flink/execution/FlinkRuntimeEnvironment.java       | 5 ++++-
 .../core/starter/flink/execution/FlinkRuntimeEnvironment.java       | 5 ++++-
 .../seatunnel/engine/server/checkpoint/CheckpointCloseReason.java   | 2 +-
 .../java/org/apache/seatunnel/engine/server/master/JobMaster.java   | 5 +++++
 6 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index bc80c66428..d076cd5367 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -51,6 +51,12 @@ public interface EnvCommonOptions {
                     .withDescription(
                             "The interval (in milliseconds) between two 
consecutive checkpoints.");
 
+    Option<Long> CHECKPOINT_TIMEOUT =
+            Options.key("checkpoint.timeout")
+                    .longType()
+                    .noDefaultValue()
+                    .withDescription("The timeout (in milliseconds) for a 
checkpoint.");
+
     Option<String> JARS =
             Options.key("jars")
                     .stringType()
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index 3a90b82e83..09310f080c 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -30,6 +30,7 @@ public class EnvOptionRule {
                         CommonOptions.PARALLELISM,
                         EnvCommonOptions.JARS,
                         EnvCommonOptions.CHECKPOINT_INTERVAL,
+                        EnvCommonOptions.CHECKPOINT_TIMEOUT,
                         EnvCommonOptions.CUSTOM_PARAMETERS)
                 .build();
     }
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 7fb75064a4..34aa7ee4f2 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -265,7 +265,10 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
                 }
             }
 
-            if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
+            if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
+                long timeout = 
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
+                checkpointConfig.setCheckpointTimeout(timeout);
+            } else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
                 long timeout = 
config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
                 checkpointConfig.setCheckpointTimeout(timeout);
             }
diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 4b5bef07cb..583a1cf3e5 100644
--- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -265,7 +265,10 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment {
                 }
             }
 
-            if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
+            if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
+                long timeout = 
config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key());
+                checkpointConfig.setCheckpointTimeout(timeout);
+            } else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) {
                 long timeout = 
config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT);
                 checkpointConfig.setCheckpointTimeout(timeout);
             }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
index 9f35f62fd6..c07f10fb1c 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java
@@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.server.checkpoint;
 public enum CheckpointCloseReason {
     PIPELINE_END("Pipeline turn to end state."),
     CHECKPOINT_EXPIRED(
-            "Checkpoint expired before completing. Please increase checkpoint 
timeout in the seatunnel.yaml"),
+            "Checkpoint expired before completing. Please increase checkpoint 
timeout in the seatunnel.yaml or jobConfig env."),
     CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."),
     CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."),
     CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."),
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index e14d946c81..9c59af5099 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -276,6 +276,11 @@ public class JobMaster {
                     Long.parseLong(
                             
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()).toString()));
         }
+        if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
+            jobCheckpointConfig.setCheckpointTimeout(
+                    Long.parseLong(
+                            
jobEnv.get(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()).toString()));
+        }
         return jobCheckpointConfig;
     }
 

Reply via email to