This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a6157cb38 [Improve][Zeta] Don't trigger handleSaveMode when restore (#5192)
a6157cb38 is described below
commit a6157cb38fc9be6140be349f4846bbede4da74c4
Author: Jia Fan <[email protected]>
AuthorDate: Thu Aug 3 18:18:54 2023 +0800
[Improve][Zeta] Don't trigger handleSaveMode when restore (#5192)
---
.../engine/client/job/JobExecutionEnvironment.java | 2 +-
.../seatunnel/engine/core/parse/JobConfigParser.java | 12 +++++++++---
.../engine/core/parse/MultipleTableJobConfigParser.java | 14 ++++++++++----
3 files changed, 20 insertions(+), 8 deletions(-)
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 9f28f6fdb..bf3169e4c 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -131,7 +131,7 @@ public class JobExecutionEnvironment {
private MultipleTableJobConfigParser getJobConfigParser() {
return new MultipleTableJobConfigParser(
- jobFilePath, idGenerator, jobConfig, commonPluginJars);
+ jobFilePath, idGenerator, jobConfig, commonPluginJars, isStartWithSavePoint);
}
private LogicalDagGenerator getLogicalDagGenerator() {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index d81de1702..09bae74f5 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -63,12 +63,16 @@ import static org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParse
public class JobConfigParser {
private static final ILogger LOGGER =
Logger.getLogger(JobConfigParser.class);
private IdGenerator idGenerator;
-
+ private boolean isStartWithSavePoint;
private List<URL> commonPluginJars;
- public JobConfigParser(@NonNull IdGenerator idGenerator, @NonNull List<URL> commonPluginJars) {
+ public JobConfigParser(
+ @NonNull IdGenerator idGenerator,
+ @NonNull List<URL> commonPluginJars,
+ boolean isStartWithSavePoint) {
this.idGenerator = idGenerator;
this.commonPluginJars = commonPluginJars;
+ this.isStartWithSavePoint = isStartWithSavePoint;
}
public Tuple2<CatalogTable, Action> parseSource(
@@ -190,7 +194,9 @@ public class JobConfigParser {
sink.prepare(config);
sink.setJobContext(jobConfig.getJobContext());
sink.setTypeInfo(rowType);
- handleSaveMode(sink);
+ if (!isStartWithSavePoint) {
+ handleSaveMode(sink);
+ }
final String actionName =
createSinkActionName(0, tuple.getLeft().getPluginName(),
getTableName(config));
final SinkAction action =
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 09027a2a2..86c0f3c94 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -107,23 +107,27 @@ public class MultipleTableJobConfigParser {
private final ReadonlyConfig envOptions;
private final JobConfigParser fallbackParser;
+ private final boolean isStartWithSavePoint;
public MultipleTableJobConfigParser(
String jobDefineFilePath, IdGenerator idGenerator, JobConfig
jobConfig) {
- this(jobDefineFilePath, idGenerator, jobConfig, Collections.emptyList());
+ this(jobDefineFilePath, idGenerator, jobConfig, Collections.emptyList(), false);
}
public MultipleTableJobConfigParser(
String jobDefineFilePath,
IdGenerator idGenerator,
JobConfig jobConfig,
- List<URL> commonPluginJars) {
+ List<URL> commonPluginJars,
+ boolean isStartWithSavePoint) {
this.idGenerator = idGenerator;
this.jobConfig = jobConfig;
this.commonPluginJars = commonPluginJars;
+ this.isStartWithSavePoint = isStartWithSavePoint;
this.seaTunnelJobConfig =
ConfigBuilder.of(Paths.get(jobDefineFilePath));
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
- this.fallbackParser = new JobConfigParser(idGenerator, commonPluginJars);
+ this.fallbackParser =
+ new JobConfigParser(idGenerator, commonPluginJars, isStartWithSavePoint);
}
public ImmutablePair<List<Action>, Set<URL>> parse() {
@@ -607,7 +611,9 @@ public class MultipleTableJobConfigParser {
sink,
factoryUrls,
actionConfig);
- handleSaveMode(sink);
+ if (!isStartWithSavePoint) {
+ handleSaveMode(sink);
+ }
sinkAction.setParallelism(parallelism);
return sinkAction;
}