This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new ad4f1fcdea [Bugfix][zeta] Resolved the issue causing checkpoints to
halt on tolerable-failure=0. (#5263)
ad4f1fcdea is described below
commit ad4f1fcdeab3406e500f94e74b92cebb157ee99e
Author: ic4y <[email protected]>
AuthorDate: Sun Aug 13 22:09:06 2023 +0800
[Bugfix][zeta] Resolved the issue causing checkpoints to halt on
tolerable-failure=0. (#5263)
* [Bugfix][zeta] Resolved the issue causing checkpoints to halt on
tolerable-failure=0.
* remove max-concurrent
---
config/seatunnel.yaml | 2 --
docs/en/seatunnel-engine/checkpoint-storage.md | 8 --------
docs/en/seatunnel-engine/deployment.md | 10 ----------
.../src/test/resources/seatunnel.yaml | 2 --
.../src/test/resources/seatunnel.yaml | 2 --
.../config/YamlSeaTunnelDomConfigProcessor.java | 10 ----------
.../common/config/server/CheckpointConfig.java | 18 ------------------
.../common/config/server/ServerConfigOptions.java | 12 ------------
.../src/main/resources/seatunnel.yaml | 2 --
.../config/YamlSeaTunnelConfigParserTest.java | 6 ------
.../src/test/resources/seatunnel.yaml | 2 --
.../server/checkpoint/CheckpointCoordinator.java | 22 +++-------------------
.../seatunnel/engine/server/master/JobMaster.java | 4 ----
.../src/test/resources/seatunnel.yaml | 2 --
14 files changed, 3 insertions(+), 99 deletions(-)
diff --git a/config/seatunnel.yaml b/config/seatunnel.yaml
index 7c689a328d..5961c83923 100644
--- a/config/seatunnel.yaml
+++ b/config/seatunnel.yaml
@@ -27,8 +27,6 @@ seatunnel:
checkpoint:
interval: 10000
timeout: 60000
- max-concurrent: 1
- tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
diff --git a/docs/en/seatunnel-engine/checkpoint-storage.md
b/docs/en/seatunnel-engine/checkpoint-storage.md
index a88f301439..afe1fa6bc1 100644
--- a/docs/en/seatunnel-engine/checkpoint-storage.md
+++ b/docs/en/seatunnel-engine/checkpoint-storage.md
@@ -59,8 +59,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
- max-concurrent: 1
- tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
@@ -94,8 +92,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
- max-concurrent: 1
- tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
@@ -119,8 +115,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
- max-concurrent: 1
- tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
@@ -160,8 +154,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
- max-concurrent: 1
- tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
diff --git a/docs/en/seatunnel-engine/deployment.md
b/docs/en/seatunnel-engine/deployment.md
index 1f8692530c..18c1a587a2 100644
--- a/docs/en/seatunnel-engine/deployment.md
+++ b/docs/en/seatunnel-engine/deployment.md
@@ -75,14 +75,6 @@ The interval between two checkpoints, unit is milliseconds.
If the `checkpoint.i
The timeout of a checkpoint. If a checkpoint cannot be completed within the
timeout period, a checkpoint failure will be triggered. Therefore, Job will be
restored.
-**max-concurrent**
-
-How many checkpoints can be performed simultaneously at most.
-
-**tolerable-failure**
-
-Maximum number of retries after checkpoint failure.
-
Example
```
@@ -95,8 +87,6 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
- max-concurrent: 1
- tolerable-failure: 2
```
**checkpoint storage**
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
index 3897ae9503..4276fc8791 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel.yaml
@@ -26,8 +26,6 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
- max-concurrent: 1
- tolerable-failure: 2
storage:
type: localfile
max-retained: 3
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml
index ea5b5ac230..4678cfed3d 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-client/src/test/resources/seatunnel.yaml
@@ -24,8 +24,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
- max-concurrent: 1
- tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
index 5b8bbf6976..2010d1f415 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelDomConfigProcessor.java
@@ -161,16 +161,6 @@ public class YamlSeaTunnelDomConfigProcessor extends
AbstractDomConfigProcessor
getIntegerValue(
ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.key(),
getTextContent(node)));
- } else if
(ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key().equals(name)) {
- checkpointConfig.setMaxConcurrentCheckpoints(
- getIntegerValue(
-
ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.key(),
- getTextContent(node)));
- } else if
(ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key().equals(name)) {
- checkpointConfig.setTolerableFailureCheckpoints(
- getIntegerValue(
-
ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.key(),
- getTextContent(node)));
} else if
(ServerConfigOptions.CHECKPOINT_STORAGE.key().equals(name)) {
checkpointConfig.setStorage(parseCheckpointStorageConfig(node));
} else {
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
index 7038a65b42..8d521f2b8b 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
@@ -33,10 +33,6 @@ public class CheckpointConfig implements Serializable {
private long checkpointTimeout =
ServerConfigOptions.CHECKPOINT_TIMEOUT.defaultValue();
private long schemaChangeCheckpointTimeout =
ServerConfigOptions.SCHEMA_CHANGE_CHECKPOINT_TIMEOUT.defaultValue();
- private int maxConcurrentCheckpoints =
- ServerConfigOptions.CHECKPOINT_MAX_CONCURRENT.defaultValue();
- private int tolerableFailureCheckpoints =
- ServerConfigOptions.CHECKPOINT_TOLERABLE_FAILURE.defaultValue();
private CheckpointStorageConfig storage =
ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue();
@@ -60,18 +56,4 @@ public class CheckpointConfig implements Serializable {
"The minimum checkpoint timeout is 10 ms.");
this.schemaChangeCheckpointTimeout = checkpointTimeout;
}
-
- public void setMaxConcurrentCheckpoints(int maxConcurrentCheckpoints) {
- checkArgument(
- maxConcurrentCheckpoints >= 1,
- "The minimum number of concurrent checkpoints is 1.");
- this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
- }
-
- public void setTolerableFailureCheckpoints(int
tolerableFailureCheckpoints) {
- checkArgument(
- maxConcurrentCheckpoints >= 0,
- "The number of tolerance failed checkpoints must be a natural
number.");
- this.tolerableFailureCheckpoints = tolerableFailureCheckpoints;
- }
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
index 2409e59ca2..486f11878e 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/ServerConfigOptions.java
@@ -92,18 +92,6 @@ public class ServerConfigOptions {
.withDescription(
"The timeout (in milliseconds) for a schema change
checkpoint.");
- public static final Option<Integer> CHECKPOINT_MAX_CONCURRENT =
- Options.key("max-concurrent")
- .intType()
- .defaultValue(1)
- .withDescription("The maximum number of concurrent
checkpoints.");
-
- public static final Option<Integer> CHECKPOINT_TOLERABLE_FAILURE =
- Options.key("tolerable-failure")
- .intType()
- .defaultValue(0)
- .withDescription("The tolerable failure number of a
checkpoint.");
-
public static final Option<String> CHECKPOINT_STORAGE_TYPE =
Options.key("type")
.stringType()
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
index e5d92281da..cc14d81eaf 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/main/resources/seatunnel.yaml
@@ -25,8 +25,6 @@ seatunnel:
checkpoint:
interval: 300000
timeout: 10000
- max-concurrent: 1
- tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
index 4c199b352e..ed6853e39b 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/test/java/org/apache/seatunnel/engine/common/config/YamlSeaTunnelConfigParserTest.java
@@ -53,12 +53,6 @@ public class YamlSeaTunnelConfigParserTest {
Assertions.assertEquals(
7000,
config.getEngineConfig().getCheckpointConfig().getCheckpointTimeout());
- Assertions.assertEquals(
- 1,
config.getEngineConfig().getCheckpointConfig().getMaxConcurrentCheckpoints());
-
- Assertions.assertEquals(
- 2,
config.getEngineConfig().getCheckpointConfig().getTolerableFailureCheckpoints());
-
Assertions.assertEquals(
"hdfs",
config.getEngineConfig().getCheckpointConfig().getStorage().getStorage());
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
index 4f6ce5f4ef..8453bdeeca 100644
--- a/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-common/src/test/resources/seatunnel.yaml
@@ -25,8 +25,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
- max-concurrent: 1
- tolerable-failure: 2
storage:
type: hdfs
max-retained: 3
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 1b6bc6b687..222f60a5cb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -117,7 +117,6 @@ public class CheckpointCoordinator {
private final CheckpointConfig coordinatorConfig;
- private int tolerableFailureCheckpoints;
private transient ScheduledExecutorService scheduler;
private final AtomicLong latestTriggerTimestamp = new AtomicLong(0);
@@ -165,7 +164,6 @@ public class CheckpointCoordinator {
this.runningJobStateIMap = runningJobStateIMap;
this.plan = plan;
this.coordinatorConfig = checkpointConfig;
- this.tolerableFailureCheckpoints =
coordinatorConfig.getTolerableFailureCheckpoints();
this.pendingCheckpoints = new ConcurrentHashMap<>();
this.completedCheckpoints =
new
ArrayDeque<>(coordinatorConfig.getStorage().getMaxRetainedCheckpoints() + 1);
@@ -392,7 +390,6 @@ public class CheckpointCoordinator {
if (checkpointType.notFinalCheckpoint() &&
checkpointType.notSchemaChangeCheckpoint()) {
if (currentTimestamp - latestTriggerTimestamp.get()
< coordinatorConfig.getCheckpointInterval()
- || pendingCounter.get() >=
coordinatorConfig.getMaxConcurrentCheckpoints()
|| !isAllTaskReady) {
return;
}
@@ -531,16 +528,9 @@ public class CheckpointCoordinator {
if
(pendingCheckpoints.get(pendingCheckpoint.getCheckpointId())
!= null
&&
!pendingCheckpoint.isFullyAcknowledged()) {
- if (tolerableFailureCheckpoints-- <= 0
- || pendingCheckpoint
- .getCheckpointType()
-
.isSchemaChangeCheckpoint()) {
- LOG.info(
- "timeout checkpoint: "
- +
pendingCheckpoint.getInfo());
- handleCoordinatorError(
-
CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
- }
+ LOG.info("timeout checkpoint: " +
pendingCheckpoint.getInfo());
+ handleCoordinatorError(
+
CheckpointCloseReason.CHECKPOINT_EXPIRED, null);
}
},
checkpointTimeout,
@@ -746,12 +736,6 @@ public class CheckpointCoordinator {
notifyCompleted(completedCheckpoint);
pendingCheckpoints.remove(checkpointId);
pendingCounter.decrementAndGet();
- if (pendingCheckpoints.size() + 1 ==
coordinatorConfig.getMaxConcurrentCheckpoints()) {
- // latest checkpoint completed time > checkpoint interval
- if (completedCheckpoint.getCheckpointType().notFinalCheckpoint()) {
- scheduleTriggerPendingCheckpoint(0L);
- }
- }
if (isCompleted()) {
cleanPendingCheckpoint(CheckpointCloseReason.CHECKPOINT_COORDINATOR_COMPLETED);
if (latestCompletedCheckpoint.getCheckpointType().isSavepoint()) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 404956a7e7..5137f23b7b 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -264,10 +264,6 @@ public class JobMaster {
CheckpointConfig jobCheckpointConfig = new CheckpointConfig();
jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout());
jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval());
- jobCheckpointConfig.setMaxConcurrentCheckpoints(
- defaultCheckpointConfig.getMaxConcurrentCheckpoints());
- jobCheckpointConfig.setTolerableFailureCheckpoints(
- defaultCheckpointConfig.getTolerableFailureCheckpoints());
CheckpointStorageConfig jobCheckpointStorageConfig = new
CheckpointStorageConfig();
jobCheckpointStorageConfig.setStorage(defaultCheckpointConfig.getStorage().getStorage());
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml
index 8f22b0613c..f8739cc483 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/seatunnel.yaml
@@ -25,8 +25,6 @@ seatunnel:
checkpoint:
interval: 6000
timeout: 7000
- max-concurrent: 1
- tolerable-failure: 2
storage:
type: hdfs
max-retained: 3