This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 790150a54b [Feature][Zeta] Support config job retry times in job 
config (#6690)
790150a54b is described below

commit 790150a54b559148657a599fa16bf00539a5ab74
Author: Jia Fan <[email protected]>
AuthorDate: Fri Apr 12 19:24:04 2024 +0800

    [Feature][Zeta] Support config job retry times in job config (#6690)
---
 docs/en/concept/JobEnvConfig.md                    |  6 +-
 .../apache/seatunnel/api/env/EnvCommonOptions.java |  6 ++
 .../container/seatunnel/SeaTunnelContainer.java    | 78 +++++++++++++++++++++-
 .../seatunnel/engine/e2e/JobClientJobProxyIT.java  | 76 +++++----------------
 .../engine/e2e/classloader/ClassLoaderITBase.java  | 74 +-------------------
 .../plugin-mapping.properties                      |  1 +
 .../stream_fake_to_inmemory_with_error.conf}       | 37 ++++++++--
 ...tream_fake_to_inmemory_with_error_retry_1.conf} | 38 +++++++++--
 .../engine/server/dag/physical/SubPlan.java        | 17 +++--
 9 files changed, 182 insertions(+), 151 deletions(-)

diff --git a/docs/en/concept/JobEnvConfig.md b/docs/en/concept/JobEnvConfig.md
index 3608b500f7..32bf089e92 100644
--- a/docs/en/concept/JobEnvConfig.md
+++ b/docs/en/concept/JobEnvConfig.md
@@ -1,4 +1,4 @@
-# JobEnvConfig
+# Job Env Config
 
 This document describes env configuration information, the common parameters 
can be used in all engines. In order to better distinguish between engine 
parameters, the additional parameters of other engine need to carry a prefix.
 In flink engine, we use `flink.` as the prefix. In the spark engine, we do not 
use any prefixes to modify parameters, because the official spark parameters 
themselves start with `spark.`
@@ -29,6 +29,10 @@ In `STREAMING` mode, checkpoints is required, if you do not 
set it, it will be o
 
 This parameter configures the parallelism of source and sink.
 
+### job.retry.times
+
+Used to control the default retry times when a job fails. The default value is 
3, and it only works in the Zeta engine.
+
 ### shade.identifier
 
 Specify the method of encryption, if you didn't have the requirement for 
