This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new ad4f1fcdea [Bugfix][zeta] Resolved the issue causing checkpoints to halt on tolerable-failure=0. (#5263)
ad4f1fcdea is described below

commit ad4f1fcdeab3406e500f94e74b92cebb157ee99e
Author: ic4y <[email protected]>
AuthorDate: Sun Aug 13 22:09:06 2023 +0800

    [Bugfix][zeta] Resolved the issue causing checkpoints to halt on tolerable-failure=0. (#5263)
    
    * [Bugfix][zeta] Resolved the issue causing checkpoints to halt on tolerable-failure=0.
    
    * remove max-concurrent
---
 config/seatunnel.yaml                              |  2 --
 docs/en/seatunnel-engine/checkpoint-storage.md     |  8 --------
 docs/en/seatunnel-engine/deployment.md             | 10 ----------
 .../src/test/resources/seatunnel.yaml              |  2 --
 .../src/test/resources/seatunnel.yaml              |  2 --
 .../config/YamlSeaTunnelDomConfigProcessor.java    | 10 ----------
 .../common/config/server/CheckpointConfig.java     | 18 ------------------
 .../common/config/server/ServerConfigOptions.java  | 12 ------------
 .../src/main/resources/seatunnel.yaml              |  2 --
 .../config/YamlSeaTunnelConfigParserTest.java      |  6 ------
 .../src/test/resources/seatunnel.yaml              |  2 --
 .../server/checkpoint/CheckpointCoordinator.java   | 22 +++-------------------
 .../seatunnel/engine/server/master/JobMaster.java  |  4 ----
 .../src/test/resources/seatunnel.yaml              |  2 --
 14 files changed, 3 insertions(+), 99 deletions(-)

diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml
index 7c689a328d..5961c83923 100644
--- a/config/seatunnel.yaml
+++ b/config/seatunnel.yaml
@@ -27,8 +27,6 @@ seatunnel:
     checkpoint:
       interval: 10000
       timeout: 60000
-      max-concurrent: 1
-      tolerable-failure: 2
       storage:
         type: hdfs
         max-retained: 3
diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md 
b/docs/en/seatunnel-engine/checkpoint-storage.md
index a88f301439..afe1fa6bc1 100644
--- a/docs/en/seatunnel-engine/checkpoint-storage.md
+++ b/docs/en/seatunnel-engine/checkpoint-storage.md
@@ -59,8 +59,6 @@ seatunnel:
     checkpoint:
       interval: 6000
       timeout: 7000
-      max-concurrent: 1
-      tolerable-failure: 2
       storage:
         type: hdfs
         max-retained: 3
@@ -94,8 +92,6 @@ seatunnel:
         checkpoint:
             interval: 6000
             timeout: 7000
-            max-concurrent: 1
-            tolerable-failure: 2
             storage:
                 type: hdfs
                 max-retained: 3
@@ -119,8 +115,6 @@ seatunnel:
     checkpoint:
       interval: 6000
       timeout: 7000
-      max-concurrent: 1
-      tolerable-failure: 2
       storage:
         type: hdfs
         max-retained: 3
@@ -160,8 +154,6 @@ seatunnel:
     checkpoint:
       interval: 6000
       timeout: 7000
-      max-concurrent: 1
-      tolerable-failure: 2
       storage:
         type: hdfs
         max-retained: 3
diff --git a/docs/en/seatunnel-engine/deployment.md 
b/docs/en/seatunnel-engine/deployment.md
index 1f8692530c..18c1a587a2 100644
--- a/docs/en/seatunnel-engine/deployment.md
+++ b/docs/en/seatunnel-engine/deployment.md
@@ -75,14 +75,6 @@ The interval between two checkpoints, unit is milliseconds. 
If the `checkpoint.i
 
 The timeout of a checkpoint. If a checkpoint cannot be completed within the 
timeout period, a checkpoint failure will be triggered. Therefore, Job will be 
restored.
 
-**max-concurrent**
-
-How many checkpoints can be performed simultaneously at most.
-
-**tolerable-failure**
-
-Maximum number of retries after checkpoint failure.
-
 Example
 
 ```
@@ -95,8 +87,6 @@ seatunnel:
         checkpoint:
             interval: 300000
             timeout: 10000
-            max-concurrent: 1
-            tolerable-failure: 2
 ```
 
 **checkpoint storage**
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
index 3897ae9503..4276fc8791 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
@@ -26,8 +26,6 @@ seatunnel:
     checkpoint:
       interval: 300000
       timeout: 10000
-      max-concurrent: 1
-      tolerable-failure: 2
       storage:
         type: localfile
         max-retained: 3
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml 
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml
index ea5b5ac230..4678cfed3d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml
@@ -24,8 +24,6 @@ seatunnel:
         checkpoint:
             interval: 6000
             timeout: 7000
-            max-concurrent: 1
-            tolerable-failure: 2
             storage:
                 type: hdfs
                 max-retained: 3
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 5b8bbf6976..2010d1f415 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -161,16 +161,6 @@ public class YamlSeaTunnelDomConfigProcessor extends 
AbstractDomConfigProcessor
                         getIntegerValue(
                                 
ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key(),
                                 getTextContent(node)));
-            } else if 
(ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key().equals(name)) {
-                checkpointConfig.setMaxConcurrentCheckpoints(
-                        getIntegerValue(
-                                
ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key(),
-                                getTextContent(node)));
-            } else if 
(ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key().equals(name)) {
-                checkpointConfig.setTolerableFailureCheckpoints(
-                        getIntegerValue(
-                                
ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key(),
-                                getTextContent(node)));
             } else if 
(ServerConfigOptions.CHECKPOINT_STORAGE.key().equals(name)) {
                 
checkpointConfig.setStorage(parseCheckpointStorageConfig(node));
             } else {
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
index 7038a65b42..8d521f2b8b 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
@@ -33,10 +33,6 @@ public class CheckpointConfig implements Serializable {
     private long checkpointTimeout = 
ServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue();
     private long schemaChangeCheckpointTimeout =
             
ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.defaultValue();
-    private int maxConcurrentCheckpoints =
-            ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.defaultValue();
-    private int tolerableFailureCheckpoints =
-            ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.defaultValue();
 
     private CheckpointStorageConfig storage = 
ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue();
 
@@ -60,18 +56,4 @@ public class CheckpointConfig implements Serializable {
                 "The minimum checkpoint timeout is 10 ms.");
         this.schemaChangeCheckpointTimeout = checkpointTimeout;
     }
-
-    public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
-        checkArgument(
-                maxConcurrentCheckpoints >= 1,
-                "The minimum number of concurrent checkpoints is 1.");
-        this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
-    }
-
-    public void setTolerableFailureCheckpoints(int 
tolerableFailureCheckpoints) {
-        checkArgument(
-                maxConcurrentCheckpoints >= 0,
-                "The number of tolerance failed checkpoints must be a natural 
number.");
-        this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
-    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 2409e59ca2..486f11878e 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -92,18 +92,6 @@ public class ServerConfigOptions {
                     .withDescription(
                             "The timeout (in milliseconds) for a schema change 
checkpoint.");
 
-    public static final Option<Integer> CHECKPOINT_MAX_CONCURRENT =
-            Options.key("max-concurrent")
-                    .intType()
-                    .defaultValue(1)
-                    .withDescription("The maximum number of concurrent 
checkpoints.");
-
-    public static final Option<Integer> CHECKPOINT_TOLERABLE_FAILURE =
-            Options.key("tolerable-failure")
-                    .intType()
-                    .defaultValue(0)
-                    .withDescription("The tolerable failure number of a 
checkpoint.");
-
     public static final Option<String> CHECKPOINT_STORAGE_TYPE =
             Options.key("type")
                     .stringType()
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml 
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
index e5d92281da..cc14d81eaf 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
@@ -25,8 +25,6 @@ seatunnel:
         checkpoint:
             interval: 300000
             timeout: 10000
-            max-concurrent: 1
-            tolerable-failure: 2
             storage:
                 type: hdfs
                 max-retained: 3
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
 
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
index 4c199b352e..ed6853e39b 100644
--- 
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
@@ -53,12 +53,6 @@ public class YamlSeaTunnelConfigParserTest {
         Assertions.assertEquals(
                 7000, 
config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout());
 
-        Assertions.assertEquals(
-                1, 
config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints());
-
-        Assertions.assertEquals(
-                2, 
config.getEngineConfig().getCheckpointConfig().getTolerableFailureCheckpoints());
-
         Assertions.assertEquals(
                 "hdfs", 
config.getEngineConfig().getCheckpointConfig().getStorage().getStorage());
 
diff --git 
a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml 
b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
index 4f6ce5f4ef..8453bdeeca 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
@@ -25,8 +25,6 @@ seatunnel:
         checkpoint:
             interval: 6000
             timeout: 7000
-            max-concurrent: 1
-            tolerable-failure: 2
             storage:
                 type: hdfs
                 max-retained: 3
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 1b6bc6b687..222f60a5cb 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -117,7 +117,6 @@ public class CheckpointCoordinator {
 
     private final CheckpointConfig coordinatorConfig;
 
-    private int tolerableFailureCheckpoints;
     private transient ScheduledExecutorService scheduler;
 
     private final AtomicLong latestTriggerTimestamp = new AtomicLong(0);
@@ -165,7 +164,6 @@ public class CheckpointCoordinator {
         this.runningJobStateIMap = runningJobStateIMap;
         this.plan = plan;
         this.coordinatorConfig = checkpointConfig;
-        this.tolerableFailureCheckpoints = 
coordinatorConfig.getTolerableFailureCheckpoints();
         this.pendingCheckpoints = new ConcurrentHashMap<>();
         this.completedCheckpoints =
                 new 
ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1);
@@ -392,7 +390,6 @@ public class CheckpointCoordinator {
         if (checkpointType.notFinalCheckpoint() && 
checkpointType.notSchemaChangeCheckpoint()) {
             if (currentTimestamp - latestTriggerTimestamp.get()
                             < coordinatorConfig.getCheckpointInterval()
-                    || pendingCounter.get() >= 
coordinatorConfig.getMaxConcurrentCheckpoints()
                     || !isAllTaskReady) {
                 return;
             }
@@ -531,16 +528,9 @@ public class CheckpointCoordinator {
                                 if 
(pendingCheckpoints.get(pendingCheckpoint.getCheckpointId())
                                                 != null
                                         && 
!pendingCheckpoint.isFullyAcknowledged()) {
-                                    if (tolerableFailureCheckpoints-- <= 0
-                                            || pendingCheckpoint
-                                                    .getCheckpointType()
-                                                    
.isSchemaChangeCheckpoint()) {
-                                        LOG.info(
-                                                "timeout checkpoint: "
-                                                        + 
pendingCheckpoint.getInfo());
-                                        handleCoordinatorError(
-                                                
CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
-                                    }
+                                    LOG.info("timeout checkpoint: " + 
pendingCheckpoint.getInfo());
+                                    handleCoordinatorError(
+                                            
CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
                                 }
                             },
                             checkpointTimeout,
@@ -746,12 +736,6 @@ public class CheckpointCoordinator {
         notifyCompleted(completedCheckpoint);
         pendingCheckpoints.remove(checkpointId);
         pendingCounter.decrementAndGet();
-        if (pendingCheckpoints.size() + 1 == 
coordinatorConfig.getMaxConcurrentCheckpoints()) {
-            // latest checkpoint completed time > checkpoint interval
-            if (completedCheckpoint.getCheckpointType().notFinalCheckpoint()) {
-                scheduleTriggerPendingCheckpoint(0L);
-            }
-        }
         if (isCompleted()) {
             
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
             if (latestCompletedCheckpoint.getCheckpointType().isSavepoint()) {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 404956a7e7..5137f23b7b 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -264,10 +264,6 @@ public class JobMaster {
         CheckpointConfig jobCheckpointConfig = new CheckpointConfig();
         
jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout());
         
jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval());
-        jobCheckpointConfig.setMaxConcurrentCheckpoints(
-                defaultCheckpointConfig.getMaxConcurrentCheckpoints());
-        jobCheckpointConfig.setTolerableFailureCheckpoints(
-                defaultCheckpointConfig.getTolerableFailureCheckpoints());
 
         CheckpointStorageConfig jobCheckpointStorageConfig = new 
CheckpointStorageConfig();
         
jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage());
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml
index 8f22b0613c..f8739cc483 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml
@@ -25,8 +25,6 @@ seatunnel:
         checkpoint:
             interval: 6000
             timeout: 7000
-            max-concurrent: 1
-            tolerable-failure: 2
             storage:
                 type: hdfs
                 max-retained: 3

Reply via email to