This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 5d2415117 [improve][DAG]  use multi-table config parser (#4176)
5d2415117 is described below

commit 5d24151172d40307f507d2df9678a81450a5a70f
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Feb 21 14:45:07 2023 +0800

    [improve][DAG]  use multi-table config parser (#4176)
---
 .../apache/seatunnel/api/env/EnvCommonOptions.java | 18 ++++++++++++------
 .../engine/client/job/JobExecutionEnvironment.java |  6 +++---
 .../core/parse/MultipleTableJobConfigParser.java   | 22 ++++++++++++++++++----
 3 files changed, 33 insertions(+), 13 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index fa454684a..4b596e93f 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -23,33 +23,39 @@ import org.apache.seatunnel.common.constants.JobMode;
 
 import java.util.Map;
 
-public class EnvCommonOptions {
+public interface EnvCommonOptions {
 
-    public static final Option<String> JOB_NAME =
+    Option<String> JOB_NAME =
         Options.key("job.name")
             .stringType()
             .defaultValue("SeaTunnel_Job")
             .withDescription("The job name of this job");
 
-    public static final Option<JobMode> JOB_MODE =
+    Option<JobMode> JOB_MODE =
         Options.key("job.mode")
             .enumType(JobMode.class)
             .noDefaultValue()
             .withDescription("The job mode of this job, support Batch and 
Stream");
 
-    public static final Option<Long> CHECKPOINT_INTERVAL =
+    Option<Boolean> MULTIPLE_TABLE_ENABLE =
+        Options.key("multi-table.enable")
+            .booleanType()
+            .defaultValue(false)
+            .withDescription("Whether to enable parsing support for 
multi-table jobs");
+
+    Option<Long> CHECKPOINT_INTERVAL =
         Options.key("checkpoint.interval")
             .longType()
             .noDefaultValue()
             .withDescription("The interval (in milliseconds) between two 
consecutive checkpoints.");
 
-    public static final Option<String> JARS =
+    Option<String> JARS =
         Options.key("jars")
             .stringType()
             .noDefaultValue()
             .withDescription("third-party packages can be loaded via `jars`");
 
-    public static final Option<Map<String, String>> CUSTOM_PARAMETERS =
+    Option<Map<String, String>> CUSTOM_PARAMETERS =
         Options.key("custom_parameters")
             .mapType()
             .noDefaultValue()
diff --git 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
index 11ebbe631..35104d3da 100644
--- 
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
+++ 
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/JobExecutionEnvironment.java
@@ -29,7 +29,7 @@ import org.apache.seatunnel.engine.core.dag.actions.Action;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDag;
 import org.apache.seatunnel.engine.core.dag.logical.LogicalDagGenerator;
 import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
-import org.apache.seatunnel.engine.core.parse.JobConfigParser;
+import org.apache.seatunnel.engine.core.parse.MultipleTableJobConfigParser;
 
 import com.hazelcast.logging.ILogger;
 import com.hazelcast.logging.Logger;
@@ -127,8 +127,8 @@ public class JobExecutionEnvironment {
         return Collections.emptySet();
     }
 
-    private JobConfigParser getJobConfigParser() {
-        return new JobConfigParser(jobFilePath, idGenerator, jobConfig, 
commonPluginJars);
+    private MultipleTableJobConfigParser getJobConfigParser() {
+        return new MultipleTableJobConfigParser(jobFilePath, idGenerator, 
jobConfig, commonPluginJars);
     }
 
     private LogicalDagGenerator getLogicalDagGenerator() {
diff --git 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index 8902b90a6..02bf2211a 100644
--- 
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ 
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -61,11 +61,11 @@ import scala.Tuple2;
 public class MultipleTableJobConfigParser {
 
     private static final ILogger LOGGER = 
Logger.getLogger(JobConfigParser.class);
-    private final String jobDefineFilePath;
+
     private final IdGenerator idGenerator;
     private final JobConfig jobConfig;
 
-    private final List<URL> commonFactoryJars;
+    private final List<URL> commonPluginJars;
     private final Config seaTunnelJobConfig;
 
     private final ReadonlyConfig envOptions;
@@ -74,6 +74,8 @@ public class MultipleTableJobConfigParser {
 
     private final Set<URL> jarUrls;
 
+    private final JobConfigParser fallbackParser;
+
     public MultipleTableJobConfigParser(String jobDefineFilePath,
                                         IdGenerator idGenerator,
                                         JobConfig jobConfig) {
@@ -87,17 +89,20 @@ public class MultipleTableJobConfigParser {
                                         IdGenerator idGenerator,
                                         JobConfig jobConfig,
                                         List<URL> commonPluginJars) {
-        this.jobDefineFilePath = jobDefineFilePath;
         this.idGenerator = idGenerator;
         this.jobConfig = jobConfig;
-        this.commonFactoryJars = commonPluginJars;
+        this.commonPluginJars = commonPluginJars;
         this.seaTunnelJobConfig = 
ConfigBuilder.of(Paths.get(jobDefineFilePath));
         this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
         this.graph = new HashMap<>();
         this.jarUrls = new HashSet<>();
+        this.fallbackParser = new JobConfigParser(jobDefineFilePath, 
idGenerator, jobConfig, commonPluginJars);
     }
 
     public ImmutablePair<List<Action>, Set<URL>> parse() {
+        if (!envOptions.get(EnvCommonOptions.MULTIPLE_TABLE_ENABLE)) {
+            return fallbackParser.parse();
+        }
         ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(new 
ArrayList<>());
         Thread.currentThread().setContextClassLoader(classLoader);
         // TODO: Support configuration transform
@@ -114,9 +119,18 @@ public class MultipleTableJobConfigParser {
         for (Config sinkConfig : sinkConfigs) {
             sinkActions.addAll(parserSink(sinkConfig, classLoader));
         }
+        sinkActions.forEach(this::addCommonPluginJarsToAction);
+        jarUrls.addAll(commonPluginJars);
         return new ImmutablePair<>(sinkActions, null);
     }
 
+    void addCommonPluginJarsToAction(Action action) {
+        action.getJarUrls().addAll(commonPluginJars);
+        if (!action.getUpstream().isEmpty()) {
+            action.getUpstream().forEach(this::addCommonPluginJarsToAction);
+        }
+    }
+
     private void fillJobConfig() {
         
jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE));
         if (StringUtils.isEmpty(jobConfig.getName())) {

Reply via email to