This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new fc914759ff [Feature][Zeta] In batch mode, checkpoint can be disabled
(#5914)
fc914759ff is described below
commit fc914759ff938130a906d9fa0e814b78dbd44614
Author: happyboy1024 <[email protected]>
AuthorDate: Wed Nov 29 09:14:42 2023 +0800
[Feature][Zeta] In batch mode, checkpoint can be disabled (#5914)
---------
Co-authored-by: dengjunjie <[email protected]>
---
docs/en/concept/JobEnvConfig.md | 2 +
.../src/test/resources/conf/getCatalogTable.conf | 1 -
.../flink/execution/FlinkRuntimeEnvironment.java | 5 +-
.../test/resources/args/user_defined_params.conf | 1 -
.../resources/jdbc_mysql_source_and_sink_xa.conf | 1 -
.../resources/jdbc_hive_source_and_assert.conf | 1 -
.../jdbc_postgres_source_and_sink_xa.conf | 1 -
.../resources/neo4j/fake_to_neo4j_batch_write.conf | 1 -
.../test/resources/batch_pulsar_to_console.conf | 1 -
.../flink/AbstractTestFlinkContainer.java | 5 +
.../connector-seatunnel-e2e-base/pom.xml | 30 +++
.../seatunnel/engine/e2e/CheckpointEnableIT.java | 279 +++++++++++++++++++++
.../test/resources/batch_fakesource_to_file.conf | 1 -
.../batch_fakesource_to_file_complex.conf | 1 -
...fakesource_to_localfile_checkpoint_disable.conf | 26 +-
.../sink_file_text_to_assert.conf | 37 +--
..._fakesource_to_localfile_checkpoint_enable.conf | 27 +-
.../sink_file_text_to_assert.conf | 37 +--
.../sink_file_text_to_assert.conf | 37 +--
.../stream_fakesource_to_localfile.conf | 18 +-
.../stream_fakesource_to_localfile_interval.conf | 19 +-
.../src/test/resources/log4j2.properties | 4 +
.../test/resources/batch_fakesource_to_file.conf | 1 -
.../batch_fakesource_to_file_complex.conf | 1 -
.../resources/batch_fakesource_to_two_file.conf | 1 -
.../src/test/resources/client_test.conf | 1 -
.../common/config/server/CheckpointConfig.java | 2 +
.../server/checkpoint/CheckpointCoordinator.java | 8 +-
.../seatunnel/engine/server/master/JobMaster.java | 12 +-
.../test/resources/batch_fakesource_to_file.conf | 1 -
.../batch_fakesource_to_file_complex.conf | 1 -
.../src/test/resources/fake_to_console.conf | 1 -
.../resources/fake_to_console_job_metrics.conf | 1 -
.../main/resources/examples/fake_to_console.conf | 1 -
34 files changed, 444 insertions(+), 122 deletions(-)
diff --git a/docs/en/concept/JobEnvConfig.md b/docs/en/concept/JobEnvConfig.md
index 7272c90fcc..ce1716e3c0 100644
--- a/docs/en/concept/JobEnvConfig.md
+++ b/docs/en/concept/JobEnvConfig.md
@@ -18,6 +18,8 @@ You can configure whether the task is in batch mode or stream
mode through `job.
Gets the interval in which checkpoints are periodically scheduled.
+In `STREAMING` mode, checkpoints are required; if you do not set this parameter, the interval
is obtained from the application configuration file `seatunnel.yaml`. In `BATCH`
mode, you can disable checkpoints by not setting this parameter.
+
## parallelism
This parameter configures the parallelism of source and sink.
diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
b/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
index 485f026a0d..63cf0f6cf8 100644
--- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
+++ b/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
@@ -17,7 +17,6 @@
env {
job.mode = "BATCH"
- checkpoint.interval = 5000
}
source {
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
index 12168921d8..1f01687a1a 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkRuntimeEnvironment.java
@@ -236,7 +236,10 @@ public class FlinkRuntimeEnvironment implements
RuntimeEnvironment {
}
private void setCheckpoint() {
-
+ if (jobMode == JobMode.BATCH) {
+ log.warn(
+ "Disabled Checkpointing. In flink execution environment,
checkpointing is not supported and not needed when executing jobs in BATCH
mode");
+ }
long interval = 0;
if (config.hasPath(EnvCommonOptions.CHECKPOINT_INTERVAL.key())) {
interval =
config.getLong(EnvCommonOptions.CHECKPOINT_INTERVAL.key());
diff --git
a/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
index bc2114443f..2ca5c56c19 100644
---
a/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
+++
b/seatunnel-core/seatunnel-starter/src/test/resources/args/user_defined_params.conf
@@ -22,7 +22,6 @@ env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
- checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
index cfdf7691d0..f70cc87dce 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-1/src/test/resources/jdbc_mysql_source_and_sink_xa.conf
@@ -18,7 +18,6 @@
env {
execution.parallelism = 1
job.mode = "BATCH"
- execution.checkpoint.interval = 1000
}
source {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
index cd1b32c14e..75802d482c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_hive_source_and_assert.conf
@@ -22,7 +22,6 @@ env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
- checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
index 144f11da9b..75e2979b2e 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_postgres_source_and_sink_xa.conf
@@ -18,7 +18,6 @@
env {
execution.parallelism = 1
job.mode = "BATCH"
- execution.checkpoint.interval = 1000
}
source {
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
index e1d9fed6f8..8fd4eab745 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-neo4j-e2e/src/test/resources/neo4j/fake_to_neo4j_batch_write.conf
@@ -22,7 +22,6 @@ env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
- checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
index 59d2efc862..563ee34165 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-pulsar-e2e/src/test/resources/batch_pulsar_to_console.conf
@@ -21,7 +21,6 @@
env {
execution.parallelism = 1
job.mode = "BATCH"
- checkpoint.interval = 5000
#spark config
spark.app.name = "SeaTunnel"
spark.executor.instances = 1
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index c72fad44ee..9a0b353785 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -154,4 +154,9 @@ public abstract class AbstractTestFlinkContainer extends
AbstractTestContainer {
public String getServerLogs() {
return jobManager.getLogs();
}
+
+ /**
+ * Executes {@code command} through {@code bash -c} inside the JobManager container and returns
+ * the command's standard output. The exit code and stderr are not checked, and the command
+ * string is handed to the shell as-is, so it must only come from trusted test code.
+ *
+ * @param command shell command line to run
+ * @return stdout produced by the command
+ * @throws IOException propagated from {@code execInContainer}
+ * @throws InterruptedException propagated from {@code execInContainer}
+ */
+ public String executeJobManagerInnerCommand(String command)
+ throws IOException, InterruptedException {
+ return jobManager.execInContainer("bash", "-c", command).getStdout();
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
index 20a2e612a6..973fe6434c 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/pom.xml
@@ -52,6 +52,12 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>imap-storage-file</artifactId>
@@ -71,6 +77,30 @@
</exclusion>
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-flink-13-starter</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-flink-15-starter</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-spark-2-starter</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-spark-3-starter</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
new file mode 100644
index 0000000000..398ecdf0b9
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/CheckpointEnableIT.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.utils.JsonUtils;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import
org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * E2E tests that check whether checkpointing ends up enabled or disabled on each engine. Every
+ * test is restricted to one engine with {@link DisabledOnContainer}: the three testZeta* tests
+ * run only on the SeaTunnel (Zeta) engine, testFlinkCheckpointEnable only on Flink and
+ * testSparkCheckpointEnable only on Spark, because each engine reports checkpoint state
+ * differently.
+ *
+ * <p>NOTE(review): the "contains" checks below read container.getServerLogs(), which presumably
+ * returns the whole server log accumulated so far; a message may therefore also come from a job
+ * run by an earlier test. Confirm before relying on these checks in isolation.
+ */
+@Slf4j
+public class CheckpointEnableIT extends TestSuiteBase {
+
+ /**
+ * Zeta, BATCH mode: runs a FakeSource -> LocalFile job without {@code checkpoint.interval}
+ * (expects "checkpoint is disabled" in the server log), then an equivalent job with {@code
+ * checkpoint.interval = 1000} (expects "checkpoint is enabled"). After each run an Assert job
+ * reads the sink directory back and requires exactly 100 rows.
+ */
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "depending on the engine, the logic for determining
whether a checkpoint is enabled is different")
+ public void testZetaBatchCheckpointEnable(TestContainer container)
+ throws IOException, InterruptedException {
+ // checkpoint disabled: the server log is expected to contain 'checkpoint is disabled'
+ Container.ExecResult disableExecResult =
+ container.executeJob(
+
"/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf");
+ Assertions.assertTrue(container.getServerLogs().contains("checkpoint
is disabled"));
+ Assertions.assertEquals(0, disableExecResult.getExitCode());
+ // verify the sink output: the Assert job expects exactly 100 rows
+ Container.ExecResult disableSinkFileExecResult =
+ container.executeJob(
+
"/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf");
+ Assertions.assertEquals(0, disableSinkFileExecResult.getExitCode());
+
+ // checkpoint enabled: the server log is expected to contain 'checkpoint is enabled'
+ Container.ExecResult enableExecResult =
+ container.executeJob(
+
"/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf");
+ Assertions.assertTrue(container.getServerLogs().contains("checkpoint
is enabled"));
+ Assertions.assertEquals(0, enableExecResult.getExitCode());
+ // verify the sink output: the Assert job expects exactly 100 rows
+ Container.ExecResult enableSinkFileExecResult =
+ container.executeJob(
+
"/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf");
+ Assertions.assertEquals(0, enableSinkFileExecResult.getExitCode());
+ }
+
+ /**
+ * Zeta, STREAMING mode, using stream_fakesource_to_localfile_interval.conf (presumably the
+ * variant that sets {@code checkpoint.interval} - confirm against the resource). Submits the
+ * job, waits for its job id, checks that checkpointing is reported as enabled, savepoints the
+ * job, restores it from the savepoint and finally polls an Assert job until the sink holds
+ * exactly 100 rows.
+ */
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "depending on the engine, the logic for determining
whether a checkpoint is enabled is different")
+ public void testZetaStreamingCheckpointInterval(TestContainer container)
+ throws IOException, InterruptedException {
+ // submit the job asynchronously; the returned future is discarded, so a submission failure
+ // is only logged and shows up as the job-id wait below timing out
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob(
+
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ // wait (up to 60 s) for "Init JobMaster for Job SeaTunnel_Job (<id>)" in the server log.
+ // NOTE(review): the greedy leading ".*" selects the LAST such line, which may belong to a
+ // job from an earlier test if this job has not logged yet; "[0-9]*" also accepts an empty id.
+ AtomicReference<String> jobId = new AtomicReference<>();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Pattern jobIdPattern =
+ Pattern.compile(
+ ".*Init JobMaster for Job
SeaTunnel_Job \\(([0-9]*)\\).*",
+ Pattern.DOTALL);
+ Matcher matcher =
jobIdPattern.matcher(container.getServerLogs());
+ if (matcher.matches()) {
+ jobId.set(matcher.group(1));
+ }
+ Assertions.assertNotNull(jobId.get());
+ });
+
+ // fixed wait, presumably so the job runs long enough to checkpoint before the savepoint
+ // (a polling condition would be less flaky)
+ Thread.sleep(15000);
+ Assertions.assertTrue(container.getServerLogs().contains("checkpoint
is enabled"));
+ Assertions.assertEquals(0,
container.savepointJob(jobId.get()).getExitCode());
+
+ // restore the job from the savepoint, again asynchronously and without awaiting the future
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container
+ .restoreJob(
+
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf",
+ jobId.get())
+ .getExitCode();
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ // poll (up to 300 s) until the Assert job reads exactly 100 rows back from the sink
+ // directory. checkSinkFile only mirrors the assertion inside untilAsserted; despite its
+ // name, disableSinkFileExecResult is the result for this streaming job.
+ AtomicReference<Boolean> checkSinkFile = new AtomicReference<>(false);
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Container.ExecResult disableSinkFileExecResult =
+ container.executeJob(
+
"/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf");
+ checkSinkFile.set(0 ==
disableSinkFileExecResult.getExitCode());
+ Assertions.assertEquals(0,
disableSinkFileExecResult.getExitCode());
+ });
+ Assertions.assertTrue(checkSinkFile.get());
+ }
+
+ /**
+ * Same flow as {@link #testZetaStreamingCheckpointInterval}, but with
+ * stream_fakesource_to_localfile.conf, whose env block does not set {@code
+ * checkpoint.interval}; checkpointing is still expected to be reported as enabled (the docs
+ * state that STREAMING jobs then take the interval from seatunnel.yaml).
+ */
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SPARK, EngineType.FLINK},
+ disabledReason =
+ "depending on the engine, the logic for determining
whether a checkpoint is enabled is different")
+ public void testZetaStreamingCheckpointNoInterval(TestContainer container)
+ throws IOException, InterruptedException {
+ // submit the job asynchronously; the returned future is discarded, so a submission failure
+ // is only logged and shows up as the job-id wait below timing out
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container.executeJob(
+
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf");
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ // wait (up to 60 s) for "Init JobMaster for Job SeaTunnel_Job (<id>)" in the server log.
+ // NOTE(review): the greedy leading ".*" selects the LAST such line, which may belong to a
+ // job from an earlier test if this job has not logged yet; "[0-9]*" also accepts an empty id.
+ AtomicReference<String> jobId = new AtomicReference<>();
+ await().atMost(60000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Pattern jobIdPattern =
+ Pattern.compile(
+ ".*Init JobMaster for Job
SeaTunnel_Job \\(([0-9]*)\\).*",
+ Pattern.DOTALL);
+ Matcher matcher =
jobIdPattern.matcher(container.getServerLogs());
+ if (matcher.matches()) {
+ jobId.set(matcher.group(1));
+ }
+ Assertions.assertNotNull(jobId.get());
+ });
+
+ // fixed wait, presumably so the job runs long enough to checkpoint before the savepoint
+ // (a polling condition would be less flaky)
+ Thread.sleep(15000);
+ Assertions.assertTrue(container.getServerLogs().contains("checkpoint
is enabled"));
+ Assertions.assertEquals(0,
container.savepointJob(jobId.get()).getExitCode());
+
+ // restore the job from the savepoint, again asynchronously and without awaiting the future
+ CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return container
+ .restoreJob(
+
"/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf",
+ jobId.get())
+ .getExitCode();
+ } catch (Exception e) {
+ log.error("Commit task exception :" + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+
+ // poll (up to 300 s) until the Assert job reads exactly 100 rows back from the sink
+ // directory. checkSinkFile only mirrors the assertion inside untilAsserted; despite its
+ // name, disableSinkFileExecResult is the result for this streaming job.
+ AtomicReference<Boolean> checkSinkFile = new AtomicReference<>(false);
+ await().atMost(300000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () -> {
+ Container.ExecResult disableSinkFileExecResult =
+ container.executeJob(
+
"/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf");
+ checkSinkFile.set(0 ==
disableSinkFileExecResult.getExitCode());
+ Assertions.assertEquals(0,
disableSinkFileExecResult.getExitCode());
+ });
+ Assertions.assertTrue(checkSinkFile.get());
+ }
+
+ /**
+ * Flink, BATCH mode: runs the batch job that sets {@code checkpoint.interval = 1000} and
+ * checks, via the JobManager REST API, that Flink nevertheless reports checkpointing as
+ * disabled (interval == Long.MAX_VALUE). Despite the method name, the expected outcome is
+ * "disabled".
+ */
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SEATUNNEL, EngineType.SPARK},
+ disabledReason =
+ "depending on the engine, the logic for determining
whether a checkpoint is enabled is different")
+ public void testFlinkCheckpointEnable(AbstractTestFlinkContainer container)
+ throws IOException, InterruptedException {
+ // In the Flink execution environment checkpointing is not supported and not needed for
+ // BATCH jobs, so this only checks whether Flink enabled checkpointing for a job configured
+ // with 'checkpoint.interval'.
+ Container.ExecResult enableExecResult =
+ container.executeJob(
+
"/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf");
+ // extract the Flink JobID from the client's stdout, then fetch the job's checkpoint config
+ // from the JobManager REST endpoint by running curl inside the JobManager container
+ Matcher matcher =
+
Pattern.compile("JobID\\s([a-fA-F0-9]+)").matcher(enableExecResult.getStdout());
+ Assertions.assertTrue(matcher.find());
+ String jobId = matcher.group(1);
+ Map<String, Object> jobConfig =
+ JsonUtils.toMap(
+ container.executeJobManagerInnerCommand(
+ String.format(
+ "curl
http://localhost:8081/jobs/%s/checkpoints/config",
+ jobId)),
+ String.class,
+ Object.class);
+ // A checkpoint interval of 0x7fffffffffffffff (Long.MAX_VALUE) means checkpointing is
+ // disabled; see org.apache.flink.runtime.jobgraph.JobGraph#isCheckpointingEnabled(). A
+ // missing "interval" key falls back to 0L and fails the assertion.
+ Assertions.assertEquals(Long.MAX_VALUE,
jobConfig.getOrDefault("interval", 0L));
+ Assertions.assertEquals(0, enableExecResult.getExitCode());
+ }
+
+ /**
+ * Spark, BATCH mode: runs the batch job that sets {@code checkpoint.interval = 1000} and checks
+ * that Spark reports the property as ignored on stderr ("Ignoring non-Spark config property:
+ * checkpoint.interval").
+ */
+ @TestTemplate
+ @DisabledOnContainer(
+ value = {},
+ type = {EngineType.SEATUNNEL, EngineType.FLINK},
+ disabledReason =
+ "depending on the engine, the logic for determining
whether a checkpoint is enabled is different")
+ public void testSparkCheckpointEnable(TestContainer container)
+ throws IOException, InterruptedException {
+ // In the Spark execution environment checkpointing is not supported and not needed for
+ // BATCH jobs, so this only checks how Spark treats 'checkpoint.interval'.
+ Container.ExecResult enableExecResult =
+ container.executeJob(
+
"/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf");
+ // per the job output, Spark ignores 'checkpoint.interval' even when it is configured
+ Assertions.assertTrue(
+ enableExecResult
+ .getStderr()
+ .contains("Ignoring non-Spark config property:
checkpoint.interval"));
+ Assertions.assertEquals(0, enableExecResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
index bc61e06312..10efe3a5e9 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
@@ -21,7 +21,6 @@
env {
# You can set flink configuration here
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index f6fda45b1f..48173bb786 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -21,7 +21,6 @@
env {
# You can set flink configuration here
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf
similarity index 72%
copy from
seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf
index 181a9fc1ad..80bd1ba374 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/batch_fakesource_to_localfile_checkpoint_disable.conf
@@ -15,48 +15,44 @@
# limitations under the License.
#
######
-###### This config file is a demonstration of streaming processing in
seatunnel config
+###### This config file is a demonstration of disabling checkpoints in batch mode
######
env {
- # You can set flink configuration here
execution.parallelism = 1
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.name = "DISABLE_CHECKPOINT"
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
FakeSource {
result_table_name = "fake"
+ row.num = 100
+ split.num = 5
+ split.read-interval = 3000
schema = {
fields {
+ id = "int"
name = "string"
age = "int"
}
}
- parallelism = 3
}
}
-
transform {
}
-
sink {
LocalFile {
- path = "/tmp/hive/warehouse/test2"
- field_delimiter = "\t"
+ path =
"/tmp/seatunnel/config/checkpoint-batch-disable-test-resources/sinkfile/"
row_delimiter = "\n"
- partition_by = ["age"]
- partition_dir_expression = "${k0}=${v0}"
- is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format_type = "text"
- sink_columns = ["name", "age"]
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
- save_mode = "error"
-
}
}
\ No newline at end of file
diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf
similarity index 65%
copy from seatunnel-api/src/test/resources/conf/getCatalogTable.conf
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf
index 485f026a0d..a30a538425 100644
--- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-disable-test-resources/sink_file_text_to_assert.conf
@@ -16,29 +16,36 @@
#
env {
+ job.name = "DISABLE_CHECKPOINT_ASSERT"
job.mode = "BATCH"
- checkpoint.interval = 5000
}
source {
- InMemory {
+ LocalFile {
+ path =
"/tmp/seatunnel/config/checkpoint-batch-disable-test-resources/sinkfile"
+ file_format_type = "text"
+ schema = {
+ fields {
+ c_string = string
+ }
+ }
result_table_name = "fake"
- username = "st"
- password = "stpassword"
- table-names = ["st.public.table1", "st.public.table2"]
- parallelism = 3
}
}
-transform {
-}
-
sink {
- InMemory {
- source_table_name = "fake"
- username = "st"
- password = "stpassword"
- address = "localhost"
- port = 1234
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 100
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ]
+ }
}
}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf
similarity index 72%
copy from
seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf
index 181a9fc1ad..b3b5afec9e 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/batch_fakesource_to_localfile_checkpoint_enable.conf
@@ -15,48 +15,45 @@
# limitations under the License.
#
######
-###### This config file is a demonstration of streaming processing in
seatunnel config
+###### This config file is a demonstration of enabling checkpoints in batch mode
######
env {
- # You can set flink configuration here
execution.parallelism = 1
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.name = "ENABLE_CHECKPOINT"
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+ checkpoint.interval = 1000
}
source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
FakeSource {
result_table_name = "fake"
+ row.num = 100
+ split.num = 5
+ split.read-interval = 3000
schema = {
fields {
+ id = "int"
name = "string"
age = "int"
}
}
- parallelism = 3
}
}
-
transform {
}
-
sink {
LocalFile {
- path = "/tmp/hive/warehouse/test2"
- field_delimiter = "\t"
+ path =
"/tmp/seatunnel/config/checkpoint-batch-enable-test-resources/sinkfile/"
row_delimiter = "\n"
- partition_by = ["age"]
- partition_dir_expression = "${k0}=${v0}"
- is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format_type = "text"
- sink_columns = ["name", "age"]
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
- save_mode = "error"
-
}
}
\ No newline at end of file
diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf
similarity index 65%
copy from seatunnel-api/src/test/resources/conf/getCatalogTable.conf
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf
index 485f026a0d..a951c1e002 100644
--- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-batch-enable-test-resources/sink_file_text_to_assert.conf
@@ -16,29 +16,36 @@
#
env {
+ job.name = "ENABLE_CHECKPOINT_ASSERT"
job.mode = "BATCH"
- checkpoint.interval = 5000
}
source {
- InMemory {
+ LocalFile {
+ path =
"/tmp/seatunnel/config/checkpoint-batch-enable-test-resources/sinkfile"
+ file_format_type = "text"
+ schema = {
+ fields {
+ c_string = string
+ }
+ }
result_table_name = "fake"
- username = "st"
- password = "stpassword"
- table-names = ["st.public.table1", "st.public.table2"]
- parallelism = 3
}
}
-transform {
-}
-
sink {
- InMemory {
- source_table_name = "fake"
- username = "st"
- password = "stpassword"
- address = "localhost"
- port = 1234
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 100
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ]
+ }
}
}
\ No newline at end of file
diff --git a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf
similarity index 66%
copy from seatunnel-api/src/test/resources/conf/getCatalogTable.conf
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf
index 485f026a0d..65883ccd68 100644
--- a/seatunnel-api/src/test/resources/conf/getCatalogTable.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/sink_file_text_to_assert.conf
@@ -16,29 +16,36 @@
#
env {
+ job.name = "STREAM_JOB_ASSERT"
job.mode = "BATCH"
- checkpoint.interval = 5000
}
source {
- InMemory {
+ LocalFile {
+ path = "/tmp/seatunnel/config/checkpoint-streaming-enable-test-resources/sinkfile"
+ file_format_type = "text"
+ schema = {
+ fields {
+ c_string = string
+ }
+ }
result_table_name = "fake"
- username = "st"
- password = "stpassword"
- table-names = ["st.public.table1", "st.public.table2"]
- parallelism = 3
}
}
-transform {
-}
-
sink {
- InMemory {
- source_table_name = "fake"
- username = "st"
- password = "stpassword"
- address = "localhost"
- port = 1234
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 100
+ },
+ {
+ rule_type = MIN_ROW
+ rule_value = 100
+ }
+ ]
+ }
}
}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf
similarity index 78%
copy from
seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf
index 181a9fc1ad..fa278ae2e0 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile.conf
@@ -21,22 +21,23 @@
env {
# You can set flink configuration here
execution.parallelism = 1
- job.mode = "BATCH"
- execution.checkpoint.interval = 5000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+ job.mode = "STREAMING"
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
+ row.num = 100
+ split.num = 5
+ split.read-interval = 3000
schema = {
fields {
+ id = "int"
name = "string"
age = "int"
}
}
- parallelism = 3
}
}
@@ -45,18 +46,11 @@ transform {
sink {
LocalFile {
- path = "/tmp/hive/warehouse/test2"
- field_delimiter = "\t"
+ path = "/tmp/seatunnel/config/checkpoint-streaming-enable-test-resources/sinkfile/"
row_delimiter = "\n"
- partition_by = ["age"]
- partition_dir_expression = "${k0}=${v0}"
- is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format_type = "text"
- sink_columns = ["name", "age"]
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
- save_mode = "error"
-
}
}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf
similarity index 78%
copy from
seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf
index 181a9fc1ad..288b369e4c 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/checkpoint-streaming-enable-test-resources/stream_fakesource_to_localfile_interval.conf
@@ -21,22 +21,24 @@
env {
# You can set flink configuration here
execution.parallelism = 1
- job.mode = "BATCH"
- execution.checkpoint.interval = 5000
- #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+ job.mode = "STREAMING"
+ checkpoint.interval = 3000
}
source {
# This is a example source plugin **only for test and demonstrate the feature source plugin**
FakeSource {
result_table_name = "fake"
+ row.num = 100
+ split.num = 5
+ split.read-interval = 3000
schema = {
fields {
+ id = "int"
name = "string"
age = "int"
}
}
- parallelism = 3
}
}
@@ -45,18 +47,11 @@ transform {
sink {
LocalFile {
- path = "/tmp/hive/warehouse/test2"
- field_delimiter = "\t"
+ path = "/tmp/seatunnel/config/checkpoint-streaming-enable-test-resources/sinkfile/"
row_delimiter = "\n"
- partition_by = ["age"]
- partition_dir_expression = "${k0}=${v0}"
- is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
file_format_type = "text"
- sink_columns = ["name", "age"]
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
- save_mode = "error"
-
}
}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
index 051f67d12d..bfcd94a55a 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/log4j2.properties
@@ -25,6 +25,10 @@ logger.zeta.level=WARN
logger.zetaMaster.name=org.apache.seatunnel.engine.server.master
logger.zetaMaster.level=INFO
+# For print checkpoint info
+logger.checkpoint.name=org.apache.seatunnel.engine.server.checkpoint.CheckpointCoordinator
+logger.checkpoint.level=INFO
+
logger.debezium.name=io.debezium.connector
logger.debezium.level=WARN
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
index 181a9fc1ad..ae9e400fc5 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file.conf
@@ -22,7 +22,6 @@ env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
index 3a44886274..2ecc1d36dd 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -22,7 +22,6 @@ env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
index 7ff3c21f78..254737fcfc 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/batch_fakesource_to_two_file.conf
@@ -22,7 +22,6 @@ env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
index a4404b9f91..2b44e9c757 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
+++
b/seatunnel-engine/seatunnel-engine-client/src/test/resources/client_test.conf
@@ -22,7 +22,6 @@ env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
index 78add9c883..76fb99f4d8 100644
---
a/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
+++
b/seatunnel-engine/seatunnel-engine-common/src/main/java/org/apache/seatunnel/engine/common/config/server/CheckpointConfig.java
@@ -35,6 +35,8 @@ public class CheckpointConfig implements Serializable {
private CheckpointStorageConfig storage =
ServerConfigOptions.CHECKPOINT_STORAGE.defaultValue();
+ private boolean checkpointEnable = true;
+
public void setCheckpointInterval(long checkpointInterval) {
checkArgument(
checkpointInterval >= MINIMAL_CHECKPOINT_TIME,
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 60459eaff5..09bf416f6a 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -327,7 +327,13 @@ public class CheckpointCoordinator {
InvocationFuture<?>[] futures = notifyTaskStart();
CompletableFuture.allOf(futures).join();
notifyCompleted(latestCompletedCheckpoint);
- scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
+ if (coordinatorConfig.isCheckpointEnable()) {
+ LOG.info("checkpoint is enabled, start schedule trigger pending checkpoint.");
+ scheduleTriggerPendingCheckpoint(coordinatorConfig.getCheckpointInterval());
+ } else {
+ LOG.info(
+ "checkpoint is disabled, because in batch mode and 'checkpoint.interval' of env is missing.");
+ }
}
private void notifyCompleted(CompletedCheckpoint completedCheckpoint) {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 6c527732d6..9bd40804fb 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.common.utils.SeaTunnelException;
import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.config.EngineConfig;
+import org.apache.seatunnel.engine.common.config.JobConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.config.server.CheckpointStorageConfig;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
@@ -85,6 +86,7 @@ import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import static com.hazelcast.jet.impl.util.ExceptionUtil.withTryCatch;
+import static org.apache.seatunnel.common.constants.JobMode.BATCH;
public class JobMaster {
private static final ILogger LOGGER = Logger.getLogger(JobMaster.class);
@@ -188,8 +190,7 @@ public class JobMaster {
nodeEngine.getSerializationService().toObject(jobImmutableInformationData);
jobCheckpointConfig =
createJobCheckpointConfig(
- engineConfig.getCheckpointConfig(),
- jobImmutableInformation.getJobConfig().getEnvOptions());
+ engineConfig.getCheckpointConfig(), jobImmutableInformation.getJobConfig());
LOGGER.info(
String.format(
@@ -257,7 +258,8 @@ public class JobMaster {
// TODO replace it after ReadableConfig Support parse yaml format, then use only one config to
// read engine and env config.
private CheckpointConfig createJobCheckpointConfig(
- CheckpointConfig defaultCheckpointConfig, Map<String, Object> jobEnv) {
+ CheckpointConfig defaultCheckpointConfig, JobConfig jobConfig) {
+ Map<String, Object> jobEnv = jobConfig.getEnvOptions();
CheckpointConfig jobCheckpointConfig = new CheckpointConfig();
jobCheckpointConfig.setCheckpointTimeout(defaultCheckpointConfig.getCheckpointTimeout());
jobCheckpointConfig.setCheckpointInterval(defaultCheckpointConfig.getCheckpointInterval());
@@ -274,6 +276,10 @@ public class JobMaster {
jobCheckpointConfig.setCheckpointInterval(
Long.parseLong(
jobEnv.get(EnvCommonOptions.CHECKPOINT_INTERVAL.key()).toString()));
+ } else if (jobConfig.getJobContext().getJobMode() == BATCH) {
+ LOGGER.info(
+ "in batch mode, the 'checkpoint.interval' configuration of env is missing, so checkpoint will be disabled");
+ jobCheckpointConfig.setCheckpointEnable(false);
}
if (jobEnv.containsKey(EnvCommonOptions.CHECKPOINT_TIMEOUT.key())) {
jobCheckpointConfig.setCheckpointTimeout(
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf
index 24339945e7..382ba3db75 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file.conf
@@ -22,7 +22,6 @@ env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf
index e3e0e00d9b..30c0242137 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -22,7 +22,6 @@ env {
# You can set flink configuration here
execution.parallelism = 1
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
index 9e04be8bec..c59738b46f 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console.conf
@@ -22,7 +22,6 @@ env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
index 7edeb4ef71..68f70e5178 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/fake_to_console_job_metrics.conf
@@ -22,7 +22,6 @@ env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
- execution.checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}
diff --git
a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
index f6482a5475..aa0d49c034 100644
---
a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
+++
b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
@@ -22,7 +22,6 @@ env {
# You can set engine configuration here
execution.parallelism = 1
job.mode = "BATCH"
- checkpoint.interval = 5000
#execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
}