This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a6157cb38 [Improve][Zeta] Don't trigger handleSaveMode when restore 
(#5192)
a6157cb38 is described below

commit a6157cb38fc9be6140be349f4846bbede4da74c4
Author: Jia Fan <[email protected]>
AuthorDate: Thu Aug 3 18:18:54 2023 +0800

    [Improve][Zeta] Don't trigger handleSaveMode when restore (#5192)
---
 .../engine/client/job/JobExecutionEnvironment.java         |  2 +-
 .../seatunnel/engine/core/parse/JobConfigParser.java       | 12 +++++++++---
 .../engine/core/parse/MultipleTableJobConfigParser.java    | 14 ++++++++++----
 3 files changed, 20 insertions(+), 8 deletions(-)

diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 9f28f6fdb..bf3169e4c 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -131,7 +131,7 @@ public class JobExecutionEnvironment {
 
     private MultipleTableJobConfigParser getJobConfigParser() {
         return new MultipleTableJobConfigParser(
-                jobFilePath, idGenerator, jobConfig, commonPluginJars);
+                jobFilePath, idGenerator, jobConfig, commonPluginJars, 
isStartWithSavePoint);
     }
 
     private LogicalDagGenerator getLogicalDagGenerator() {
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index d81de1702..09bae74f5 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -63,12 +63,16 @@ import static 
org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParse
 public class JobConfigParser {
     private static final ILogger LOGGER = 
Logger.getLogger(JobConfigParser.class);
     private IdGenerator idGenerator;
-
+    private boolean isStartWithSavePoint;
     private List<URL> commonPluginJars;
 
-    public JobConfigParser(@NonNull IdGenerator idGenerator, @NonNull 
List<URL> commonPluginJars) {
+    public JobConfigParser(
+            @NonNull IdGenerator idGenerator,
+            @NonNull List<URL> commonPluginJars,
+            boolean isStartWithSavePoint) {
         this.idGenerator = idGenerator;
         this.commonPluginJars = commonPluginJars;
+        this.isStartWithSavePoint = isStartWithSavePoint;
     }
 
     public Tuple2<CatalogTable, Action> parseSource(
@@ -190,7 +194,9 @@ public class JobConfigParser {
         sink.prepare(config);
         sink.setJobContext(jobConfig.getJobContext());
         sink.setTypeInfo(rowType);
-        handleSaveMode(sink);
+        if (!isStartWithSavePoint) {
+            handleSaveMode(sink);
+        }
         final String actionName =
                 createSinkActionName(0, tuple.getLeft().getPluginName(), 
getTableName(config));
         final SinkAction action =
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 09027a2a2..86c0f3c94 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -107,23 +107,27 @@ public class MultipleTableJobConfigParser {
     private final ReadonlyConfig envOptions;
 
     private final JobConfigParser fallbackParser;
+    private final boolean isStartWithSavePoint;
 
     public MultipleTableJobConfigParser(
             String jobDefineFilePath, IdGenerator idGenerator, JobConfig 
jobConfig) {
-        this(jobDefineFilePath, idGenerator, jobConfig, 
Collections.emptyList());
+        this(jobDefineFilePath, idGenerator, jobConfig, 
Collections.emptyList(), false);
     }
 
     public MultipleTableJobConfigParser(
             String jobDefineFilePath,
             IdGenerator idGenerator,
             JobConfig jobConfig,
-            List<URL> commonPluginJars) {
+            List<URL> commonPluginJars,
+            boolean isStartWithSavePoint) {
         this.idGenerator = idGenerator;
         this.jobConfig = jobConfig;
         this.commonPluginJars = commonPluginJars;
+        this.isStartWithSavePoint = isStartWithSavePoint;
         this.seaTunnelJobConfig = 
ConfigBuilder.of(Paths.get(jobDefineFilePath));
         this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
-        this.fallbackParser = new JobConfigParser(idGenerator, 
commonPluginJars);
+        this.fallbackParser =
+                new JobConfigParser(idGenerator, commonPluginJars, 
isStartWithSavePoint);
     }
 
     public ImmutablePair<List<Action>, Set<URL>> parse() {
@@ -607,7 +611,9 @@ public class MultipleTableJobConfigParser {
                         sink,
                         factoryUrls,
                         actionConfig);
-        handleSaveMode(sink);
+        if (!isStartWithSavePoint) {
+            handleSaveMode(sink);
+        }
         sinkAction.setParallelism(parallelism);
         return sinkAction;
     }

Reply via email to