This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
new 5d2415117 [improve][DAG] use multi-table config parser (#4176)
5d2415117 is described below
commit 5d24151172d40307f507d2df9678a81450a5a70f
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Feb 21 14:45:07 2023 +0800
[improve][DAG] use multi-table config parser (#4176)
---
.../apache/seatunnel/api/env/EnvCommonOptions.java | 18 ++++++++++++------
.../engine/client/job/JobExecutionEnvironment.java | 6 +++---
.../core/parse/MultipleTableJobConfigParser.java | 22 ++++++++++++++++++----
3 files changed, 33 insertions(+), 13 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index fa454684a..4b596e93f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -23,33 +23,39 @@ import org.apache.seatunnel.common.constants.JobMode;
import java.util.Map;
-public class EnvCommonOptions {
+public interface EnvCommonOptions {
- public static final Option<String> JOB_NAME =
+ Option<String> JOB_NAME =
Options.key("job.name")
.stringType()
.defaultValue("SeaTunnel_Job")
.withDescription("The job name of this job");
- public static final Option<JobMode> JOB_MODE =
+ Option<JobMode> JOB_MODE =
Options.key("job.mode")
.enumType(JobMode.class)
.noDefaultValue()
.withDescription("The job mode of this job, support Batch and
Stream");
- public static final Option<Long> CHECKPOINT_INTERVAL =
+ Option<Boolean> MULTIPLE_TABLE_ENABLE =
+ Options.key("multi-table.enable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("Whether to enable parsing support for
multi-table jobs");
+
+ Option<Long> CHECKPOINT_INTERVAL =
Options.key("checkpoint.interval")
.longType()
.noDefaultValue()
.withDescription("The interval (in milliseconds) between two
consecutive checkpoints.");
- public static final Option<String> JARS =
+ Option<String> JARS =
Options.key("jars")
.stringType()
.noDefaultValue()
.withDescription("third-party packages can be loaded via `jars`");
- public static final Option<Map<String, String>> CUSTOM_PARAMETERS =
+ Option<Map<String, String>> CUSTOM_PARAMETERS =
Options.key("custom_parameters")
.mapType()
.noDefaultValue()
diff --git a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 11ebbe631..35104d3da 100644
--- a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -29,7 +29,7 @@ import org.apache.seatunnel.engine.core.dag.actions.Action;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.parse.JobConfigParser;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
@@ -127,8 +127,8 @@ public class JobExecutionEnvironment {
return Collections.emptySet();
}
- private JobConfigParser getJobConfigParser() {
- return new JobConfigParser(jobFilePath, idGenerator, jobConfig,
commonPluginJars);
+ private MultipleTableJobConfigParser getJobConfigParser() {
+ return new MultipleTableJobConfigParser(jobFilePath, idGenerator,
jobConfig, commonPluginJars);
}
private LogicalDagGenerator getLogicalDagGenerator() {
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 8902b90a6..02bf2211a 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -61,11 +61,11 @@ import scala.Tuple2;
public class MultipleTableJobConfigParser {
private static final ILogger LOGGER =
Logger.getLogger(JobConfigParser.class);
- private final String jobDefineFilePath;
+
private final IdGenerator idGenerator;
private final JobConfig jobConfig;
- private final List<URL> commonFactoryJars;
+ private final List<URL> commonPluginJars;
private final Config seaTunnelJobConfig;
private final ReadonlyConfig envOptions;
@@ -74,6 +74,8 @@ public class MultipleTableJobConfigParser {
private final Set<URL> jarUrls;
+ private final JobConfigParser fallbackParser;
+
public MultipleTableJobConfigParser(String jobDefineFilePath,
IdGenerator idGenerator,
JobConfig jobConfig) {
@@ -87,17 +89,20 @@ public class MultipleTableJobConfigParser {
IdGenerator idGenerator,
JobConfig jobConfig,
List<URL> commonPluginJars) {
- this.jobDefineFilePath = jobDefineFilePath;
this.idGenerator = idGenerator;
this.jobConfig = jobConfig;
- this.commonFactoryJars = commonPluginJars;
+ this.commonPluginJars = commonPluginJars;
this.seaTunnelJobConfig =
ConfigBuilder.of(Paths.get(jobDefineFilePath));
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
this.graph = new HashMap<>();
this.jarUrls = new HashSet<>();
+ this.fallbackParser = new JobConfigParser(jobDefineFilePath,
idGenerator, jobConfig, commonPluginJars);
}
public ImmutablePair<List<Action>, Set<URL>> parse() {
+ if (!envOptions.get(EnvCommonOptions.MULTIPLE_TABLE_ENABLE)) {
+ return fallbackParser.parse();
+ }
ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(new
ArrayList<>());
Thread.currentThread().setContextClassLoader(classLoader);
// TODO: Support configuration transform
@@ -114,9 +119,18 @@ public class MultipleTableJobConfigParser {
for (Config sinkConfig : sinkConfigs) {
sinkActions.addAll(parserSink(sinkConfig, classLoader));
}
+ sinkActions.forEach(this::addCommonPluginJarsToAction);
+ jarUrls.addAll(commonPluginJars);
return new ImmutablePair<>(sinkActions, null);
}
+ void addCommonPluginJarsToAction(Action action) {
+ action.getJarUrls().addAll(commonPluginJars);
+ if (!action.getUpstream().isEmpty()) {
+ action.getUpstream().forEach(this::addCommonPluginJarsToAction);
+ }
+ }
+
private void fillJobConfig() {
jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE));
if (StringUtils.isEmpty(jobConfig.getName())) {