This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7763541be9 [Improve][Zeta] when job finished, the checkpoint won't
write to file (#6674)
7763541be9 is described below
commit 7763541be997baac2d8ae1b3455c009ba86c9abf
Author: Jarvis <[email protected]>
AuthorDate: Fri Apr 26 13:36:17 2024 +0800
[Improve][Zeta] when job finished, the checkpoint won't write to file
(#6674)
---
.../engine/core/checkpoint/CheckpointType.java | 1 +
.../server/checkpoint/CheckpointCoordinator.java | 18 ++--
.../engine/server/AbstractSeaTunnelServerTest.java | 30 ++++++
.../server/checkpoint/CheckpointStorageTest.java | 118 +++++++++++++++++++++
.../engine/server/checkpoint/SavePointTest.java | 26 -----
.../engine/server/master/JobMetricsTest.java | 26 -----
.../stream_fake_to_console_biginterval.conf | 52 +++++++++
7 files changed, 211 insertions(+), 60 deletions(-)
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
index aa057a2e88..3a999726a6 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/checkpoint/CheckpointType.java
@@ -91,6 +91,7 @@ public enum CheckpointType {
return !isSchemaChangeCheckpoint();
}
+ /** Returns false only for the completed checkpoint of a FINISHED batch job; true otherwise. */
public boolean notCompletedCheckpoint() {
return this != COMPLETED_POINT_TYPE;
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
index 313e4e9dd6..47392f7a31 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java
@@ -767,14 +767,16 @@ public class CheckpointCoordinator {
final long checkpointId = completedCheckpoint.getCheckpointId();
completedCheckpointIds.addLast(String.valueOf(completedCheckpoint.getCheckpointId()));
try {
- byte[] states = serializer.serialize(completedCheckpoint);
- checkpointStorage.storeCheckPoint(
- PipelineState.builder()
- .checkpointId(checkpointId)
- .jobId(String.valueOf(jobId))
- .pipelineId(pipelineId)
- .states(states)
- .build());
+ if
(completedCheckpoint.getCheckpointType().notCompletedCheckpoint()) {
+ byte[] states = serializer.serialize(completedCheckpoint);
+ checkpointStorage.storeCheckPoint(
+ PipelineState.builder()
+ .checkpointId(checkpointId)
+ .jobId(String.valueOf(jobId))
+ .pipelineId(pipelineId)
+ .states(states)
+ .build());
+ }
if (completedCheckpointIds.size()
%
coordinatorConfig.getStorage().getMaxRetainedCheckpoints()
== 0
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
index 5a0d288f4a..2710c2cbd7 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/AbstractSeaTunnelServerTest.java
@@ -20,6 +20,9 @@ package org.apache.seatunnel.engine.server;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.engine.common.config.ConfigProvider;
import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
+import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
+import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -27,10 +30,13 @@ import org.junit.jupiter.api.TestInstance;
import com.hazelcast.config.Config;
import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import com.hazelcast.internal.serialization.Data;
import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.impl.NodeEngine;
import lombok.extern.slf4j.Slf4j;
+import java.util.Collections;
+
@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public abstract class AbstractSeaTunnelServerTest<T extends
AbstractSeaTunnelServerTest> {
@@ -83,6 +89,30 @@ public abstract class AbstractSeaTunnelServerTest<T extends
AbstractSeaTunnelSer
LOGGER = nodeEngine.getLogger(AbstractSeaTunnelServerTest.class);
}
+ public SeaTunnelConfig loadSeaTunnelConfig() {
+ return ConfigProvider.locateAndGetSeaTunnelConfig();
+ }
+
+ protected void startJob(Long jobId, String path, boolean
isStartWithSavePoint) {
+ LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path,
jobId.toString(), jobId);
+
+ JobImmutableInformation jobImmutableInformation =
+ new JobImmutableInformation(
+ jobId,
+ "Test",
+ isStartWithSavePoint,
+
nodeEngine.getSerializationService().toData(testLogicalDag),
+ testLogicalDag.getJobConfig(),
+ Collections.emptyList(),
+ Collections.emptyList());
+
+ Data data =
nodeEngine.getSerializationService().toData(jobImmutableInformation);
+
+ PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
+ server.getCoordinatorService().submitJob(jobId, data);
+ voidPassiveCompletableFuture.join();
+ }
+
@AfterAll
public void after() {
try {
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
new file mode 100644
index 0000000000..13d86d011a
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.server.checkpoint;
+
+import org.apache.seatunnel.engine.checkpoint.storage.PipelineState;
+import org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorage;
+import
org.apache.seatunnel.engine.checkpoint.storage.api.CheckpointStorageFactory;
+import
org.apache.seatunnel.engine.checkpoint.storage.exception.CheckpointStorageException;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
+import org.apache.seatunnel.engine.common.utils.FactoryUtil;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.awaitility.Awaitility.await;
+
+@DisabledOnOs(OS.WINDOWS)
+public class CheckpointStorageTest extends AbstractSeaTunnelServerTest {
+
+ public static String STREAM_CONF_PATH =
"stream_fake_to_console_biginterval.conf";
+ public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf";
+
+ @Override
+ public SeaTunnelConfig loadSeaTunnelConfig() {
+ SeaTunnelConfig seaTunnelConfig = super.loadSeaTunnelConfig();
+ CheckpointConfig checkpointConfig =
seaTunnelConfig.getEngineConfig().getCheckpointConfig();
+ // Use a very large interval both here and in the job config file so that
+ // automatically triggered checkpoints do not affect the test result.
+ checkpointConfig.setCheckpointInterval(Integer.MAX_VALUE);
+
seaTunnelConfig.getEngineConfig().setCheckpointConfig(checkpointConfig);
+ return seaTunnelConfig;
+ }
+
+ @Test
+ public void testGenerateFileWhenSavepoint()
+ throws CheckpointStorageException, InterruptedException {
+ long jobId = System.currentTimeMillis();
+ CheckpointConfig checkpointConfig =
+
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+ CheckpointStorage checkpointStorage =
+ FactoryUtil.discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ CheckpointStorageFactory.class,
+ checkpointConfig.getStorage().getStorage())
+
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+ startJob(jobId, STREAM_CONF_PATH, false);
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertTrue(
+ server.getCoordinatorService()
+ .getJobStatus(jobId)
+ .equals(JobStatus.RUNNING)));
+ Thread.sleep(1000);
+ CompletableFuture<Void> future1 =
+ server.getCoordinatorService().getJobMaster(jobId).savePoint();
+ future1.join();
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+
server.getCoordinatorService().getJobStatus(jobId),
+ JobStatus.SAVEPOINT_DONE));
+ List<PipelineState> savepoint1 =
checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
+ Assertions.assertEquals(1, savepoint1.size());
+ }
+
+ @Test
+ public void testBatchJob() throws CheckpointStorageException {
+ long jobId = System.currentTimeMillis();
+ CheckpointConfig checkpointConfig =
+
server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig();
+
server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig);
+
+ CheckpointStorage checkpointStorage =
+ FactoryUtil.discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ CheckpointStorageFactory.class,
+ checkpointConfig.getStorage().getStorage())
+
.create(checkpointConfig.getStorage().getStoragePluginConfig());
+ startJob(jobId, BATCH_CONF_PATH, false);
+ await().atMost(120000, TimeUnit.MILLISECONDS)
+ .untilAsserted(
+ () ->
+ Assertions.assertEquals(
+
server.getCoordinatorService().getJobStatus(jobId),
+ JobStatus.FINISHED));
+ List<PipelineState> allCheckpoints =
+ checkpointStorage.getAllCheckpoints(String.valueOf(jobId));
+ Assertions.assertEquals(0, allCheckpoints.size());
+ }
+}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
index 8b2ca9ae35..c062a95941 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/SavePointTest.java
@@ -20,11 +20,8 @@ package org.apache.seatunnel.engine.server.checkpoint;
import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.engine.common.exception.SavePointFailedException;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
-import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
-import org.apache.seatunnel.engine.server.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
@@ -32,9 +29,6 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
-import com.hazelcast.internal.serialization.Data;
-
-import java.util.Collections;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
@@ -221,24 +215,4 @@ public class SavePointTest extends
AbstractSeaTunnelServerTest<SavePointTest> {
Thread.sleep(1000);
}
-
- private void startJob(Long jobId, String path, boolean
isStartWithSavePoint) {
- LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path,
jobId.toString(), jobId);
-
- JobImmutableInformation jobImmutableInformation =
- new JobImmutableInformation(
- jobId,
- "Test",
- isStartWithSavePoint,
-
nodeEngine.getSerializationService().toData(testLogicalDag),
- testLogicalDag.getJobConfig(),
- Collections.emptyList(),
- Collections.emptyList());
-
- Data data =
nodeEngine.getSerializationService().toData(jobImmutableInformation);
-
- PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- server.getCoordinatorService().submitJob(jobId, data);
- voidPassiveCompletableFuture.join();
- }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
index ed12a565d7..0e6202a0a6 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java
@@ -18,13 +18,9 @@
package org.apache.seatunnel.engine.server.master;
import org.apache.seatunnel.api.common.metrics.JobMetrics;
-import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
-import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
-import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.JobStatus;
import org.apache.seatunnel.engine.server.AbstractSeaTunnelServerTest;
import org.apache.seatunnel.engine.server.CoordinatorService;
-import org.apache.seatunnel.engine.server.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -32,10 +28,8 @@ import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.condition.DisabledOnOs;
import org.junit.jupiter.api.condition.OS;
-import com.hazelcast.internal.serialization.Data;
import lombok.extern.slf4j.Slf4j;
-import java.util.Collections;
import java.util.concurrent.TimeUnit;
import static
org.apache.seatunnel.api.common.metrics.MetricNames.SINK_WRITE_COUNT;
@@ -160,24 +154,4 @@ class JobMetricsTest extends AbstractSeaTunnelServerTest {
});
server.getCoordinatorService().cancelJob(jobId3);
}
-
- private void startJob(Long jobId, String path, boolean
isStartWithSavePoint) {
- LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path,
jobId.toString(), jobId);
-
- JobImmutableInformation jobImmutableInformation =
- new JobImmutableInformation(
- jobId,
- "Test",
- isStartWithSavePoint,
-
nodeEngine.getSerializationService().toData(testLogicalDag),
- testLogicalDag.getJobConfig(),
- Collections.emptyList(),
- Collections.emptyList());
-
- Data data =
nodeEngine.getSerializationService().toData(jobImmutableInformation);
-
- PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
- server.getCoordinatorService().submitJob(jobId, data);
- voidPassiveCompletableFuture.join();
- }
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_biginterval.conf
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_biginterval.conf
new file mode 100644
index 0000000000..73d79fa660
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_biginterval.conf
@@ -0,0 +1,52 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
SeaTunnel config
+######
+
+env {
+ # You can set SeaTunnel environment configuration here
+ parallelism = 2
+ job.mode = "STREAMING"
+ checkpoint.interval = 2147483640
+}
+
+source {
+ # This is an example source plugin, **only for testing and demonstrating the source plugin feature**
+ FakeSource {
+ parallelism = 2
+ result_table_name = "fake"
+ row.num = 16
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ }
+
+ # If you would like to get more information about how to configure SeaTunnel
and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/category/source-v2
+}
+
+sink {
+ Console {
+ }
+
+ # If you would like to get more information about how to configure SeaTunnel
and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/category/sink-v2
+}