This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new 6e336d0e4 [feature][dag] support sharding tables (#4205)
6e336d0e4 is described below

commit 6e336d0e4f7049911fb23a084c2e3f0495e226f5
Author: Zongwen Li <[email protected]>
AuthorDate: Fri Feb 24 14:51:46 2023 +0800

    [feature][dag] support sharding tables (#4205)
---
 .../apache/seatunnel/api/env/EnvCommonOptions.java |  8 ++--
 .../org/apache/seatunnel/api/env/ParsingMode.java  | 54 ++++++++++++++++++++++
 .../core/parse/MultipleTableJobConfigParser.java   | 41 ++++++++++------
 3 files changed, 85 insertions(+), 18 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index 4b596e93f..8a96bf823 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -37,10 +37,10 @@ public interface EnvCommonOptions {
             .noDefaultValue()
             .withDescription("The job mode of this job, support Batch and 
Stream");
 
-    Option<Boolean> MULTIPLE_TABLE_ENABLE =
-        Options.key("multi-table.enable")
-            .booleanType()
-            .defaultValue(false)
+    Option<ParsingMode> DAG_PARSING_MODE =
+        Options.key("dag-parsing.mode")
+            .enumType(ParsingMode.class)
+            .defaultValue(ParsingMode.SINGLENESS)
             .withDescription("Whether to enable parsing support for 
multi-table jobs");
 
     Option<Long> CHECKPOINT_INTERVAL =
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/ParsingMode.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/ParsingMode.java
new file mode 100644
index 000000000..c2d6ad600
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/ParsingMode.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.env;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+/**
+ * Multiple parsing modes for converting multi-{@link CatalogTable} retrieved through the {@link Catalog} into DAG.
+ */
+public enum ParsingMode {
+    /**
+     * Each table is processed using a separate Source and Sink.
+     * <pre>
+     * customer -> source(customer) -> sink(customer)
+     * product  -> source(product)  -> sink(product)
+     * stock    -> source(stock)    -> sink(stock)
+     * </pre>
+     */
+    SINGLENESS,
+    /**
+     * Use a Source and Sink to process sharding-table.
+     * <pre>
+     * customer1
+     * customer2 --> customer\\d+ --> source(customer\\d+) -> sink(customer)
+     * customer3
+     * </pre>
+     */
+    SHARDING,
+    /**
+     * Multiple tables are processed using a single source, each table using a separate sink.
+     * <pre>
+     * customer                   -> sink(customer)
+     * product   --> source(.*)   -> sink(product)
+     * stock                      -> sink(stock)
+     * </pre>
+     */
+    MULTIPLEX
+}
diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index c164416e3..6916cf870 100644
--- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.core.parse;
 import org.apache.seatunnel.api.common.CommonOptions;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.env.ParsingMode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceSplit;
@@ -79,10 +80,10 @@ public class MultipleTableJobConfigParser {
 
     private final ReadonlyConfig envOptions;
 
-    private final Map<String, List<Tuple2<CatalogTable, Action>>> graph;
-
     private final JobConfigParser fallbackParser;
 
+    private final ParsingMode parsingMode;
+
     public MultipleTableJobConfigParser(String jobDefineFilePath,
                                         IdGenerator idGenerator,
                                         JobConfig jobConfig) {
@@ -101,12 +102,12 @@ public class MultipleTableJobConfigParser {
         this.commonPluginJars = commonPluginJars;
         this.seaTunnelJobConfig = 
ConfigBuilder.of(Paths.get(jobDefineFilePath));
         this.envOptions = 
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
-        this.graph = new HashMap<>();
         this.fallbackParser = new JobConfigParser(jobDefineFilePath, 
idGenerator, jobConfig, commonPluginJars);
+        this.parsingMode = envOptions.get(EnvCommonOptions.DAG_PARSING_MODE);
     }
 
     public ImmutablePair<List<Action>, Set<URL>> parse() {
-        if (!envOptions.get(EnvCommonOptions.MULTIPLE_TABLE_ENABLE)) {
+        if (parsingMode == ParsingMode.SINGLENESS) {
             return fallbackParser.parse();
         }
         List<URL> connectorJars = new ArrayList<>();
@@ -124,12 +125,14 @@ public class MultipleTableJobConfigParser {
             throw new JobDefineCheckException("Source And Sink can not be 
null");
         }
         this.fillJobConfig();
+        Map<String, List<Tuple2<CatalogTable, Action>>> tableWithActionMap = 
new HashMap<>();
         for (Config sourceConfig : sourceConfigs) {
-            parserSource(sourceConfig, classLoader);
+            Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 = 
parserSource(sourceConfig, classLoader);
+            tableWithActionMap.put(tuple2._1(), tuple2._2());
         }
         List<Action> sinkActions = new ArrayList<>();
         for (Config sinkConfig : sinkConfigs) {
-            sinkActions.addAll(parserSink(sinkConfig, classLoader));
+            sinkActions.addAll(parserSink(sinkConfig, classLoader, 
tableWithActionMap));
         }
         Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
         factoryUrls.addAll(commonPluginJars);
@@ -169,7 +172,7 @@ public class MultipleTableJobConfigParser {
                 .put(EnvCommonOptions.CHECKPOINT_INTERVAL.key(), interval));
     }
 
-    public void parserSource(Config sourceConfig, ClassLoader classLoader) {
+    public Tuple2<String, List<Tuple2<CatalogTable, Action>>> 
parserSource(Config sourceConfig, ClassLoader classLoader) {
         List<CatalogTable> catalogTables = 
CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
         if (catalogTables.isEmpty()) {
             throw new JobDefineCheckException("The source needs catalog table, 
please configure `catalog` or `schema` options.");
@@ -178,6 +181,9 @@ public class MultipleTableJobConfigParser {
         String factoryId = getFactoryId(readonlyConfig);
         String tableId = 
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse("default");
 
+        if (parsingMode == ParsingMode.SHARDING) {
+            catalogTables = Collections.singletonList(catalogTables.get(0));
+        }
         List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, 
List<CatalogTable>>> sources =
             FactoryUtil.createAndPrepareSource(catalogTables, readonlyConfig, 
classLoader, factoryId);
 
@@ -197,7 +203,7 @@ public class MultipleTableJobConfigParser {
                 actions.add(new Tuple2<>(catalogTable, action));
             }
         }
-        graph.put(tableId, actions);
+        return new Tuple2<>(tableId, actions);
     }
 
     public static Optional<URL> getCatalogFactoryUrl(Config config, 
ClassLoader classLoader) {
@@ -216,14 +222,16 @@ public class MultipleTableJobConfigParser {
                 .orElse(envOptions.get(CommonOptions.PARALLELISM)));
     }
 
-    public List<SinkAction<?, ?, ?, ?>> parserSink(Config sinkConfig, 
ClassLoader classLoader) {
+    public List<SinkAction<?, ?, ?, ?>> parserSink(Config sinkConfig,
+                                                   ClassLoader classLoader,
+                                                   Map<String, 
List<Tuple2<CatalogTable, Action>>> tableWithActionMap) {
         Map<TablePath, CatalogTable> tableMap = 
CatalogTableUtil.getCatalogTables(sinkConfig, classLoader)
             .stream()
             .collect(Collectors.toMap(catalogTable -> 
catalogTable.getTableId().toTablePath(), catalogTable -> catalogTable));
         ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
         String factoryId = getFactoryId(readonlyConfig);
         String leftTableId = 
readonlyConfig.getOptional(CommonOptions.SOURCE_TABLE_NAME).orElse("default");
-        List<Tuple2<CatalogTable, Action>> tableTuples = 
graph.get(leftTableId);
+        List<Tuple2<CatalogTable, Action>> tableTuples = 
tableWithActionMap.get(leftTableId);
 
         // get factory urls
         Set<URL> factoryUrls = new HashSet<>();
@@ -235,10 +243,15 @@ public class MultipleTableJobConfigParser {
         for (Tuple2<CatalogTable, Action> tableTuple : tableTuples) {
             CatalogTable catalogTable = tableTuple._1();
             Action leftAction = tableTuple._2();
-            // TODO: another table full name map
-            CatalogTable insteadTable = 
tableMap.get(catalogTable.getTableId().toTablePath());
-            if (insteadTable != null) {
-                catalogTable = insteadTable;
+            Optional<CatalogTable> insteadTable;
+            if (parsingMode == ParsingMode.SHARDING) {
+                insteadTable = tableMap.values().stream().findFirst();
+            } else {
+                // TODO: another table full name map
+                insteadTable = 
Optional.ofNullable(tableMap.get(catalogTable.getTableId().toTablePath()));
+            }
+            if (insteadTable.isPresent()) {
+                catalogTable = insteadTable.get();
             }
             SeaTunnelSink<?, ?, ?, ?> sink = 
FactoryUtil.createAndPrepareSink(catalogTable, readonlyConfig, classLoader, 
factoryId);
             long id = idGenerator.getNextId();

Reply via email to