This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
new c407ec5c2 [improve][api] add multi-table parser (#4175)
c407ec5c2 is described below
commit c407ec5c26c5a95d42e4e4f15484a01c5234ef54
Author: Zongwen Li <[email protected]>
AuthorDate: Tue Feb 21 10:41:56 2023 +0800
[improve][api] add multi-table parser (#4175)
---
.../apache/seatunnel/api/common/CommonOptions.java | 30 ++++
.../apache/seatunnel/api/env/EnvCommonOptions.java | 6 -
.../apache/seatunnel/api/env/EnvOptionRule.java | 3 +-
.../seatunnel/api/sink/SinkCommonOptions.java | 19 ---
.../seatunnel/api/source/SourceCommonOptions.java | 45 -----
.../api/table/catalog/CatalogTableUtil.java | 2 +-
.../seatunnel/api/table/factory/FactoryUtil.java | 18 +-
.../api/transform/TransformCommonOptions.java | 53 ------
.../flink/execution/SinkExecuteProcessor.java | 6 +-
.../flink/execution/SourceExecuteProcessor.java | 6 +-
.../flink/execution/SinkExecuteProcessor.java | 6 +-
.../flink/execution/SourceExecuteProcessor.java | 6 +-
.../spark/execution/SinkExecuteProcessor.java | 11 +-
.../spark/execution/SourceExecuteProcessor.java | 11 +-
.../spark/execution/SinkExecuteProcessor.java | 11 +-
.../spark/execution/SourceExecuteProcessor.java | 11 +-
.../engine/core/parse/JobConfigParser.java | 44 +++--
.../core/parse/MultipleTableJobConfigParser.java | 186 +++++++++++++++++++++
.../common/AbstractSeaTunnelTransform.java | 7 +-
.../spark/source/SeaTunnelSourceSupport.java | 6 +-
.../spark/source/SeaTunnelSourceTable.java | 5 +-
21 files changed, 290 insertions(+), 202 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
index 18f46873f..66785e511 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/common/CommonOptions.java
@@ -32,4 +32,34 @@ public interface CommonOptions {
.stringType()
.noDefaultValue()
.withDescription("Name of the SPI plugin class.");
+
+ Option<String> RESULT_TABLE_NAME =
+ Options.key("result_table_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "When result_table_name is not specified, " +
+ "the data processed by this plugin will not be registered
as a data set (dataStream/dataset) " +
+ "that can be directly accessed by other plugins, or called
a temporary table (table)" +
+ "When result_table_name is specified, " +
+ "the data processed by this plugin will be registered as a
data set (dataStream/dataset) " +
+ "that can be directly accessed by other plugins, or called
a temporary table (table) . " +
+ "The data set (dataStream/dataset) registered here can be
directly accessed by other plugins " +
+ "by specifying source_table_name .");
+
+ Option<String> SOURCE_TABLE_NAME =
+ Options.key("source_table_name")
+ .stringType()
+ .noDefaultValue()
+ .withDescription(
+ "When source_table_name is not specified, " +
+ "the current plug-in processes the data set dataset output
by the previous plugin in the configuration file. " +
+ "When source_table_name is specified, the current plug-in
is processing the data set corresponding to this parameter.");
+
+ Option<Integer> PARALLELISM =
+ Options.key("parallelism")
+ .intType()
+ .defaultValue(1)
+ .withDescription("When parallelism is not specified, the
parallelism in env is used by default. " +
+ "When parallelism is specified, it will override the
parallelism in env.");
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
index c8b398c6f..fa454684a 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvCommonOptions.java
@@ -24,12 +24,6 @@ import org.apache.seatunnel.common.constants.JobMode;
import java.util.Map;
public class EnvCommonOptions {
- public static final Option<Integer> PARALLELISM =
- Options.key("parallelism")
- .intType()
- .defaultValue(1)
- .withDescription("When parallelism is not specified in connector,
the parallelism in env is used by default. " +
- "When parallelism is specified, it will override the
parallelism in env.");
public static final Option<String> JOB_NAME =
Options.key("job.name")
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
index bc40247b3..ad93970bf 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/env/EnvOptionRule.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.api.env;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.util.OptionRule;
public class EnvOptionRule {
@@ -25,7 +26,7 @@ public class EnvOptionRule {
return OptionRule.builder()
.required(EnvCommonOptions.JOB_MODE)
.optional(EnvCommonOptions.JOB_NAME,
- EnvCommonOptions.PARALLELISM,
+ CommonOptions.PARALLELISM,
EnvCommonOptions.JARS,
EnvCommonOptions.CHECKPOINT_INTERVAL,
EnvCommonOptions.CUSTOM_PARAMETERS)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
index cdab3566e..4bf320b49 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkCommonOptions.java
@@ -17,26 +17,7 @@
package org.apache.seatunnel.api.sink;
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
public class SinkCommonOptions {
public static final String DATA_SAVE_MODE = "save_mode";
-
- public static final Option<String> SOURCE_TABLE_NAME =
- Options.key("source_table_name")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "When source_table_name is not specified, " +
- "the current plug-in processes the data set dataset output
by the previous plugin in the configuration file. " +
- "When source_table_name is specified, the current plug-in
is processing the data set corresponding to this parameter.");
-
- public static final Option<Integer> PARALLELISM =
- Options.key("parallelism")
- .intType()
- .defaultValue(1)
- .withDescription("When parallelism is not specified, the
parallelism in env is used by default. " +
- "When parallelism is specified, it will override the
parallelism in env.");
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
deleted file mode 100644
index 6a54d88cf..000000000
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SourceCommonOptions.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.source;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class SourceCommonOptions {
-
- public static final Option<String> RESULT_TABLE_NAME =
- Options.key("result_table_name")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "When result_table_name is not specified, " +
- "the data processed by this plugin will not be registered
as a data set (dataStream/dataset) " +
- "that can be directly accessed by other plugins, or called
a temporary table (table)" +
- "When result_table_name is specified, " +
- "the data processed by this plugin will be registered as a
data set (dataStream/dataset) " +
- "that can be directly accessed by other plugins, or called
a temporary table (table) . " +
- "The data set (dataStream/dataset) registered here can be
directly accessed by other plugins " +
- "by specifying source_table_name .");
-
- public static final Option<Integer> PARALLELISM =
- Options.key("parallelism")
- .intType()
- .defaultValue(1)
- .withDescription("When parallelism is not specified, the
parallelism in env is used by default. " +
- "When parallelism is specified, it will override the
parallelism in env.");
-}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index d4ed86d4e..344cda1f0 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -69,7 +69,7 @@ public class CatalogTableUtil implements Serializable {
this.catalogTable = catalogTable;
}
- private static List<CatalogTable> getCatalogTables(Config config,
ClassLoader classLoader) {
+ public static List<CatalogTable> getCatalogTables(Config config,
ClassLoader classLoader) {
ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(config);
Map<String, String> catalogOptions =
readonlyConfig.getOptional(CatalogOptions.CATALOG_OPTIONS).orElse(new
HashMap<>());
// TODO: fallback key
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index 5da8f3c48..17a4c4fd9 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.api.table.factory;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
@@ -26,7 +27,6 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.Catalog;
@@ -47,6 +47,8 @@ import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;
import java.util.stream.Collectors;
+import scala.Tuple2;
+
/**
* Use SPI to create {@link TableSourceFactory}, {@link TableSinkFactory} and
{@link CatalogFactory}.
*/
@@ -54,7 +56,7 @@ public final class FactoryUtil {
private static final Logger LOG =
LoggerFactory.getLogger(FactoryUtil.class);
- public static <T, SplitT extends SourceSplit, StateT extends Serializable>
List<SeaTunnelSource<T, SplitT, StateT>> createAndPrepareSource(
+ public static <T, SplitT extends SourceSplit, StateT extends Serializable>
List<Tuple2<SeaTunnelSource<T, SplitT, StateT>, List<CatalogTable>>>
createAndPrepareSource(
List<CatalogTable> multipleTables,
ReadonlyConfig options,
ClassLoader classLoader,
@@ -62,20 +64,20 @@ public final class FactoryUtil {
try {
final TableSourceFactory factory = discoverFactory(classLoader,
TableSourceFactory.class, factoryIdentifier);
- List<SeaTunnelSource<T, SplitT, StateT>> sources = new
ArrayList<>(multipleTables.size());
+ List<Tuple2<SeaTunnelSource<T, SplitT, StateT>,
List<CatalogTable>>> sources = new ArrayList<>(multipleTables.size());
if (factory instanceof SupportMultipleTable) {
List<CatalogTable> remainingTables = multipleTables;
while (!remainingTables.isEmpty()) {
TableFactoryContext context = new
TableFactoryContext(remainingTables, options, classLoader);
SupportMultipleTable.Result result =
((SupportMultipleTable) factory).applyTables(context);
List<CatalogTable> acceptedTables =
result.getAcceptedTables();
- sources.add(createAndPrepareSource(factory,
acceptedTables, options, classLoader));
+ sources.add(new Tuple2<>(createAndPrepareSource(factory,
acceptedTables, options, classLoader), acceptedTables));
remainingTables = result.getRemainingTables();
}
} else {
for (CatalogTable catalogTable : multipleTables) {
List<CatalogTable> acceptedTables =
Collections.singletonList(catalogTable);
- sources.add(createAndPrepareSource(factory,
acceptedTables, options, classLoader));
+ sources.add(new Tuple2<>(createAndPrepareSource(factory,
acceptedTables, options, classLoader), acceptedTables));
}
}
return sources;
@@ -87,7 +89,7 @@ public final class FactoryUtil {
}
}
- public static <T, SplitT extends SourceSplit, StateT extends Serializable>
SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(
+ private static <T, SplitT extends SourceSplit, StateT extends
Serializable> SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(
TableSourceFactory factory,
List<CatalogTable> acceptedTables,
ReadonlyConfig options,
@@ -104,8 +106,8 @@ public final class FactoryUtil {
public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT>
SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createAndPrepareSink(
CatalogTable catalogTable,
- ClassLoader classLoader,
ReadonlyConfig options,
+ ClassLoader classLoader,
String factoryIdentifier) {
try {
TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
factory = discoverFactory(classLoader, TableSinkFactory.class,
factoryIdentifier);
@@ -208,7 +210,7 @@ public final class FactoryUtil {
// TODO: Implement SupportParallelism in the TableSourceFactory
instead of the SeaTunnelSource
|| SupportParallelism.class.isAssignableFrom(sourceClass)) {
OptionRule sourceCommonOptionRule =
-
OptionRule.builder().optional(SourceCommonOptions.PARALLELISM).build();
+
OptionRule.builder().optional(CommonOptions.PARALLELISM).build();
sourceOptionRule.getOptionalOptions().addAll(sourceCommonOptionRule.getOptionalOptions());
}
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
deleted file mode 100644
index ec277d308..000000000
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/TransformCommonOptions.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.transform;
-
-import org.apache.seatunnel.api.configuration.Option;
-import org.apache.seatunnel.api.configuration.Options;
-
-public class TransformCommonOptions {
- public static final Option<String> RESULT_TABLE_NAME =
- Options.key("result_table_name")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "When result_table_name is not specified, " +
- "the data processed by this plugin will not be registered
as a data set (dataStream/dataset) " +
- "that can be directly accessed by other plugins, or called
a temporary table (table)" +
- "When result_table_name is specified, " +
- "the data processed by this plugin will be registered as a
data set (dataStream/dataset) " +
- "that can be directly accessed by other plugins, or called
a temporary table (table) . " +
- "The data set (dataStream/dataset) registered here can be
directly accessed by other plugins " +
- "by specifying source_table_name .");
-
- public static final Option<String> SOURCE_TABLE_NAME =
- Options.key("source_table_name")
- .stringType()
- .noDefaultValue()
- .withDescription(
- "When source_table_name is not specified, " +
- "the current plug-in processes the data set dataset output
by the previous plugin in the configuration file. " +
- "When source_table_name is specified, the current plug-in
is processing the data set corresponding to this parameter.");
-
- public static final Option<Integer> PARALLELISM =
- Options.key("parallelism")
- .intType()
- .noDefaultValue()
- .withDescription("When parallelism is not specified, the
parallelism in env is used by default. " +
- "When parallelism is specified, it will override the
parallelism in env.");
-}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 6ca5a541c..00c0bb0b2 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,11 +17,11 @@
package org.apache.seatunnel.core.starter.flink.execution;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -88,8 +88,8 @@ public class SinkExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor
saveModeSink.handleSaveMode(dataSaveMode);
}
DataStreamSink<Row> dataStreamSink = stream.sinkTo(new
FlinkSink<>(seaTunnelSink)).name(seaTunnelSink.getPluginName());
- if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
- int parallelism =
sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+ if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+ int parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 8dc555ade..90a995f6e 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-13-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.core.starter.flink.execution;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -73,8 +73,8 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
"SeaTunnel " + internalSource.getClass().getSimpleName(),
internalSource.getBoundedness() ==
org.apache.seatunnel.api.source.Boundedness.BOUNDED);
Config pluginConfig = pluginConfigs.get(i);
- if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
- int parallelism =
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+ if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+ int parallelism =
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
sourceStream.setParallelism(parallelism);
}
registerResultTable(pluginConfig, sourceStream);
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 001f95db7..af328a976 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -17,11 +17,11 @@
package org.apache.seatunnel.core.starter.flink.execution;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -90,8 +90,8 @@ public class SinkExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor
}
DataStreamSink<Row> dataStreamSink =
stream.sinkTo(SinkV1Adapter.wrap(new FlinkSink<>(seaTunnelSink)))
.name(seaTunnelSink.getPluginName());
- if (sinkConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
- int parallelism =
sinkConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+ if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+ int parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
dataStreamSink.setParallelism(parallelism);
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
index 8dc555ade..90a995f6e 100644
---
a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-15-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SourceExecuteProcessor.java
@@ -19,9 +19,9 @@ package org.apache.seatunnel.core.starter.flink.execution;
import static org.apache.flink.util.Preconditions.checkNotNull;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.source.SupportCoordinate;
import org.apache.seatunnel.common.constants.JobMode;
import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -73,8 +73,8 @@ public class SourceExecuteProcessor extends
FlinkAbstractPluginExecuteProcessor<
"SeaTunnel " + internalSource.getClass().getSimpleName(),
internalSource.getBoundedness() ==
org.apache.seatunnel.api.source.Boundedness.BOUNDED);
Config pluginConfig = pluginConfigs.get(i);
- if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
- int parallelism =
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+ if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+ int parallelism =
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
sourceStream.setParallelism(parallelism);
}
registerResultTable(pluginConfig, sourceStream);
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index fb7b25e14..af74dea4e 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,11 +17,10 @@
package org.apache.seatunnel.core.starter.spark.execution;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -82,13 +81,13 @@ public class SinkExecuteProcessor extends
SparkAbstractPluginExecuteProcessor<Se
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
Dataset<Row> dataset = fromSourceTable(sinkConfig,
sparkRuntimeEnvironment).orElse(input);
int parallelism;
- if (sinkConfig.hasPath(SinkCommonOptions.PARALLELISM.key())) {
- parallelism =
sinkConfig.getInt(SinkCommonOptions.PARALLELISM.key());
+ if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+ parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
} else {
parallelism = sparkRuntimeEnvironment.getSparkConf()
- .getInt(EnvCommonOptions.PARALLELISM.key(),
EnvCommonOptions.PARALLELISM.defaultValue());
+ .getInt(CommonOptions.PARALLELISM.key(),
CommonOptions.PARALLELISM.defaultValue());
}
-
dataset.sparkSession().read().option(SinkCommonOptions.PARALLELISM.key(),
parallelism);
+
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(),
parallelism);
// TODO modify checkpoint location
seaTunnelSink.setTypeInfo((SeaTunnelRowType)
TypeConverterUtils.convert(dataset.schema()));
if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 4ca60dada..c4af663ed 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-2-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -17,10 +17,9 @@
package org.apache.seatunnel.core.starter.spark.execution;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -56,16 +55,16 @@ public class SourceExecuteProcessor extends
SparkAbstractPluginExecuteProcessor<
SeaTunnelSource<?, ?, ?> source = plugins.get(i);
Config pluginConfig = pluginConfigs.get(i);
int parallelism;
- if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
- parallelism =
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+ if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+ parallelism =
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
} else {
parallelism = sparkRuntimeEnvironment.getSparkConf()
- .getInt(EnvCommonOptions.PARALLELISM.key(),
EnvCommonOptions.PARALLELISM.defaultValue());
+ .getInt(CommonOptions.PARALLELISM.key(),
CommonOptions.PARALLELISM.defaultValue());
}
Dataset<Row> dataset = sparkRuntimeEnvironment.getSparkSession()
.read()
.format(SeaTunnelSource.class.getSimpleName())
- .option(SourceCommonOptions.PARALLELISM.key(), parallelism)
+ .option(CommonOptions.PARALLELISM.key(), parallelism)
.option(Constants.SOURCE_SERIALIZATION,
SerializationUtils.objectToString(source))
.schema((StructType)
TypeConverterUtils.convert(source.getProducedType())).load();
sources.add(dataset);
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 854b14936..e5c3d6e34 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -17,11 +17,10 @@
package org.apache.seatunnel.core.starter.spark.execution;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.core.starter.enums.PluginType;
@@ -83,13 +82,13 @@ public class SinkExecuteProcessor extends
SparkAbstractPluginExecuteProcessor<Se
SeaTunnelSink<?, ?, ?, ?> seaTunnelSink = plugins.get(i);
Dataset<Row> dataset = fromSourceTable(sinkConfig,
sparkRuntimeEnvironment).orElse(input);
int parallelism;
- if (sinkConfig.hasPath(SinkCommonOptions.PARALLELISM.key())) {
- parallelism =
sinkConfig.getInt(SinkCommonOptions.PARALLELISM.key());
+ if (sinkConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+ parallelism =
sinkConfig.getInt(CommonOptions.PARALLELISM.key());
} else {
parallelism = sparkRuntimeEnvironment.getSparkConf()
- .getInt(EnvCommonOptions.PARALLELISM.key(),
EnvCommonOptions.PARALLELISM.defaultValue());
+ .getInt(CommonOptions.PARALLELISM.key(),
CommonOptions.PARALLELISM.defaultValue());
}
-
dataset.sparkSession().read().option(SinkCommonOptions.PARALLELISM.key(),
parallelism);
+
dataset.sparkSession().read().option(CommonOptions.PARALLELISM.key(),
parallelism);
// TODO modify checkpoint location
seaTunnelSink.setTypeInfo((SeaTunnelRowType)
TypeConverterUtils.convert(dataset.schema()));
if
(SupportDataSaveMode.class.isAssignableFrom(seaTunnelSink.getClass())) {
diff --git
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
index 4ca60dada..c4af663ed 100644
---
a/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/seatunnel-spark-3-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SourceExecuteProcessor.java
@@ -17,10 +17,9 @@
package org.apache.seatunnel.core.starter.spark.execution;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -56,16 +55,16 @@ public class SourceExecuteProcessor extends
SparkAbstractPluginExecuteProcessor<
SeaTunnelSource<?, ?, ?> source = plugins.get(i);
Config pluginConfig = pluginConfigs.get(i);
int parallelism;
- if (pluginConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
- parallelism =
pluginConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+ if (pluginConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+ parallelism =
pluginConfig.getInt(CommonOptions.PARALLELISM.key());
} else {
parallelism = sparkRuntimeEnvironment.getSparkConf()
- .getInt(EnvCommonOptions.PARALLELISM.key(),
EnvCommonOptions.PARALLELISM.defaultValue());
+ .getInt(CommonOptions.PARALLELISM.key(),
CommonOptions.PARALLELISM.defaultValue());
}
Dataset<Row> dataset = sparkRuntimeEnvironment.getSparkSession()
.read()
.format(SeaTunnelSource.class.getSimpleName())
- .option(SourceCommonOptions.PARALLELISM.key(), parallelism)
+ .option(CommonOptions.PARALLELISM.key(), parallelism)
.option(Constants.SOURCE_SERIALIZATION,
SerializationUtils.objectToString(source))
.schema((StructType)
TypeConverterUtils.convert(source.getProducedType())).load();
sources.add(dataset);
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
index a322bd374..f1ffca0a1 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java
@@ -17,19 +17,17 @@
package org.apache.seatunnel.engine.core.parse;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkCommonOptions;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.PartitionSeaTunnelTransform;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
-import org.apache.seatunnel.api.transform.TransformCommonOptions;
import org.apache.seatunnel.common.config.TypesafeConfigUtils;
import org.apache.seatunnel.common.constants.CollectionConstants;
import org.apache.seatunnel.common.constants.JobMode;
@@ -142,7 +140,7 @@ public class JobConfigParser {
}
}
- private void jobConfigAnalyze(@NonNull Config envConfigs) {
+ void jobConfigAnalyze(@NonNull Config envConfigs) {
if (envConfigs.hasPath(EnvCommonOptions.JOB_MODE.key())) {
jobConfig.getJobContext().setJobMode(envConfigs.getEnum(JobMode.class,
EnvCommonOptions.JOB_MODE.key()));
} else {
@@ -186,11 +184,11 @@ public class JobConfigParser {
sinkListImmutablePair.getLeft(),
sinkListImmutablePair.getRight());
actions.add(sinkAction);
- if (!config.hasPath(SinkCommonOptions.SOURCE_TABLE_NAME.key())) {
- throw new
JobDefineCheckException(SinkCommonOptions.SOURCE_TABLE_NAME
+ if (!config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
+ throw new
JobDefineCheckException(CommonOptions.SOURCE_TABLE_NAME
+ " must be set in the sink plugin config when the job
have complex dependencies");
}
- String sourceTableName =
config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key());
+ String sourceTableName =
config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
List<Config> transformConfigList =
transformResultTableNameMap.get(sourceTableName);
SeaTunnelDataType<?> dataType;
if (CollectionUtils.isEmpty(transformConfigList)) {
@@ -261,7 +259,7 @@ public class JobConfigParser {
action.addUpstream(transformAction);
SeaTunnelDataType dataType =
-
transformAnalyze(config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key()),
+
transformAnalyze(config.getString(CommonOptions.SOURCE_TABLE_NAME.key()),
transformAction);
transformListImmutablePair.getLeft().setTypeInfo(dataType);
dataTypeResult =
transformListImmutablePair.getLeft().getProducedType();
@@ -274,27 +272,27 @@ public class JobConfigParser {
private void initRelationMap(List<? extends Config> sourceConfigs, List<?
extends Config> transformConfigs) {
for (Config config : sourceConfigs) {
- if (!config.hasPath(SourceCommonOptions.RESULT_TABLE_NAME.key())) {
- throw new
JobDefineCheckException(SourceCommonOptions.RESULT_TABLE_NAME.key()
+ if (!config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
+ throw new
JobDefineCheckException(CommonOptions.RESULT_TABLE_NAME.key()
+ " must be set in the source plugin config when the job
have complex dependencies");
}
- String resultTableName =
config.getString(SourceCommonOptions.RESULT_TABLE_NAME.key());
+ String resultTableName =
config.getString(CommonOptions.RESULT_TABLE_NAME.key());
sourceResultTableNameMap.computeIfAbsent(resultTableName, k -> new
ArrayList<>());
sourceResultTableNameMap.get(resultTableName).add(config);
}
for (Config config : transformConfigs) {
- if (!config.hasPath(SourceCommonOptions.RESULT_TABLE_NAME.key())) {
- throw new
JobDefineCheckException(SourceCommonOptions.RESULT_TABLE_NAME.key()
+ if (!config.hasPath(CommonOptions.RESULT_TABLE_NAME.key())) {
+ throw new
JobDefineCheckException(CommonOptions.RESULT_TABLE_NAME.key()
+ " must be set in the transform plugin config when the
job have complex dependencies");
}
- if (!config.hasPath(SinkCommonOptions.SOURCE_TABLE_NAME.key())) {
- throw new
JobDefineCheckException(SinkCommonOptions.SOURCE_TABLE_NAME.key()
+ if (!config.hasPath(CommonOptions.SOURCE_TABLE_NAME.key())) {
+ throw new
JobDefineCheckException(CommonOptions.SOURCE_TABLE_NAME.key()
+ " must be set in the transform plugin config when the
job have complex dependencies");
}
- String resultTableName =
config.getString(SourceCommonOptions.RESULT_TABLE_NAME.key());
- String sourceTableName =
config.getString(SinkCommonOptions.SOURCE_TABLE_NAME.key());
+ String resultTableName =
config.getString(CommonOptions.RESULT_TABLE_NAME.key());
+ String sourceTableName =
config.getString(CommonOptions.SOURCE_TABLE_NAME.key());
if (Objects.equals(sourceTableName, resultTableName)) {
throw new JobDefineCheckException(String.format(
"Source{%s} and result{%s} table name cannot be equals",
sourceTableName, resultTableName));
@@ -375,10 +373,10 @@ public class JobConfigParser {
private void initTransformParallelism(List<? extends Config>
transformConfigs, Action upstreamAction,
SeaTunnelTransform
seaTunnelTransform, TransformAction transformAction) {
if (seaTunnelTransform instanceof PartitionSeaTunnelTransform
- &&
transformConfigs.get(0).hasPath(TransformCommonOptions.PARALLELISM.key())) {
+ &&
transformConfigs.get(0).hasPath(CommonOptions.PARALLELISM.key())) {
transformAction.setParallelism(transformConfigs
.get(0)
- .getInt(TransformCommonOptions.PARALLELISM.key()));
+ .getInt(CommonOptions.PARALLELISM.key()));
} else {
// If transform type is not RePartitionTransform, Using the
parallelism of its upstream operators.
transformAction.setParallelism(upstreamAction.getParallelism());
@@ -386,13 +384,13 @@ public class JobConfigParser {
}
private int getSourceParallelism(Config sourceConfig) {
- if (sourceConfig.hasPath(SourceCommonOptions.PARALLELISM.key())) {
- int sourceParallelism =
sourceConfig.getInt(SourceCommonOptions.PARALLELISM.key());
+ if (sourceConfig.hasPath(CommonOptions.PARALLELISM.key())) {
+ int sourceParallelism =
sourceConfig.getInt(CommonOptions.PARALLELISM.key());
return Math.max(sourceParallelism, 1);
}
int executionParallelism = 0;
- if (envConfigs.hasPath(EnvCommonOptions.PARALLELISM.key())) {
- executionParallelism =
envConfigs.getInt(EnvCommonOptions.PARALLELISM.key());
+ if (envConfigs.hasPath(CommonOptions.PARALLELISM.key())) {
+ executionParallelism =
envConfigs.getInt(CommonOptions.PARALLELISM.key());
}
return Math.max(executionParallelism, 1);
}
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
new file mode 100644
index 000000000..8902b90a6
--- /dev/null
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.core.parse;
+
+import org.apache.seatunnel.api.common.CommonOptions;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.env.EnvCommonOptions;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.factory.FactoryUtil;
+import org.apache.seatunnel.core.starter.utils.ConfigBuilder;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.exception.JobDefineCheckException;
+import
org.apache.seatunnel.engine.common.loader.SeaTunnelChildFirstClassLoader;
+import org.apache.seatunnel.engine.common.utils.IdGenerator;
+import org.apache.seatunnel.engine.core.dag.actions.Action;
+import org.apache.seatunnel.engine.core.dag.actions.SinkAction;
+import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.hazelcast.logging.ILogger;
+import com.hazelcast.logging.Logger;
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+
+import java.io.Serializable;
+import java.net.URL;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import scala.Tuple2;
+
+/**
+ * Parses a SeaTunnel job definition file into engine {@link Action}s for jobs in which a single
+ * source may produce several tables.
+ *
+ * <p>Every source config yields one or more {@link SourceAction}s (one per source instance
+ * returned by {@link FactoryUtil#createAndPrepareSource}). Each catalog table a source produces is
+ * registered under the source's {@link CommonOptions#RESULT_TABLE_NAME} (or {@code "default"} when
+ * unset). Every sink config then yields one {@link SinkAction} per catalog table registered under
+ * its {@link CommonOptions#SOURCE_TABLE_NAME} (or {@code "default"} when unset).
+ *
+ * <p>Transforms are not supported yet.
+ */
+public class MultipleTableJobConfigParser {
+
+    private static final ILogger LOGGER = Logger.getLogger(MultipleTableJobConfigParser.class);
+
+    /** Table id used when a source/sink does not configure a result/source table name. */
+    private static final String DEFAULT_TABLE_ID = "default";
+
+    private final String jobDefineFilePath;
+    private final IdGenerator idGenerator;
+    private final JobConfig jobConfig;
+
+    /** Jars shared by all plugins; included in the jar set returned by {@link #parse()}. */
+    private final List<URL> commonFactoryJars;
+    private final Config seaTunnelJobConfig;
+
+    /** Options read from the {@code env} section of the job definition. */
+    private final ReadonlyConfig envOptions;
+
+    /**
+     * Table id -> every catalog table registered under that id, paired with the action that
+     * produces it. Filled by {@link #parserSource} and read by {@link #parserSink}.
+     */
+    private final Map<String, List<Tuple2<CatalogTable, Action>>> graph;
+
+    /** Jar URLs the job needs; returned as the right-hand side of {@link #parse()}. */
+    private final Set<URL> jarUrls;
+
+    /**
+     * Creates a parser without any common plugin jars.
+     *
+     * @param jobDefineFilePath path of the job definition file; it is read immediately
+     * @param idGenerator generator for action ids
+     * @param jobConfig job config that {@link #parse()} fills with env settings
+     */
+    public MultipleTableJobConfigParser(String jobDefineFilePath,
+                                        IdGenerator idGenerator,
+                                        JobConfig jobConfig) {
+        this(jobDefineFilePath,
+            idGenerator,
+            jobConfig,
+            Collections.emptyList());
+    }
+
+    /**
+     * Creates a parser and loads the job definition file.
+     *
+     * <p>The file is read here via {@link ConfigBuilder}, and its {@code env} section is
+     * expected to be present.
+     *
+     * @param jobDefineFilePath path of the job definition file
+     * @param idGenerator generator for action ids
+     * @param jobConfig job config that {@link #parse()} fills with env settings
+     * @param commonPluginJars jars shared by all plugins
+     */
+    public MultipleTableJobConfigParser(String jobDefineFilePath,
+                                        IdGenerator idGenerator,
+                                        JobConfig jobConfig,
+                                        List<URL> commonPluginJars) {
+        this.jobDefineFilePath = jobDefineFilePath;
+        this.idGenerator = idGenerator;
+        this.jobConfig = jobConfig;
+        this.commonFactoryJars = commonPluginJars;
+        this.seaTunnelJobConfig = ConfigBuilder.of(Paths.get(jobDefineFilePath));
+        this.envOptions = ReadonlyConfig.fromConfig(seaTunnelJobConfig.getConfig("env"));
+        this.graph = new HashMap<>();
+        this.jarUrls = new HashSet<>();
+    }
+
+    /**
+     * Parses the job definition into sink actions linked to their upstream source actions.
+     *
+     * <p>Installs a new {@link SeaTunnelChildFirstClassLoader} (created with no URLs) as the
+     * current thread's context class loader and uses it to create the plugins. The previous
+     * context class loader is not restored.
+     *
+     * @return the sink actions, and the jar URLs the job needs (never {@code null})
+     * @throws JobDefineCheckException if the job has no source or no sink, or a sink references a
+     *     table name that no source registered
+     */
+    public ImmutablePair<List<Action>, Set<URL>> parse() {
+        ClassLoader classLoader = new SeaTunnelChildFirstClassLoader(new ArrayList<>());
+        Thread.currentThread().setContextClassLoader(classLoader);
+        // TODO: Support configuration transform
+        List<? extends Config> sourceConfigs = getPluginConfigs("source");
+        List<? extends Config> sinkConfigs = getPluginConfigs("sink");
+        if (CollectionUtils.isEmpty(sourceConfigs) || CollectionUtils.isEmpty(sinkConfigs)) {
+            throw new JobDefineCheckException("Source And Sink can not be null");
+        }
+        this.fillJobConfig();
+        for (Config sourceConfig : sourceConfigs) {
+            parserSource(sourceConfig, classLoader);
+        }
+        List<Action> sinkActions = new ArrayList<>();
+        for (Config sinkConfig : sinkConfigs) {
+            sinkActions.addAll(parserSink(sinkConfig, classLoader));
+        }
+        // TODO: also collect the factory jars of every plugin
+        jarUrls.addAll(commonFactoryJars);
+        // Callers receive a real (possibly empty) set instead of null.
+        return new ImmutablePair<>(sinkActions, jarUrls);
+    }
+
+    /**
+     * Returns the plugin configs listed under {@code path}, or an empty list when the section is
+     * absent, so a missing section reaches the "Source And Sink" check in {@link #parse()}.
+     */
+    private List<? extends Config> getPluginConfigs(String path) {
+        if (!seaTunnelJobConfig.hasPath(path)) {
+            return Collections.<Config>emptyList();
+        }
+        return seaTunnelJobConfig.getConfigList(path);
+    }
+
+    /**
+     * Copies env settings into {@link #jobConfig}: the job mode, the job name (only when the job
+     * config has no name yet) and the checkpoint interval (only when configured).
+     */
+    private void fillJobConfig() {
+        jobConfig.getJobContext().setJobMode(envOptions.get(EnvCommonOptions.JOB_MODE));
+        if (StringUtils.isEmpty(jobConfig.getName())) {
+            jobConfig.setName(envOptions.get(EnvCommonOptions.JOB_NAME));
+        }
+        envOptions.getOptional(EnvCommonOptions.CHECKPOINT_INTERVAL)
+            .ifPresent(interval -> jobConfig.getEnvOptions()
+                .put(EnvCommonOptions.CHECKPOINT_INTERVAL.key(), interval));
+    }
+
+    /**
+     * Creates the source action(s) for one source config and registers every catalog table they
+     * produce under the source's result table name (or {@code "default"}).
+     *
+     * @param sourceConfig config of a single source plugin
+     * @param classLoader class loader used to discover catalog tables and create the source
+     */
+    public void parserSource(Config sourceConfig, ClassLoader classLoader) {
+        List<CatalogTable> catalogTables = CatalogTableUtil.getCatalogTables(sourceConfig, classLoader);
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sourceConfig);
+        String factoryId = getFactoryId(readonlyConfig);
+        String tableId = readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_TABLE_ID);
+
+        List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, List<CatalogTable>>> sources =
+            FactoryUtil.createAndPrepareSource(catalogTables, readonlyConfig, classLoader, factoryId);
+        // TODO: get factory jar
+        Set<URL> factoryUrls = new HashSet<>();
+        // Several sources may share a result table name (e.g. all left at the default). Append
+        // instead of overwriting so that earlier sources are not silently dropped from the job.
+        List<Tuple2<CatalogTable, Action>> actions = graph.computeIfAbsent(tableId, k -> new ArrayList<>());
+        int parallelism = getParallelism(readonlyConfig);
+        for (Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>, List<CatalogTable>> tuple2 : sources) {
+            long id = idGenerator.getNextId();
+            SourceAction<Object, SourceSplit, Serializable> action =
+                new SourceAction<>(id, factoryId, tuple2._1(), factoryUrls);
+            action.setParallelism(parallelism);
+            for (CatalogTable catalogTable : tuple2._2()) {
+                actions.add(new Tuple2<>(catalogTable, action));
+            }
+        }
+    }
+
+    /** Returns the configured factory id, falling back to the plugin name. */
+    private static String getFactoryId(ReadonlyConfig readonlyConfig) {
+        return readonlyConfig.getOptional(CommonOptions.FACTORY_ID)
+            .orElseGet(() -> readonlyConfig.get(CommonOptions.PLUGIN_NAME));
+    }
+
+    /**
+     * Returns the plugin's own parallelism, falling back to the env-level parallelism, and never
+     * less than 1.
+     */
+    private int getParallelism(ReadonlyConfig config) {
+        return Math.max(1,
+            config.getOptional(CommonOptions.PARALLELISM)
+                .orElse(envOptions.get(CommonOptions.PARALLELISM)));
+    }
+
+    /**
+     * Creates one sink action per catalog table registered under this sink's source table name.
+     * Each sink action has a single upstream (the action producing that table) and inherits its
+     * parallelism.
+     *
+     * <p>If the sink config itself declares a catalog table with the same table path as an
+     * upstream table, that declaration is used to create the sink instead of the upstream one.
+     *
+     * @param sinkConfig config of a single sink plugin
+     * @param classLoader class loader used to discover catalog tables and create the sink
+     * @return the created sink actions
+     * @throws JobDefineCheckException if no source registered the sink's source table name
+     */
+    public List<SinkAction<?, ?, ?, ?>> parserSink(Config sinkConfig, ClassLoader classLoader) {
+        // NOTE: Collectors.toMap throws IllegalStateException if two declared tables share a path.
+        Map<TablePath, CatalogTable> tableMap = CatalogTableUtil.getCatalogTables(sinkConfig, classLoader)
+            .stream()
+            .collect(Collectors.toMap(catalogTable -> catalogTable.getTableId().toTablePath(), catalogTable -> catalogTable));
+        ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(sinkConfig);
+        String factoryId = getFactoryId(readonlyConfig);
+        String leftTableId = readonlyConfig.getOptional(CommonOptions.SOURCE_TABLE_NAME).orElse(DEFAULT_TABLE_ID);
+        List<Tuple2<CatalogTable, Action>> tableTuples = graph.get(leftTableId);
+        if (tableTuples == null) {
+            throw new JobDefineCheckException(String.format(
+                "Sink '%s' reads table '%s', but no source defines it as its result table name",
+                factoryId, leftTableId));
+        }
+        // TODO: get factory jar
+        Set<URL> factoryUrls = new HashSet<>();
+        List<SinkAction<?, ?, ?, ?>> sinkActions = new ArrayList<>();
+        for (Tuple2<CatalogTable, Action> tableTuple : tableTuples) {
+            CatalogTable catalogTable = tableTuple._1();
+            Action leftAction = tableTuple._2();
+            // TODO: another table full name map
+            CatalogTable insteadTable = tableMap.get(catalogTable.getTableId().toTablePath());
+            if (insteadTable != null) {
+                catalogTable = insteadTable;
+            }
+            SeaTunnelSink<?, ?, ?, ?> sink =
+                FactoryUtil.createAndPrepareSink(catalogTable, readonlyConfig, classLoader, factoryId);
+            long id = idGenerator.getNextId();
+            SinkAction<?, ?, ?, ?> sinkAction =
+                new SinkAction<>(id, factoryId, Collections.singletonList(leftAction), sink, factoryUrls);
+            sinkAction.setParallelism(leftAction.getParallelism());
+            sinkActions.add(sinkAction);
+        }
+        return sinkActions;
+    }
+}
diff --git
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
index d0de07266..b6183aeaa 100644
---
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
+++
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java
@@ -17,9 +17,8 @@
package org.apache.seatunnel.transform.common;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.sink.SinkCommonOptions;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -31,8 +30,8 @@ import java.util.Objects;
public abstract class AbstractSeaTunnelTransform implements
SeaTunnelTransform<SeaTunnelRow> {
- private static final String RESULT_TABLE_NAME =
SourceCommonOptions.RESULT_TABLE_NAME.key();
- private static final String SOURCE_TABLE_NAME =
SinkCommonOptions.SOURCE_TABLE_NAME.key();
+ private static final String RESULT_TABLE_NAME =
CommonOptions.RESULT_TABLE_NAME.key();
+ private static final String SOURCE_TABLE_NAME =
CommonOptions.SOURCE_TABLE_NAME.key();
private String inputTableName;
private SeaTunnelRowType inputRowType;
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
index ef9ae092b..cb9cb75d0 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-2.4/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceSupport.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.translation.spark.source;
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceCommonOptions;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
import org.apache.seatunnel.common.utils.SerializationUtils;
@@ -60,14 +60,14 @@ public class SeaTunnelSourceSupport implements
DataSourceV2, ReadSupport, MicroB
@Override
public DataSourceReader createReader(DataSourceOptions options) {
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource =
getSeaTunnelSource(options);
- int parallelism =
options.getInt(SourceCommonOptions.PARALLELISM.key(), 1);
+ int parallelism = options.getInt(CommonOptions.PARALLELISM.key(), 1);
return new BatchSourceReader(seaTunnelSource, parallelism);
}
@Override
public MicroBatchReader createMicroBatchReader(Optional<StructType>
rowTypeOptional, String checkpointLocation, DataSourceOptions options) {
SeaTunnelSource<SeaTunnelRow, ?, ?> seaTunnelSource =
getSeaTunnelSource(options);
- Integer parallelism =
options.getInt(SourceCommonOptions.PARALLELISM.key(), 1);
+ Integer parallelism = options.getInt(CommonOptions.PARALLELISM.key(),
1);
Integer checkpointInterval =
options.getInt(Constants.CHECKPOINT_INTERVAL, CHECKPOINT_INTERVAL_DEFAULT);
String checkpointPath = StringUtils.replacePattern(checkpointLocation,
"sources/\\d+", "sources-state");
Configuration configuration =
SparkSession.getActiveSession().get().sparkContext().hadoopConfiguration();
diff --git
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceTable.java
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceTable.java
index 17395b368..d7cc7da09 100644
---
a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceTable.java
+++
b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-3.3/src/main/java/org/apache/seatunnel/translation/spark/source/SeaTunnelSourceTable.java
@@ -17,8 +17,7 @@
package org.apache.seatunnel.translation.spark.source;
-import static org.apache.seatunnel.api.source.SourceCommonOptions.PARALLELISM;
-
+import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.Constants;
@@ -65,7 +64,7 @@ public class SeaTunnelSourceTable implements Table,
SupportsRead {
*/
@Override
public ScanBuilder newScanBuilder(CaseInsensitiveStringMap
caseInsensitiveStringMap) {
- int parallelism =
Integer.parseInt(properties.getOrDefault(PARALLELISM.key(), "1"));
+ int parallelism =
Integer.parseInt(properties.getOrDefault(CommonOptions.PARALLELISM.key(), "1"));
return new SeaTunnelScanBuilder(source, parallelism,
caseInsensitiveStringMap);
}