encrypting or decrypting config files, this option can be ignored.
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index 0c010bfb84..75e58a5f5b 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -44,6 +44,12 @@ public interface EnvCommonOptions {
                     .defaultValue(JobMode.BATCH)
                     .withDescription("The job mode of this job, support Batch 
and Stream");
 
+    Option<Integer> JOB_RETRY_TIMES =
+            Options.key("job.retry.times")
+                    .intType()
+                    .defaultValue(3)
+                    .withDescription("The retry times of this job");
+
     Option<Long> CHECKPOINT_INTERVAL =
             Options.key("checkpoint.interval")
                     .longType()
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
index 4d68cea7f8..1f59f302f8 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/seatunnel/SeaTunnelContainer.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.e2e.common.container.seatunnel;
 
+import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.e2e.common.container.AbstractTestContainer;
 import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
@@ -45,7 +46,9 @@ import groovy.lang.Tuple2;
 import lombok.NoArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URL;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -75,7 +78,11 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
 
     @Override
     public void startUp() throws Exception {
-        server =
+        server = createSeaTunnelServer();
+    }
+
+    private GenericContainer<?> createSeaTunnelServer() throws IOException, 
InterruptedException {
+        GenericContainer<?> server =
                 new GenericContainer<>(getDockerImage())
                         .withNetwork(NETWORK)
                         .withEnv("TZ", "UTC")
@@ -106,6 +113,75 @@ public class SeaTunnelContainer extends 
AbstractTestContainer {
         executeExtraCommands(server);
 
         server.start();
+        return server;
+    }
+
+    protected GenericContainer<?> 
createSeaTunnelContainerWithFakeSourceAndInMemorySink(
+            String configFilePath) throws IOException, InterruptedException {
+        GenericContainer<?> server =
+                new GenericContainer<>(getDockerImage())
+                        .withNetwork(NETWORK)
+                        .withEnv("TZ", "UTC")
+                        .withCommand(
+                                ContainerUtil.adaptPathForWin(
+                                        Paths.get(SEATUNNEL_HOME, "bin", 
SERVER_SHELL).toString()))
+                        .withNetworkAliases("server")
+                        .withExposedPorts()
+                        .withLogConsumer(
+                                new Slf4jLogConsumer(
+                                        DockerLoggerFactory.getLogger(
+                                                "seatunnel-engine:" + 
JDK_DOCKER_IMAGE)))
+                        .waitingFor(Wait.forListeningPort());
+        copySeaTunnelStarterToContainer(server);
+        server.setPortBindings(Collections.singletonList("5801:5801"));
+        server.setExposedPorts(Collections.singletonList(5801));
+
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
+                Paths.get(SEATUNNEL_HOME, "config").toString());
+
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(configFilePath),
+                Paths.get(SEATUNNEL_HOME, "config", 
"seatunnel.yaml").toString());
+
+        server.withCopyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
+                Paths.get(SEATUNNEL_HOME, 
"lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());
+
+        server.start();
+        // execute extra commands
+        executeExtraCommands(server);
+
+        File module = new File(PROJECT_ROOT_PATH + File.separator + 
getConnectorModulePath());
+        List<File> connectorFiles =
+                ContainerUtil.getConnectorFiles(
+                        module, Collections.singleton("connector-fake"), 
getConnectorNamePrefix());
+        URL url =
+                FileUtils.searchJarFiles(
+                                Paths.get(
+                                        PROJECT_ROOT_PATH
+                                                + File.separator
+                                                + 
"seatunnel-e2e/seatunnel-e2e-common/target"))
+                        .stream()
+                        .filter(jar -> jar.toString().endsWith("-tests.jar"))
+                        .findFirst()
+                        .get();
+        connectorFiles.add(new File(url.getFile()));
+        connectorFiles.forEach(
+                jar ->
+                        server.copyFileToContainer(
+                                
MountableFile.forHostPath(jar.getAbsolutePath()),
+                                Paths.get(SEATUNNEL_HOME, "connectors", 
jar.getName()).toString()));
+        server.copyFileToContainer(
+                MountableFile.forHostPath(
+                        PROJECT_ROOT_PATH
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties"),
+                Paths.get(SEATUNNEL_HOME, "connectors", 
"plugin-mapping.properties").toString());
+        return server;
     }
 
     @Override
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
index 7c2e332ba5..84d0e51608 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/JobClientJobProxyIT.java
@@ -17,87 +17,41 @@
 
 package org.apache.seatunnel.engine.e2e;
 
-import org.apache.seatunnel.e2e.common.util.ContainerUtil;
-
 import org.apache.commons.lang3.StringUtils;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
-import org.testcontainers.DockerClientFactory;
 import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.images.PullPolicy;
-import org.testcontainers.utility.DockerLoggerFactory;
-import org.testcontainers.utility.MountableFile;
 
 import java.io.IOException;
-import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.stream.Collectors;
 
 import static 
org.apache.seatunnel.e2e.common.util.ContainerUtil.PROJECT_ROOT_PATH;
 
 public class JobClientJobProxyIT extends SeaTunnelContainer {
-    private static final String JDK_DOCKER_IMAGE = "openjdk:8";
-    private static final String SERVER_SHELL = "seatunnel-cluster.sh";
 
     @Override
     @BeforeAll
     public void startUp() throws Exception {
+        // use seatunnel_fixed_slot_num.yaml replace seatunnel.yaml in 
container
         this.server =
-                new GenericContainer<>(getDockerImage())
-                        .withNetwork(NETWORK)
-                        .withCommand(
-                                ContainerUtil.adaptPathForWin(
-                                        Paths.get(SEATUNNEL_HOME, "bin", 
SERVER_SHELL).toString()))
-                        .withNetworkAliases("server")
-                        .withImagePullPolicy(PullPolicy.alwaysPull())
-                        .withExposedPorts()
-                        .withLogConsumer(
-                                new Slf4jLogConsumer(
-                                        DockerLoggerFactory.getLogger(
-                                                "seatunnel-engine:" + 
JDK_DOCKER_IMAGE)))
-                        .waitingFor(Wait.forListeningPort());
-        copySeaTunnelStarterToContainer(server);
-        server.setExposedPorts(Arrays.asList(5801));
-        server.setPortBindings(Collections.singletonList("5801:5801"));
-        server.withCopyFileToContainer(
-                MountableFile.forHostPath(
+                createSeaTunnelContainerWithFakeSourceAndInMemorySink(
                         PROJECT_ROOT_PATH
-                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
-                Paths.get(SEATUNNEL_HOME, "config").toString());
+                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml");
+    }
 
-        // use seatunnel_fixed_slot_num.yaml replace seatunnel.yaml in 
container
-        server.withCopyFileToContainer(
-                MountableFile.forHostPath(
-                        PROJECT_ROOT_PATH
-                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/seatunnel_fixed_slot_num.yaml"),
-                Paths.get(SEATUNNEL_HOME, "config/seatunnel.yaml").toString());
+    @Test
+    public void testJobRetryTimes() throws IOException, InterruptedException {
+        Container.ExecResult execResult =
+                executeJob(server, 
"/retry-times/stream_fake_to_inmemory_with_error_retry_1.conf");
+        Assertions.assertNotEquals(0, execResult.getExitCode());
+        Assertions.assertTrue(server.getLogs().contains("Restore time 1, 
pipeline"));
+        Assertions.assertFalse(server.getLogs().contains("Restore time 3, 
pipeline"));
 
-        server.withCopyFileToContainer(
-                MountableFile.forHostPath(
-                        PROJECT_ROOT_PATH
-                                + 
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
-                Paths.get(SEATUNNEL_HOME, 
"lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());
-        LOG.info(
-                "find images: "
-                        + 
DockerClientFactory.lazyClient().listImagesCmd().exec().stream()
-                                .map(
-                                        image -> {
-                                            if (image.getRepoTags() != null) {
-                                                return image.getRepoTags()[0];
-                                            } else {
-                                                return 
image.getRepoDigests()[0];
-                                            }
-                                        })
-                                .collect(Collectors.joining(",")));
-        server.start();
-        // execute extra commands
-        executeExtraCommands(server);
+        Container.ExecResult execResult2 =
+                executeJob(server, 
"/retry-times/stream_fake_to_inmemory_with_error.conf");
+        Assertions.assertNotEquals(0, execResult2.getExitCode());
+        Assertions.assertTrue(server.getLogs().contains("Restore time 3, 
pipeline"));
     }
 
     @Test
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
index 6355f6523c..60907b3c0e 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/classloader/ClassLoaderITBase.java
@@ -17,7 +17,6 @@
 
 package org.apache.seatunnel.engine.e2e.classloader;
 
-import org.apache.seatunnel.common.utils.FileUtils;
 import org.apache.seatunnel.e2e.common.util.ContainerUtil;
 import org.apache.seatunnel.engine.e2e.SeaTunnelContainer;
 import org.apache.seatunnel.engine.server.rest.RestConstant;
@@ -28,20 +27,12 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.testcontainers.containers.Container;
-import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
-import org.testcontainers.containers.wait.strategy.Wait;
-import org.testcontainers.utility.DockerLoggerFactory;
-import org.testcontainers.utility.MountableFile;
 
 import io.restassured.response.Response;
 
-import java.io.File;
 import java.io.IOException;
-import java.net.URL;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
@@ -189,71 +180,10 @@ public abstract class ClassLoaderITBase extends 
SeaTunnelContainer {
     @BeforeEach
     public void startUp() throws Exception {
         server =
-                new GenericContainer<>(getDockerImage())
-                        .withNetwork(NETWORK)
-                        .withEnv("TZ", "UTC")
-                        .withCommand(
-                                ContainerUtil.adaptPathForWin(
-                                        Paths.get(SEATUNNEL_HOME, "bin", 
SERVER_SHELL).toString()))
-                        .withNetworkAliases("server")
-                        .withExposedPorts()
-                        .withLogConsumer(
-                                new Slf4jLogConsumer(
-                                        DockerLoggerFactory.getLogger(
-                                                "seatunnel-engine:" + 
JDK_DOCKER_IMAGE)))
-                        .waitingFor(Wait.forListeningPort());
-        copySeaTunnelStarterToContainer(server);
-        server.setExposedPorts(Collections.singletonList(5801));
-
-        server.withCopyFileToContainer(
-                MountableFile.forHostPath(
-                        PROJECT_ROOT_PATH
-                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/"),
-                Paths.get(SEATUNNEL_HOME, "config").toString());
-
-        server.withCopyFileToContainer(
-                MountableFile.forHostPath(
+                createSeaTunnelContainerWithFakeSourceAndInMemorySink(
                         PROJECT_ROOT_PATH
                                 + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/"
-                                + seatunnelConfigFileName()),
-                Paths.get(SEATUNNEL_HOME, "config", 
"seatunnel.yaml").toString());
-
-        server.withCopyFileToContainer(
-                MountableFile.forHostPath(
-                        PROJECT_ROOT_PATH
-                                + 
"/seatunnel-shade/seatunnel-hadoop3-3.1.4-uber/target/seatunnel-hadoop3-3.1.4-uber.jar"),
-                Paths.get(SEATUNNEL_HOME, 
"lib/seatunnel-hadoop3-3.1.4-uber.jar").toString());
-
-        server.start();
-        // execute extra commands
-        executeExtraCommands(server);
-
-        File module = new File(PROJECT_ROOT_PATH + File.separator + 
getConnectorModulePath());
-        List<File> connectorFiles =
-                ContainerUtil.getConnectorFiles(
-                        module, Collections.singleton("connector-fake"), 
getConnectorNamePrefix());
-        URL url =
-                FileUtils.searchJarFiles(
-                                Paths.get(
-                                        PROJECT_ROOT_PATH
-                                                + File.separator
-                                                + 
"seatunnel-e2e/seatunnel-e2e-common/target"))
-                        .stream()
-                        .filter(jar -> jar.toString().endsWith("-tests.jar"))
-                        .findFirst()
-                        .get();
-        connectorFiles.add(new File(url.getFile()));
-        connectorFiles.forEach(
-                jar ->
-                        server.copyFileToContainer(
-                                
MountableFile.forHostPath(jar.getAbsolutePath()),
-                                Paths.get(SEATUNNEL_HOME, "connectors", 
jar.getName()).toString()));
-
-        server.copyFileToContainer(
-                MountableFile.forHostPath(
-                        PROJECT_ROOT_PATH
-                                + 
"/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties"),
-                Paths.get(SEATUNNEL_HOME, "connectors", 
"plugin-mapping.properties").toString());
+                                + seatunnelConfigFileName());
     }
 
     @AfterEach
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties
similarity index 96%
copy from 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties
index 9bdf502045..e860ea8b16 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/fake-and-inmemory/plugin-mapping.properties
@@ -23,4 +23,5 @@
 # SeaTunnel Connector-V2
 
 seatunnel.source.FakeSource = connector-fake
+seatunnel.sink.Console = connector-console
 seatunnel.sink.InMemory = seatunnel-e2e-common
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error.conf
similarity index 56%
copy from 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
copy to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error.conf
index 9bdf502045..47c6c9cb8a 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error.conf
@@ -14,13 +14,38 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
 
-# This mapping is used to resolve the Jar package name without version (or 
call artifactId)
-# corresponding to the module in the user Config, helping SeaTunnel to load 
the correct Jar package.
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+}
 
-## *** WARNING **** : `seatunnel.source.XXX`, the `XXX` should be string which 
SeaTunnelSource::getPluginName and TableSinkFactory::factoryIdentifier returned 
value##
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+    FakeSource {
+      result_table_name = "fake"
+      row.num = 100
+      split.num = 5
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
+    }
+}
 
-# SeaTunnel Connector-V2
+transform {
+}
 
-seatunnel.source.FakeSource = connector-fake
-seatunnel.sink.InMemory = seatunnel-e2e-common
+sink {
+  InMemory {
+    source_table_name="fake"
+    throw_exception=true
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error_retry_1.conf
similarity index 55%
rename from 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
rename to 
seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error_retry_1.conf
index 9bdf502045..77c263ae0e 100644
--- 
a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/classloader/plugin-mapping.properties
+++ 
b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/retry-times/stream_fake_to_inmemory_with_error_retry_1.conf
@@ -14,13 +14,39 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+######
+###### This config file is a demonstration of streaming processing in 
seatunnel config
+######
 
-# This mapping is used to resolve the Jar package name without version (or 
call artifactId)
-# corresponding to the module in the user Config, helping SeaTunnel to load 
the correct Jar package.
+env {
+  parallelism = 1
+  job.mode = "STREAMING"
+  checkpoint.interval = 5000
+  job.retry.times = 1
+}
 
-## *** WARNING **** : `seatunnel.source.XXX`, the `XXX` should be string which 
SeaTunnelSource::getPluginName and TableSinkFactory::factoryIdentifier returned 
value##
+source {
+  # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
+    FakeSource {
+      result_table_name = "fake"
+      row.num = 100
+      split.num = 5
+      schema = {
+        fields {
+          name = "string"
+          age = "int"
+        }
+      }
+      parallelism = 1
+    }
+}
 
-# SeaTunnel Connector-V2
+transform {
+}
 
-seatunnel.source.FakeSource = connector-fake
-seatunnel.sink.InMemory = seatunnel-e2e-common
+sink {
+  InMemory {
+    source_table_name="fake"
+    throw_exception=true
+  }
+}
\ No newline at end of file
diff --git 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
index a2623d5b2c..571e0f7125 100644
--- 
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
+++ 
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/SubPlan.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.engine.server.dag.physical;
 
+import org.apache.seatunnel.api.env.EnvCommonOptions;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.common.utils.RetryUtils;
 import org.apache.seatunnel.engine.common.Constant;
@@ -51,7 +52,7 @@ import java.util.concurrent.atomic.AtomicReference;
 public class SubPlan {
 
     /** The max num pipeline can restore. */
-    public static final int PIPELINE_MAX_RESTORE_NUM = 3; // TODO should set 
by config
+    private final int pipelineMaxRestoreNum;
 
     private final List<PhysicalVertex> physicalVertexList;
 
@@ -98,7 +99,7 @@ public class SubPlan {
 
     private final Object restoreLock = new Object();
 
-    private volatile PipelineStatus currPipelineStatus = 
PipelineStatus.INITIALIZING;
+    private volatile PipelineStatus currPipelineStatus;
 
     public volatile boolean isRunning = false;
 
@@ -121,7 +122,15 @@ public class SubPlan {
         this.physicalVertexList = physicalVertexList;
         this.coordinatorVertexList = coordinatorVertexList;
         pipelineRestoreNum = 0;
-
+        pipelineMaxRestoreNum =
+                Integer.parseInt(
+                        jobImmutableInformation
+                                .getJobConfig()
+                                .getEnvOptions()
+                                .computeIfAbsent(
+                                        EnvCommonOptions.JOB_RETRY_TIMES.key(),
+                                        key -> 
EnvCommonOptions.JOB_RETRY_TIMES.defaultValue())
+                                .toString());
         Long[] stateTimestamps = new Long[PipelineStatus.values().length];
         if (runningJobStateTimestampsIMap.get(pipelineLocation) == null) {
             stateTimestamps[PipelineStatus.INITIALIZING.ordinal()] = 
initializationTimestamp;
@@ -302,7 +311,7 @@ public class SubPlan {
     }
 
     public boolean canRestorePipeline() {
-        return jobMaster.isNeedRestore() && getPipelineRestoreNum() < 
PIPELINE_MAX_RESTORE_NUM;
+        return jobMaster.isNeedRestore() && getPipelineRestoreNum() < 
pipelineMaxRestoreNum;
     }
 
     public synchronized void updatePipelineState(@NonNull PipelineStatus 
targetState) {

Reply via email to