This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 790150a54b [Feature][Zeta] Support config job retry times in job
config (#6690)
790150a54b is described below
commit 790150a54b559148657a599fa16bf00539a5ab74
Author: Jia Fan <[email protected]>
AuthorDate: Fri Apr 12 19:24:04 2024 +0800
[Feature][Zeta] Support config job retry times in job config (#6690)
---
docs/en/concept/JobEnvConfig.md | 6 +-
.../apache/seatunnel/api/env/EnvCommonOptions.java | 6 ++
.../container/seatunnel/SeaTunnelContainer.java | 78 +++++++++++++++++++++-
.../seatunnel/engine/e2e/JobClientJobProxyIT.java | 76 +++++----------------
.../engine/e2e/classloader/ClassLoaderITBase.java | 74 +-------------------
.../plugin-mapping.properties | 1 +
.../stream_fake_to_inmemory_with_error.conf} | 37 ++++++++--
...tream_fake_to_inmemory_with_error_retry_1.conf} | 38 +++++++++--
.../engine/server/dag/physical/SubPlan.java | 17 +++--
9 files changed, 182 insertions(+), 151 deletions(-)
diff --git a/docs/en/concept/JobEnvConfig.md b/docs/en/concept/JobEnvConfig.md
index 3608b500f7..32bf089e92 100644
--- a/docs/en/concept/JobEnvConfig.md
+++ b/docs/en/concept/JobEnvConfig.md
@@ -1,4 +1,4 @@
-# JobEnvConfig
+# Job Env Config
This document describes env configuration information, the common parameters
can be used in all engines. In order to better distinguish between engine
parameters, the additional parameters of other engine need to carry a prefix.
In flink engine, we use `flink.` as the prefix. In the spark engine, we do not
use any prefixes to modify parameters, because the official spark parameters
themselves start with `spark.`
@@ -29,6 +29,10 @@ In `STREAMING` mode, checkpoints is required, if you do not
set it, it will be o
This parameter configures the parallelism of source and sink.
+### job.retry.times
+
+Used to control the default retry times when a job fails. The default value is
3, and it only works in the Zeta engine.
+
### shade.identifier
Specify the method of encryption, if you didn't have the requirement for
encrypting or decrypting config files, this option can be ignored.
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index 0c010bfb84..75e58a5f5b 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -44,6 +44,12 @@ public interface EnvCommonOptions {
.defaultValue(JobMode.BATCH)
.withDescription("The job mode of this job, support Batch
and Stream");
+ Option<Integer> JOB_RETRY_TIMES =
+ Options.key("job.retry.times")
+ .intType()
+ .defaultValue(3)
+ .withDescription("The retry times of this job");
+
Option<Long> CHECKPOINT_INTERVAL =
Options.key("checkpoint.interval")
.longType()
diff --git
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 4d68cea7f8..1f59f302f8 100644
---
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.e2e.common.container.seatunnel;
+import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
@@ -45,7 +46,9 @@ import groovy.lang.Tuple2;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import java.io.File;
import java.io.IOException;
+import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
@@ -75,7 +78,11 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
@Override
public void startUp() throws Exception {
- server =
+ server = createSeaTunnelServer();
+ }
+
+ private GenericContainer<?> createSeaTunnelServer() throws IOException,
InterruptedException {
+ GenericContainer<?> server =
new GenericContainer<>(getDockerImage())
.withNetwork(NETWORK)
.withEnv("TZ", "UTC")
@@ -106,6 +113,75 @@ public class SeaTunnelContainer extends
AbstractTestContainer {
executeExtraCommands(server);
server.start();
+ return server;
+ }
+
+ protected GenericContainer<?>
createSeaTunnelContainerWithFakeSourceAndInMemorySink(
+ String configFilePath) throws IOException, InterruptedException {
+ GenericContainer<?> server =
+ new GenericContainer<>(getDockerImage())
+ .withNetwork(NETWORK)
+ .withEnv("TZ", "UTC")
+ .withCommand(
+ ContainerUtil.adaptPathForWin(
+ Paths.get(SEATUNNEL_HOME, "bin",
SERVER_SHELL).toString()))
+ .withNetworkAliases("server")
+ .withExposedPorts()
+ .withLogConsumer(
+ new Slf4jLogConsumer(
+ DockerLoggerFactory.getLogger(
+ "seatunnel-engine:" +
JDK_DOCKER_IMAGE)))
+ .waitingFor(Wait.forListeningPort());
+ copySeaTunnelStarterToContainer(server);
+ server.setPortBindings(Collections.singletonList("5801:5801"));
+ server.setExposedPorts(Collections.singletonList(5801));
+
+ server.withCopyFileToContainer(
+ MountableFile.forHostPath(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
+ Paths.get(SEATUNNEL_HOME, "config").toString());
+
+ server.withCopyFileToContainer(
+ MountableFile.forHostPath(configFilePath),
+ Paths.get(SEATUNNEL_HOME, "config",
"seatunnel.yaml").toString());
+
+ server.withCopyFileToContainer(
+ MountableFile.forHostPath(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
+ Paths.get(SEATUNNEL_HOME,
"lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());
+
+ server.start();
+ // execute extra commands
+ executeExtraCommands(server);
+
+ File module = new File(PROJECT_ROOT_PATH + File.separator +
getConnectorModulePath());
+ List<File> connectorFiles =
+ ContainerUtil.getConnectorFiles(
+ module, Collections.singleton("connector-fake"),
getConnectorNamePrefix());
+ URL url =
+ FileUtils.searchJarFiles(
+ Paths.get(
+ PROJECT_ROOT_PATH
+ + File.separator
+ +
"seatunnel-e2e/seatunnel-e2e-common/target"))
+ .stream()
+ .filter(jar -> jar.toString().endsWith("-tests.jar"))
+ .findFirst()
+ .get();
+ connectorFiles.add(new File(url.getFile()));
+ connectorFiles.forEach(
+ jar ->
+ server.copyFileToContainer(
+
MountableFile.forHostPath(jar.getAbsolutePath()),
+ Paths.get(SEATUNNEL_HOME, "connectors",
jar.getName()).toString()));
+ server.copyFileToContainer(
+ MountableFile.forHostPath(
+ PROJECT_ROOT_PATH
+ +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties"),
+ Paths.get(SEATUNNEL_HOME, "connectors",
"plugin-mapping.properties").toString());
+ return server;
}
@Override
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index 7c2e332ba5..84d0e51608 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -17,87 +17,41 @@
package org.apache.seatunnel.engine.e2e;
-import org.apache.seatunnel.e2e.common.util.ContainerUtil;
-
import org.apache.commons.lang3.StringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
-import org.testcontainers.DockerClientFactory;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.images.PullPolicy;
-import org.testcontainers.utility.DockerLoggerFactory;
-import org.testcontainers.utility.MountableFile;
import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.stream.Collectors;
import static
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
public class JobClientJobProxyIT extends SeaTunnelContainer {
- private static final String JDK_DOCKER_IMAGE = "openjdk:8";
- private static final String SERVER_SHELL = "seatunnel-cluster.sh";
@Override
@BeforeAll
public void startUp() throws Exception {
+ // use seatunnel_fixed_slot_num.yaml replace seatunnel.yaml in
container
this.server =
- new GenericContainer<>(getDockerImage())
- .withNetwork(NETWORK)
- .withCommand(
- ContainerUtil.adaptPathForWin(
- Paths.get(SEATUNNEL_HOME, "bin",
SERVER_SHELL).toString()))
- .withNetworkAliases("server")
- .withImagePullPolicy(PullPolicy.alwaysPull())
- .withExposedPorts()
- .withLogConsumer(
- new Slf4jLogConsumer(
- DockerLoggerFactory.getLogger(
- "seatunnel-engine:" +
JDK_DOCKER_IMAGE)))
- .waitingFor(Wait.forListeningPort());
- copySeaTunnelStarterToContainer(server);
- server.setExposedPorts(Arrays.asList(5801));
- server.setPortBindings(Collections.singletonList("5801:5801"));
- server.withCopyFileToContainer(
- MountableFile.forHostPath(
+ createSeaTunnelContainerWithFakeSourceAndInMemorySink(
PROJECT_ROOT_PATH
- +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
- Paths.get(SEATUNNEL_HOME, "config").toString());
+ +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml");
+ }
- // use seatunnel_fixed_slot_num.yaml replace seatunnel.yaml in
container
- server.withCopyFileToContainer(
- MountableFile.forHostPath(
- PROJECT_ROOT_PATH
- +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml"),
- Paths.get(SEATUNNEL_HOME, "config/seatunnel.yaml").toString());
+ @Test
+ public void testJobRetryTimes() throws IOException, InterruptedException {
+ Container.ExecResult execResult =
+ executeJob(server,
"/retry-times/stream_fake_to_inmemory_with_error_retry_1.conf");
+ Assertions.assertNotEquals(0, execResult.getExitCode());
+ Assertions.assertTrue(server.getLogs().contains("Restore time 1,
pipeline"));
+ Assertions.assertFalse(server.getLogs().contains("Restore time 3,
pipeline"));
- server.withCopyFileToContainer(
- MountableFile.forHostPath(
- PROJECT_ROOT_PATH
- +
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
- Paths.get(SEATUNNEL_HOME,
"lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());
- LOG.info(
- "find images: "
- +
DockerClientFactory.lazyClient().listImagesCmd().exec().stream()
- .map(
- image -> {
- if (image.getRepoTags() != null) {
- return image.getRepoTags()[0];
- } else {
- return
image.getRepoDigests()[0];
- }
- })
- .collect(Collectors.joining(",")));
- server.start();
- // execute extra commands
- executeExtraCommands(server);
+ Container.ExecResult execResult2 =
+ executeJob(server,
"/retry-times/stream_fake_to_inmemory_with_error.conf");
+ Assertions.assertNotEquals(0, execResult2.getExitCode());
+ Assertions.assertTrue(server.getLogs().contains("Restore time 3,
pipeline"));
}
@Test
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
index 6355f6523c..60907b3c0e 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.engine.e2e.classloader;
-import org.apache.seatunnel.common.utils.FileUtils;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
import org.apache.seatunnel.engine.server.rest.RestConstant;
@@ -28,20 +27,12 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.utility.DockerLoggerFactory;
-import org.testcontainers.utility.MountableFile;
import io.restassured.response.Response;
-import java.io.File;
import java.io.IOException;
-import java.net.URL;
import java.nio.file.Path;
import java.nio.file.Paths;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -189,71 +180,10 @@ public abstract class ClassLoaderITBase extends
SeaTunnelContainer {
@BeforeEach
public void startUp() throws Exception {
server =
- new GenericContainer<>(getDockerImage())
- .withNetwork(NETWORK)
- .withEnv("TZ", "UTC")
- .withCommand(
- ContainerUtil.adaptPathForWin(
- Paths.get(SEATUNNEL_HOME, "bin",
SERVER_SHELL).toString()))
- .withNetworkAliases("server")
- .withExposedPorts()
- .withLogConsumer(
- new Slf4jLogConsumer(
- DockerLoggerFactory.getLogger(
- "seatunnel-engine:" +
JDK_DOCKER_IMAGE)))
- .waitingFor(Wait.forListeningPort());
- copySeaTunnelStarterToContainer(server);
- server.setExposedPorts(Collections.singletonList(5801));
-
- server.withCopyFileToContainer(
- MountableFile.forHostPath(
- PROJECT_ROOT_PATH
- +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
- Paths.get(SEATUNNEL_HOME, "config").toString());
-
- server.withCopyFileToContainer(
- MountableFile.forHostPath(
+ createSeaTunnelContainerWithFakeSourceAndInMemorySink(
PROJECT_ROOT_PATH
+
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/"
- + seatunnelConfigFileName()),
- Paths.get(SEATUNNEL_HOME, "config",
"seatunnel.yaml").toString());
-
- server.withCopyFileToContainer(
- MountableFile.forHostPath(
- PROJECT_ROOT_PATH
- +
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
- Paths.get(SEATUNNEL_HOME,
"lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());
-
- server.start();
- // execute extra commands
- executeExtraCommands(server);
-
- File module = new File(PROJECT_ROOT_PATH + File.separator +
getConnectorModulePath());
- List<File> connectorFiles =
- ContainerUtil.getConnectorFiles(
- module, Collections.singleton("connector-fake"),
getConnectorNamePrefix());
- URL url =
- FileUtils.searchJarFiles(
- Paths.get(
- PROJECT_ROOT_PATH
- + File.separator
- +
"seatunnel-e2e/seatunnel-e2e-common/target"))
- .stream()
- .filter(jar -> jar.toString().endsWith("-tests.jar"))
- .findFirst()
- .get();
- connectorFiles.add(new File(url.getFile()));
- connectorFiles.forEach(
- jar ->
- server.copyFileToContainer(
-
MountableFile.forHostPath(jar.getAbsolutePath()),
- Paths.get(SEATUNNEL_HOME, "connectors",
jar.getName()).toString()));
-
- server.copyFileToContainer(
- MountableFile.forHostPath(
- PROJECT_ROOT_PATH
- +
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties"),
- Paths.get(SEATUNNEL_HOME, "connectors",
"plugin-mapping.properties").toString());
+ + seatunnelConfigFileName());
}
@AfterEach
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties
similarity index 96%
copy from
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties
index 9bdf502045..e860ea8b16 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties
@@ -23,4 +23,5 @@
# SeaTunnel Connector-V2
seatunnel.source.FakeSource = connector-fake
+seatunnel.sink.Console = connector-console
seatunnel.sink.InMemory = seatunnel-e2e-common
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error.conf
similarity index 56%
copy from
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
copy to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error.conf
index 9bdf502045..47c6c9cb8a 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error.conf
@@ -14,13 +14,38 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
-# This mapping is used to resolve the Jar package name without version (or
call artifactId)
-# corresponding to the module in the user Config, helping SeaTunnel to load
the correct Jar package.
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+}
-## *** WARNING **** : `seatunnel.source.XXX`, the `XXX` should be string which
SeaTunnelSource::getPluginName and TableSinkFactory::factoryIdentifier returned
value##
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ split.num = 5
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+}
-# SeaTunnel Connector-V2
+transform {
+}
-seatunnel.source.FakeSource = connector-fake
-seatunnel.sink.InMemory = seatunnel-e2e-common
+sink {
+ InMemory {
+ source_table_name="fake"
+ throw_exception=true
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error_retry_1.conf
similarity index 55%
rename from
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
rename to
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error_retry_1.conf
index 9bdf502045..77c263ae0e 100644
---
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
+++
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error_retry_1.conf
@@ -14,13 +14,39 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
-# This mapping is used to resolve the Jar package name without version (or
call artifactId)
-# corresponding to the module in the user Config, helping SeaTunnel to load
the correct Jar package.
+env {
+ parallelism = 1
+ job.mode = "STREAMING"
+ checkpoint.interval = 5000
+ job.retry.times = 1
+}
-## *** WARNING **** : `seatunnel.source.XXX`, the `XXX` should be string which
SeaTunnelSource::getPluginName and TableSinkFactory::factoryIdentifier returned
value##
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ row.num = 100
+ split.num = 5
+ schema = {
+ fields {
+ name = "string"
+ age = "int"
+ }
+ }
+ parallelism = 1
+ }
+}
-# SeaTunnel Connector-V2
+transform {
+}
-seatunnel.source.FakeSource = connector-fake
-seatunnel.sink.InMemory = seatunnel-e2e-common
+sink {
+ InMemory {
+ source_table_name="fake"
+ throw_exception=true
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index a2623d5b2c..571e0f7125 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.engine.server.dag.physical;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
@@ -51,7 +52,7 @@ import java.util.concurrent.atomic.AtomicReference;
public class SubPlan {
/** The max num pipeline can restore. */
- public static final int PIPELINE_MAX_RESTORE_NUM = 3; // TODO should set
by config
+ private final int pipelineMaxRestoreNum;
private final List<PhysicalVertex> physicalVertexList;
@@ -98,7 +99,7 @@ public class SubPlan {
private final Object restoreLock = new Object();
- private volatile PipelineStatus currPipelineStatus =
PipelineStatus.INITIALIZING;
+ private volatile PipelineStatus currPipelineStatus;
public volatile boolean isRunning = false;
@@ -121,7 +122,15 @@ public class SubPlan {
this.physicalVertexList = physicalVertexList;
this.coordinatorVertexList = coordinatorVertexList;
pipelineRestoreNum = 0;
-
+ pipelineMaxRestoreNum =
+ Integer.parseInt(
+ jobImmutableInformation
+ .getJobConfig()
+ .getEnvOptions()
+ .computeIfAbsent(
+ EnvCommonOptions.JOB_RETRY_TIMES.key(),
+ key ->
EnvCommonOptions.JOB_RETRY_TIMES.defaultValue())
+ .toString());
Long[] stateTimestamps = new Long[PipelineStatus.values().length];
if (runningJobStateTimestampsIMap.get(pipelineLocation) == null) {
stateTimestamps[PipelineStatus.INITIALIZING.ordinal()] =
initializationTimestamp;
@@ -302,7 +311,7 @@ public class SubPlan {
}
public boolean canRestorePipeline() {
- return jobMaster.isNeedRestore() && getPipelineRestoreNum() <
PIPELINE_MAX_RESTORE_NUM;
+ return jobMaster.isNeedRestore() && getPipelineRestoreNum() <
pipelineMaxRestoreNum;
}
public synchronized void updatePipelineState(@NonNull PipelineStatus
targetState) {