This is an automated email from the ASF dual-hosted git repository. ic4y pushed a commit to branch dev-addTimeout in repository https://gitbox.apache.org/repos/asf/seatunnel.git
commit 29505ed98a7f6fe5c72ab45fbde783d698c08753 Author: liuli <[email protected]> AuthorDate: Thu Aug 3 21:19:58 2023 +0800 [Feature] [api env] Add job-level configuration for checkpoint timeout. --- .../main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java | 6 ++++++ .../src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java | 1 + .../core/starter/flink/execution/FlinkRuntimeEnvironment.java | 5 ++++- .../core/starter/flink/execution/FlinkRuntimeEnvironment.java | 5 ++++- .../seatunnel/engine/server/checkpoint/CheckpointCloseReason.java | 2 +- .../java/org/apache/seatunnel/engine/server/master/JobMaster.java | 5 +++++ 6 files changed, 21 insertions(+), 3 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java index bc80c66428..d076cd5367 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java @@ -51,6 +51,12 @@ public interface EnvCommonOptions { .withDescription( "The interval (in milliseconds) between two consecutive checkpoints."); + Option<Long> CHECKPOINT_TIMEOUT = + Options.key("checkpoint.timeout") + .longType() + .noDefaultValue() + .withDescription("The timeout (in milliseconds) for a checkpoint."); + Option<String> JARS = Options.key("jars") .stringType() diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java index 3a90b82e83..09310f080c 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java @@ -30,6 +30,7 @@ public class EnvOptionRule { CommonOptions.PARALLELISM, EnvCommonOptions.JARS, EnvCommonOptions.CHECKPOINT_INTERVAL, + EnvCommonOptions.CHECKPOINT_TIMEOUT, EnvCommonOptions.CUSTOM_PARAMETERS) .build(); } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index 7fb75064a4..34aa7ee4f2 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -265,7 +265,10 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment { } } - if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) { + if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) { + long timeout = config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()); + checkpointConfig.setCheckpointTimeout(timeout); + } else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) { long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT); checkpointConfig.setCheckpointTimeout(timeout); } diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java index 4b5bef07cb..583a1cf3e5 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java @@ -265,7 +265,10 @@ public class FlinkRuntimeEnvironment implements RuntimeEnvironment { } } - if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) { + if (config.hasPath(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) { + long timeout = config.getLong(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()); + checkpointConfig.setCheckpointTimeout(timeout); + } else if (config.hasPath(ConfigKeyName.CHECKPOINT_TIMEOUT)) { long timeout = config.getLong(ConfigKeyName.CHECKPOINT_TIMEOUT); checkpointConfig.setCheckpointTimeout(timeout); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java index 9f35f62fd6..c07f10fb1c 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCloseReason.java @@ -20,7 +20,7 @@ package org.apache.seatunnel.engine.server.checkpoint; public enum CheckpointCloseReason { PIPELINE_END("Pipeline turn to end state."), CHECKPOINT_EXPIRED( - "Checkpoint expired before completing. Please increase checkpoint timeout in the seatunnel.yaml"), + "Checkpoint expired before completing. Please increase checkpoint timeout in the seatunnel.yaml or jobConfig env."), CHECKPOINT_COORDINATOR_COMPLETED("CheckpointCoordinator completed."), CHECKPOINT_COORDINATOR_SHUTDOWN("CheckpointCoordinator shutdown."), CHECKPOINT_COORDINATOR_RESET("CheckpointCoordinator reset."), diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index e14d946c81..9c59af5099 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -276,6 +276,11 @@ public class JobMaster { Long.parseLong( jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()).toString())); } + if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) { + jobCheckpointConfig.setCheckpointTimeout( + Long.parseLong( + jobEnv.get(EnvCommonOptions.CHECKPOINT_TIMEOUT.key()).toString())); + } return jobCheckpointConfig; }
