This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7763541be9 [Improve][Zeta] when job finished, the checkpoint won't 
write to file (#6674)
7763541be9 is described below

commit 7763541be997baac2d8ae1b3455c009ba86c9abf
Author: Jarvis <[email protected]>
AuthorDate: Fri Apr 26 13:36:17 2024 +0800

    [Improve][Zeta] when job finished, the checkpoint won't write to file 
(#6674)
---
 .../engine/core/checkpoint/CheckpointType.java     |   1 +
 .../server/checkpoint/CheckpointCoordinator.java   |  18 ++--
 .../engine/server/AbstractSeaTunnelServerTest.java |  30 ++++++
 .../server/checkpoint/CheckpointStorageTest.java   | 118 +++++++++++++++++++++
 .../engine/server/checkpoint/SavePointTest.java    |  26 -----
 .../engine/server/master/JobMetricsTest.java       |  26 -----
 .../stream_fake_to_console_biginterval.conf        |  52 +++++++++
 7 files changed, 211 insertions(+), 60 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
index aa057a2e88..3a999726a6 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
@@ -91,6 +91,7 @@ public enum CheckpointType {
         return !isSchemaChangeCheckpoint();
     }
 
+    /** Returns false only for COMPLETED_POINT_TYPE (batch job FINISHED), true for all others. */
     public boolean notCompletedCheckpoint() {
         return this != COMPLETED_POINT_TYPE;
     }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 313e4e9dd6..47392f7a31 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -767,14 +767,16 @@ public class CheckpointCoordinator {
         final long checkpointId = completedCheckpoint.getCheckpointId();
         
completedCheckpointIds.addLast(String.valueOf(completedCheckpoint.getCheckpointId()));
         try {
-            byte[] states = serializer.serialize(completedCheckpoint);
-            checkpointStorage.storeCheckPoint(
-                    PipelineState.builder()
-                            .checkpointId(checkpointId)
-                            .jobId(String.valueOf(jobId))
-                            .pipelineId(pipelineId)
-                            .states(states)
-                            .build());
+            if 
(completedCheckpoint.getCheckpointType().notCompletedCheckpoint()) {
+                byte[] states = serializer.serialize(completedCheckpoint);
+                checkpointStorage.storeCheckPoint(
+                        PipelineState.builder()
+                                .checkpointId(checkpointId)
+                                .jobId(String.valueOf(jobId))
+                                .pipelineId(pipelineId)
+                                .states(states)
+                                .build());
+            }
             if (completedCheckpointIds.size()
                                     % 
coordinatorConfig.getStorage().getMaxRetainedCheckpoints()
                             == 0
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index 5a0d288f4a..2710c2cbd7 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -20,6 +20,9 @@ package org.apache.seatunnel.engine.server;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.engine.common.config.ConfigProvider;
 import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -27,10 +30,13 @@ import org.junit.jupiter.api.TestInstance;
 
 import com.hazelcast.config.Config;
 import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.internal.serialization.Data;
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.spi.impl.NodeEngine;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.Collections;
+
 @Slf4j
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
 public abstract class AbstractSeaTunnelServerTest<T extends 
AbstractSeaTunnelServerTest> {
@@ -83,6 +89,30 @@ public abstract class AbstractSeaTunnelServerTest<T extends 
AbstractSeaTunnelSer
         LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
     }
 
+    public SeaTunnelConfig loadSeaTunnelConfig() {
+        return ConfigProvider.locateAndGetSeaTunnelConfig();
+    }
+
+    protected void startJob(Long jobId, String path, boolean 
isStartWithSavePoint) {
+        LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, 
jobId.toString(), jobId);
+
+        JobImmutableInformation jobImmutableInformation =
+                new JobImmutableInformation(
+                        jobId,
+                        "Test",
+                        isStartWithSavePoint,
+                        
nodeEngine.getSerializationService().toData(testLogicalDag),
+                        testLogicalDag.getJobConfig(),
+                        Collections.emptyList(),
+                        Collections.emptyList());
+
+        Data data = 
nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+                server.getCoordinatorService().submitJob(jobId, data);
+        voidPassiveCompletableFuture.join();
+    }
+
     @AfterAll
     public void after() {
         try {
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
new file mode 100644
index 0000000000..13d86d011a
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
+import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
+import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
+import org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.utils.FactoryUtil;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+/**
+ * Verifies which checkpoints reach checkpoint storage: a savepoint taken on a running streaming
+ * job must be stored, while the final checkpoint of a FINISHED batch job must not.
+ */
+@DisabledOnOs(OS.WINDOWS)
+public class CheckpointStorageTest extends AbstractSeaTunnelServerTest<CheckpointStorageTest> {
+
+    public static final String STREAM_CONF_PATH = "stream_fake_to_console_biginterval.conf";
+    public static final String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";
+
+    /** Upper bound, in milliseconds, for waiting on a job status transition. */
+    private static final long JOB_STATUS_TIMEOUT_MS = 120000;
+
+    @Override
+    public SeaTunnelConfig loadSeaTunnelConfig() {
+        SeaTunnelConfig seaTunnelConfig = super.loadSeaTunnelConfig();
+        CheckpointConfig checkpointConfig = seaTunnelConfig.getEngineConfig().getCheckpointConfig();
+        // Use a very large interval (here and in the job config file) so that no periodic
+        // checkpoint is triggered automatically and affects the stored checkpoints under test.
+        checkpointConfig.setCheckpointInterval(Integer.MAX_VALUE);
+        seaTunnelConfig.getEngineConfig().setCheckpointConfig(checkpointConfig);
+        return seaTunnelConfig;
+    }
+
+    /** A savepoint on a running streaming job must produce exactly one stored checkpoint. */
+    @Test
+    public void testGenerateFileWhenSavepoint()
+            throws CheckpointStorageException, InterruptedException {
+        long jobId = System.currentTimeMillis();
+        CheckpointStorage checkpointStorage = createCheckpointStorage();
+
+        startJob(jobId, STREAM_CONF_PATH, false);
+        awaitJobStatus(jobId, JobStatus.RUNNING);
+        // Presumably lets the tasks settle after RUNNING before the savepoint is requested.
+        // NOTE(review): an explicit readiness check would be less timing-dependent.
+        Thread.sleep(1000);
+        server.getCoordinatorService().getJobMaster(jobId).savePoint().join();
+        awaitJobStatus(jobId, JobStatus.SAVEPOINT_DONE);
+
+        List<PipelineState> savepoints =
+                checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
+        Assertions.assertEquals(1, savepoints.size());
+    }
+
+    /** A FINISHED batch job must not leave any checkpoint in storage. */
+    @Test
+    public void testBatchJob() throws CheckpointStorageException {
+        long jobId = System.currentTimeMillis();
+        CheckpointStorage checkpointStorage = createCheckpointStorage();
+
+        startJob(jobId, BATCH_CONF_PATH, false);
+        awaitJobStatus(jobId, JobStatus.FINISHED);
+
+        List<PipelineState> allCheckpoints =
+                checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
+        Assertions.assertEquals(0, allCheckpoints.size());
+    }
+
+    /** Creates a client for the checkpoint storage configured on the test server. */
+    private CheckpointStorage createCheckpointStorage() throws CheckpointStorageException {
+        CheckpointConfig checkpointConfig =
+                server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+        return FactoryUtil.discoverFactory(
+                        Thread.currentThread().getContextClassLoader(),
+                        CheckpointStorageFactory.class,
+                        checkpointConfig.getStorage().getStorage())
+                .create(checkpointConfig.getStorage().getStoragePluginConfig());
+    }
+
+    /** Waits until the job reports {@code expected}, failing after JOB_STATUS_TIMEOUT_MS. */
+    private void awaitJobStatus(long jobId, JobStatus expected) {
+        await().atMost(JOB_STATUS_TIMEOUT_MS, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () ->
+                                Assertions.assertEquals(
+                                        expected,
+                                        server.getCoordinatorService().getJobStatus(jobId)));
+    }
+}
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
index 8b2ca9ae35..c062a95941 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
@@ -20,11 +20,8 @@ package org.apache.seatunnel.engine.server.checkpoint;
 import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
 import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
-import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
-import org.apache.seatunnel.engine.server.TestUtils;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Disabled;
@@ -32,9 +29,6 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
-import com.hazelcast.internal.serialization.Data;
-
-import java.util.Collections;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 
@@ -221,24 +215,4 @@ public class SavePointTest extends 
AbstractSeaTunnelServerTest<SavePointTest> {
 
         Thread.sleep(1000);
     }
-
-    private void startJob(Long jobId, String path, boolean 
isStartWithSavePoint) {
-        LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, 
jobId.toString(), jobId);
-
-        JobImmutableInformation jobImmutableInformation =
-                new JobImmutableInformation(
-                        jobId,
-                        "Test",
-                        isStartWithSavePoint,
-                        
nodeEngine.getSerializationService().toData(testLogicalDag),
-                        testLogicalDag.getJobConfig(),
-                        Collections.emptyList(),
-                        Collections.emptyList());
-
-        Data data = 
nodeEngine.getSerializationService().toData(jobImmutableInformation);
-
-        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
-                server.getCoordinatorService().submitJob(jobId, data);
-        voidPassiveCompletableFuture.join();
-    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index ed12a565d7..0e6202a0a6 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -18,13 +18,9 @@
 package org.apache.seatunnel.engine.server.master;
 
 import org.apache.seatunnel.api.common.metrics.JobMetrics;
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
-import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
 import org.apache.seatunnel.engine.core.job.JobStatus;
 import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
 import org.apache.seatunnel.engine.server.CoordinatorService;
-import org.apache.seatunnel.engine.server.TestUtils;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -32,10 +28,8 @@ import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.condition.DisabledOnOs;
 import org.junit.jupiter.api.condition.OS;
 
-import com.hazelcast.internal.serialization.Data;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
@@ -160,24 +154,4 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
                         });
         server.getCoordinatorService().cancelJob(jobId3);
     }
-
-    private void startJob(Long jobId, String path, boolean 
isStartWithSavePoint) {
-        LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, 
jobId.toString(), jobId);
-
-        JobImmutableInformation jobImmutableInformation =
-                new JobImmutableInformation(
-                        jobId,
-                        "Test",
-                        isStartWithSavePoint,
-                        
nodeEngine.getSerializationService().toData(testLogicalDag),
-                        testLogicalDag.getJobConfig(),
-                        Collections.emptyList(),
-                        Collections.emptyList());
-
-        Data data = 
nodeEngine.getSerializationService().toData(jobImmutableInformation);
-
-        PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
-                server.getCoordinatorService().submitJob(jobId, data);
-        voidPassiveCompletableFuture.join();
-    }
 }
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_biginterval.conf
 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_biginterval.conf
new file mode 100644
index 0000000000..73d79fa660
--- /dev/null
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_biginterval.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### Streaming job for CheckpointStorageTest; the huge checkpoint.interval avoids periodic checkpoints
+######
+
+env {
+  # You can set SeaTunnel environment configuration here
+  parallelism = 2
+  job.mode = "STREAMING"
+  checkpoint.interval = 2147483640
+}
+
+source {
+  # This is an example source plugin **only for testing and demonstrating the source plugin feature**
+  FakeSource {
+    parallelism = 2
+    result_table_name = "fake"
+    row.num = 16
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+
+  # If you would like to get more information about how to configure SeaTunnel and see the full list of source plugins,
+  # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+sink {
+  Console {
+  }
+
+  # If you would like to get more information about how to configure SeaTunnel and see the full list of sink plugins,
+  # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}

Reply via email to