This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this
push:
new 6e336d0e4 [feature][dag] support sharding tables (#4205)
6e336d0e4 is described below
commit 6e336d0e4f7049911fb23a084c2e3f0495e226f5
Author: Zongwen Li <[email protected]>
AuthorDate: Fri Feb 24 14:51:46 2023 +0800
[feature][dag] support sharding tables (#4205)
---
.../apache/seatunnel/api/env/EnvCommonOptions.java | 8 ++--
.../org/apache/seatunnel/api/env/ParsingMode.java | 54 ++++++++++++++++++++++
.../core/parse/MultipleTableJobConfigParser.java | 41 ++++++++++------
3 files changed, 85 insertions(+), 18 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index 4b596e93f..8a96bf823 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -37,10 +37,10 @@ public interface EnvCommonOptions {
.noDefaultValue()
.withDescription("The job mode of this job, support Batch and
Stream");
- Option<Boolean> MULTIPLE_TABLE_ENABLE =
- Options.key("multi-table.enable")
- .booleanType()
- .defaultValue(false)
+ Option<ParsingMode> DAG_PARSING_MODE =
+ Options.key("dag-parsing.mode")
+ .enumType(ParsingMode.class)
+ .defaultValue(ParsingMode.SINGLENESS)
.withDescription("The parsing mode of the DAG, supports
SINGLENESS, SHARDING and MULTIPLEX");
Option<Long> CHECKPOINT_INTERVAL =
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/ParsingMode.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/ParsingMode.java
new file mode 100644
index 000000000..c2d6ad600
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/ParsingMode.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.env;
+
+import org.apache.seatunnel.api.table.catalog.Catalog;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+
+/**
+ * Parsing modes for converting multiple {@link CatalogTable}s retrieved
through the {@link Catalog} into a DAG.
+ */
+public enum ParsingMode {
+ /**
+ * Each table is processed using a separate Source and Sink.
+ * <pre>
+ * customer -> source(customer) -> sink(customer)
+ * product -> source(product) -> sink(product)
+ * stock -> source(stock) -> sink(stock)
+ * </pre>
+ */
+ SINGLENESS,
+    /**
+     * Use a single Source and Sink to process sharded tables.
+ * <pre>
+ * customer1
+ * customer2 --> customer\\d+ --> source(customer\\d+) -> sink(customer)
+ * customer3
+ * </pre>
+ */
+ SHARDING,
+    /**
+     * Multiple tables are processed using a single source, with each table
using a separate sink.
+ * <pre>
+ * customer -> sink(customer)
+ * product --> source(.*) -> sink(product)
+ * stock -> sink(stock)
+ * </pre>
+ */
+ MULTIPLEX
+}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index c164416e3..6916cf870 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.engine.core.parse;
import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.env.ParsingMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
@@ -79,10 +80,10 @@ public class MultipleTableJobConfigParser {
private final ReadonlyConfig envOptions;
- private final Map<String, List<Tuple2<CatalogTable, Action>>> graph;
-
private final JobConfigParser fallbackParser;
+ private final ParsingMode parsingMode;
+
public MultipleTableJobConfigParser(String jobDefineFilePath,
IdGenerator idGenerator,
JobConfig jobConfig) {
@@ -101,12 +102,12 @@ public class MultipleTableJobConfigParser {
this.commonPluginJars = commonPluginJars;
this.seaTunnelJobConfig =
ConfigBuilder.of(Paths.get(jobDefineFilePath));
this.envOptions =
ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
- this.graph = new HashMap<>();
this.fallbackParser = new JobConfigParser(jobDefineFilePath,
idGenerator, jobConfig, commonPluginJars);
+ this.parsingMode = envOptions.get(EnvCommonOptions.DAG_PARSING_MODE);
}
public ImmutablePair<List<Action>, Set<URL>> parse() {
- if (!envOptions.get(EnvCommonOptions.MULTIPLE_TABLE_ENABLE)) {
+ if (parsingMode == ParsingMode.SINGLENESS) {
return fallbackParser.parse();
}
List<URL> connectorJars = new ArrayList<>();
@@ -124,12 +125,14 @@ public class MultipleTableJobConfigParser {
throw new JobDefineCheckException("Source And Sink can not be
null");
}
this.fillJobConfig();
+ Map<String, List<Tuple2<CatalogTable, Action>>> tableWithActionMap =
new HashMap<>();
for (Config sourceConfig : sourceConfigs) {
- parserSource(sourceConfig, classLoader);
+ Tuple2<String, List<Tuple2<CatalogTable, Action>>> tuple2 =
parserSource(sourceConfig, classLoader);
+ tableWithActionMap.put(tuple2._1(), tuple2._2());
}
List<Action> sinkActions = new ArrayList<>();
for (Config sinkConfig : sinkConfigs) {
- sinkActions.addAll(parserSink(sinkConfig, classLoader));
+ sinkActions.addAll(parserSink(sinkConfig, classLoader,
tableWithActionMap));
}
Set<URL> factoryUrls = getUsedFactoryUrls(sinkActions);
factoryUrls.addAll(commonPluginJars);
@@ -169,7 +172,7 @@ public class MultipleTableJobConfigParser {
.put(EnvCommonOptions.CHECKPOINT_INTERVAL.key(), interval));
}
- public void parserSource(Config sourceConfig, ClassLoader classLoader) {
+ public Tuple2<String, List<Tuple2<CatalogTable, Action>>>
parserSource(Config sourceConfig, ClassLoader classLoader) {
List<CatalogTable> catalogTables =
CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
if (catalogTables.isEmpty()) {
throw new JobDefineCheckException("The source needs catalog table,
please configure `catalog` or `schema` options.");
@@ -178,6 +181,9 @@ public class MultipleTableJobConfigParser {
String factoryId = getFactoryId(readonlyConfig);
String tableId =
readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse("default");
+ if (parsingMode == ParsingMode.SHARDING) {
+ catalogTables = Collections.singletonList(catalogTables.get(0));
+ }
List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>>> sources =
FactoryUtil.createAndPrepareSource(catalogTables, readonlyConfig,
classLoader, factoryId);
@@ -197,7 +203,7 @@ public class MultipleTableJobConfigParser {
actions.add(new Tuple2<>(catalogTable, action));
}
}
- graph.put(tableId, actions);
+ return new Tuple2<>(tableId, actions);
}
public static Optional<URL> getCatalogFactoryUrl(Config config,
ClassLoader classLoader) {
@@ -216,14 +222,16 @@ public class MultipleTableJobConfigParser {
.orElse(envOptions.get(CommonOptions.PARALLELISM)));
}
- public List<SinkAction<?, ?, ?, ?>> parserSink(Config sinkConfig,
ClassLoader classLoader) {
+ public List<SinkAction<?, ?, ?, ?>> parserSink(Config sinkConfig,
+ ClassLoader classLoader,
+ Map<String,
List<Tuple2<CatalogTable, Action>>> tableWithActionMap) {
Map<TablePath, CatalogTable> tableMap =
CatalogTableUtil.getCatalogTables(sinkConfig, classLoader)
.stream()
.collect(Collectors.toMap(catalogTable ->
catalogTable.getTableId().toTablePath(), catalogTable -> catalogTable));
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
String factoryId = getFactoryId(readonlyConfig);
String leftTableId =
readonlyConfig.getOptional(CommonOptions.SOURCE_TABLE_NAME).orElse("default");
- List<Tuple2<CatalogTable, Action>> tableTuples =
graph.get(leftTableId);
+ List<Tuple2<CatalogTable, Action>> tableTuples =
tableWithActionMap.get(leftTableId);
// get factory urls
Set<URL> factoryUrls = new HashSet<>();
@@ -235,10 +243,15 @@ public class MultipleTableJobConfigParser {
for (Tuple2<CatalogTable, Action> tableTuple : tableTuples) {
CatalogTable catalogTable = tableTuple._1();
Action leftAction = tableTuple._2();
- // TODO: another table full name map
- CatalogTable insteadTable =
tableMap.get(catalogTable.getTableId().toTablePath());
- if (insteadTable != null) {
- catalogTable = insteadTable;
+ Optional<CatalogTable> insteadTable;
+ if (parsingMode == ParsingMode.SHARDING) {
+ insteadTable = tableMap.values().stream().findFirst();
+ } else {
+ // TODO: another table full name map
+ insteadTable =
Optional.ofNullable(tableMap.get(catalogTable.getTableId().toTablePath()));
+ }
+ if (insteadTable.isPresent()) {
+ catalogTable = insteadTable.get();
}
SeaTunnelSink<?, ?, ?, ?> sink =
FactoryUtil.createAndPrepareSink(catalogTable, readonlyConfig, classLoader,
factoryId);
long id = idGenerator.getNextId();