This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 670bba0c41 [Improve][Zeta] Clean checkpoint file when job FINISHED/CANCELED (#6938)
670bba0c41 is described below
commit 670bba0c412229743893fe3cc9a9cb28621df188
Author: Jia Fan <[email protected]>
AuthorDate: Fri Jun 14 17:46:05 2024 +0800
[Improve][Zeta] Clean checkpoint file when job FINISHED/CANCELED (#6938)
---
.../server/checkpoint/CheckpointManager.java | 3 +-
.../seatunnel/engine/server/master/JobMaster.java | 1 +
.../server/checkpoint/CheckpointManagerTest.java | 4 +-
.../server/checkpoint/CheckpointStorageTest.java | 64 ++++++++++++++++++++++
.../batch_fakesource_to_file_with_checkpoint.conf | 62 +++++++++++++++++++++
.../stream_fake_to_console_with_checkpoint.conf | 52 ++++++++++++++++++
6 files changed, 181 insertions(+), 5 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
index a38703dee8..0142d8a6a4 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java
@@ -241,12 +241,11 @@ public class CheckpointManager {
      * Called by the JobMaster. <br>
      * Listen to the {@link JobStatus} of the {@link Job}.
      */
-    public CompletableFuture<Void> shutdown(JobStatus jobStatus) {
+    public void clearCheckpointIfNeed(JobStatus jobStatus) {
         if ((jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED)
                 && !isSavePointEnd()) {
             checkpointStorage.deleteCheckpoint(jobId + "");
         }
-        return CompletableFuture.completedFuture(null);
     }
 
     /**
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
index 8f54402d80..1b7bf6bdad 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java
@@ -518,6 +518,7 @@ public class JobMaster {
     }
 
     public void cleanJob() {
+        checkpointManager.clearCheckpointIfNeed(physicalPlan.getJobStatus());
         jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(), getJobDAGInfo());
         jobHistoryService.storeFinishedJobState(this);
         removeJobIMap();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
index c4b7b8af7d..ce5cca1780 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java
@@ -41,7 +41,6 @@ import com.hazelcast.map.IMap;
 import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
 
 import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID;
 import static org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE;
@@ -94,8 +93,7 @@ public class CheckpointManagerTest extends AbstractSeaTunnelServerTest {
         Assertions.assertTrue(checkpointManager.isCompletedPipeline(1));
         checkpointManager.listenPipeline(1, PipelineStatus.FINISHED);
         Assertions.assertNull(checkpointIdMap.get(1));
-        CompletableFuture<Void> future = checkpointManager.shutdown(JobStatus.FINISHED);
-        future.join();
+        checkpointManager.clearCheckpointIfNeed(JobStatus.FINISHED);
         Assertions.assertTrue(checkpointStorage.getAllCheckpoints(jobId + "").isEmpty());
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
index 2a334fe384..63e1277827 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
@@ -43,6 +43,11 @@ public class CheckpointStorageTest extends AbstractSeaTunnelServerTest {
 
     public static String STREAM_CONF_PATH = "stream_fake_to_console_biginterval.conf";
     public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";
+    public static String BATCH_CONF_WITH_CHECKPOINT_PATH =
+            "batch_fakesource_to_file_with_checkpoint.conf";
+
+    public static String STREAM_CONF_WITH_CHECKPOINT_PATH =
+            "stream_fake_to_console_with_checkpoint.conf";
 
     @Override
     public SeaTunnelConfig loadSeaTunnelConfig() {
@@ -113,4 +118,63 @@ public class CheckpointStorageTest extends AbstractSeaTunnelServerTest {
                 checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
         Assertions.assertEquals(0, allCheckpoints.size());
     }
+
+    @Test
+    public void testBatchJobWithCheckpoint() throws CheckpointStorageException {
+        long jobId = System.currentTimeMillis();
+        CheckpointConfig checkpointConfig =
+                server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+        server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+        CheckpointStorage checkpointStorage =
+                FactoryUtil.discoverFactory(
+                                Thread.currentThread().getContextClassLoader(),
+                                CheckpointStorageFactory.class,
+                                checkpointConfig.getStorage().getStorage())
+                        .create(checkpointConfig.getStorage().getStoragePluginConfig());
+        startJob(jobId, BATCH_CONF_WITH_CHECKPOINT_PATH, false);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.FINISHED));
+        List<PipelineState> allCheckpoints =
+                checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
+        Assertions.assertEquals(0, allCheckpoints.size());
+    }
+
+    @Test
+    public void testStreamJobWithCancel() throws CheckpointStorageException, InterruptedException {
+        long jobId = System.currentTimeMillis();
+        CheckpointConfig checkpointConfig =
+                server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+        server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+        CheckpointStorage checkpointStorage =
+                FactoryUtil.discoverFactory(
+                                Thread.currentThread().getContextClassLoader(),
+                                CheckpointStorageFactory.class,
+                                checkpointConfig.getStorage().getStorage())
+                        .create(checkpointConfig.getStorage().getStoragePluginConfig());
+        startJob(jobId, STREAM_CONF_WITH_CHECKPOINT_PATH, false);
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.RUNNING));
+        // wait for checkpoint
+        Thread.sleep(10 * 1000);
+        server.getCoordinatorService().getJobMaster(jobId).cancelJob();
+        await().atMost(120000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        server.getCoordinatorService().getJobStatus(jobId),
+                                        JobStatus.CANCELED));
+        List<PipelineState> allCheckpoints =
+                checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
+        Assertions.assertEquals(0, allCheckpoints.size());
+    }
 }
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf
new file mode 100644
index 0000000000..721f89fe94
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 1000
+}
+
+source {
+ FakeSource {
+ row.num = 100
+ split.num = 5
+ split.read-interval = 3000
+ result_table_name = "fake"
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+}
+
+transform {
+}
+
+sink {
+ LocalFile {
+ path="/tmp/hive/warehouse/test2"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format_type="text"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+ save_mode="error"
+
+ }
+}
\ No newline at end of file
diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf
new file mode 100644
index 0000000000..de02ec9624
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in SeaTunnel config
+######
+
+env {
+ # You can set SeaTunnel environment configuration here
+ parallelism = 2
+ job.mode = "STREAMING"
+ checkpoint.interval = 1000
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ FakeSource {
+ parallelism = 2
+ result_table_name = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+
+ # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+sink {
+ Console {
+ }
+
+ # If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}