This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 41173357f8 [Improve] Refactor CatalogTable and add `SeaTunnelSource::getProducedCatalogTables` (#5562)
41173357f8 is described below
commit 41173357f847f7bc72119b3e0ebd5019f94a4c68
Author: Jia Fan <[email protected]>
AuthorDate: Thu Sep 28 16:55:42 2023 +0800
[Improve] Refactor CatalogTable and add `SeaTunnelSource::getProducedCatalogTables` (#5562)
---
.../seatunnel/api/source/SeaTunnelSource.java | 18 ++++-
.../api/table/catalog/CatalogTableUtil.java | 23 +++++-
.../seatunnel/api/table/factory/FactoryUtil.java | 92 ++++++++++++----------
.../api/table/factory/SupportMultipleTable.java | 56 -------------
.../api/table/factory/TableFactoryContext.java | 33 +-------
.../api/table/factory/TableSinkFactory.java | 2 +-
...rmFactory.java => TableSinkFactoryContext.java} | 28 +++----
.../api/table/factory/TableSourceFactory.java | 2 +-
...Factory.java => TableSourceFactoryContext.java} | 23 ++----
.../api/table/factory/TableTransformFactory.java | 2 +-
...text.java => TableTransformFactoryContext.java} | 30 +------
.../seatunnel/api/table/type/SeaTunnelRow.java | 4 +-
.../cdc/base/source/IncrementalSource.java | 13 ++-
.../cdc/mongodb/MongodbIncrementalSource.java | 8 +-
.../mongodb/MongodbIncrementalSourceFactory.java | 41 +++-------
.../cdc/mysql/source/MySqlIncrementalSource.java | 7 +-
.../source/MySqlIncrementalSourceFactory.java | 39 +++------
.../source/source/SqlServerIncrementalSource.java | 8 +-
.../source/SqlServerIncrementalSourceFactory.java | 38 +++------
.../seatunnel/console/sink/ConsoleSinkFactory.java | 4 +-
.../sqlserver/SqlServerDataTypeConvertor.java | 1 +
.../seatunnel/jdbc/sink/JdbcSinkFactory.java | 4 +-
.../seatunnel/jdbc/source/JdbcSourceFactory.java | 70 ++++++++++++----
.../seatunnel/kafka/sink/KafkaSinkFactory.java | 4 +-
.../starrocks/sink/StarRocksSinkFactory.java | 4 +-
.../dag/actions/ShuffleMultipleRowStrategy.java | 10 +--
.../core/parse/MultipleTableJobConfigParser.java | 67 +++++-----------
.../dag/execution/ExecutionPlanGenerator.java | 14 ++--
.../server/dag/physical/PhysicalPlanGenerator.java | 7 --
.../engine/server/task/SourceSeaTunnelTask.java | 14 +++-
.../transform/copy/CopyFieldTransformFactory.java | 7 +-
.../fieldmapper/FieldMapperTransformFactory.java | 6 +-
.../filter/FilterFieldTransformFactory.java | 6 +-
.../FilterRowKindTransformFactory.java | 6 +-
.../transform/replace/ReplaceTransformFactory.java | 6 +-
.../transform/split/SplitTransformFactory.java | 7 +-
.../transform/sql/SQLTransformFactory.java | 6 +-
37 files changed, 316 insertions(+), 394 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
index 0535de68b8..924c2e5244 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/source/SeaTunnelSource.java
@@ -21,9 +21,11 @@ import org.apache.seatunnel.api.common.PluginIdentifierInterface;
import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle;
import org.apache.seatunnel.api.serialization.DefaultSerializer;
import org.apache.seatunnel.api.serialization.Serializer;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import java.io.Serializable;
+import java.util.List;
/**
* The interface for Source. It acts like a factory class that helps construct the {@link
@@ -49,9 +51,23 @@ public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends S
/**
* Get the data type of the records produced by this source.
*
+ * @deprecated Please use {@link #getProducedCatalogTables}
* @return SeaTunnel data type.
*/
- SeaTunnelDataType<T> getProducedType();
+ @Deprecated
+ default SeaTunnelDataType<T> getProducedType() {
+ throw new UnsupportedOperationException("getProducedType method has not been implemented.");
+ }
+
+ /**
+ * Get the catalog tables output by this source, It is recommended that all connectors implement
+ * this method instead of {@link #getProducedType}. CatalogTable contains more information to
+ * help downstream support more accurate and complete synchronization capabilities.
+ */
+ default List<CatalogTable> getProducedCatalogTables() {
+ throw new UnsupportedOperationException(
+ "getProducedCatalogTables method has not been implemented.");
+ }
/**
* Create source reader, used to produce data.
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index a9b921ce5b..def005eeb6 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -33,8 +33,10 @@ import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.utils.JsonUtils;
@@ -138,9 +140,14 @@ public class CatalogTableUtil implements Serializable {
@Deprecated
public static List<CatalogTable> getCatalogTablesFromConfig(
ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
-
// We use plugin_name as factoryId, so MySQL-CDC should be MySQL
String factoryId =
readonlyConfig.get(CommonOptions.PLUGIN_NAME).replace("-CDC", "");
+ return getCatalogTablesFromConfig(factoryId, readonlyConfig, classLoader);
+ }
+
+ @Deprecated
+ public static List<CatalogTable> getCatalogTablesFromConfig(
+ String factoryId, ReadonlyConfig readonlyConfig, ClassLoader classLoader) {
// Highest priority: specified schema
Map<String, String> schemaMap =
readonlyConfig.get(CatalogTableUtil.SCHEMA);
if (schemaMap != null) {
@@ -188,6 +195,20 @@ public class CatalogTableUtil implements Serializable {
return buildWithConfig(readonlyConfig);
}
+ public static SeaTunnelDataType<SeaTunnelRow> convertToDataType(
+ List<CatalogTable> catalogTables) {
+ if (catalogTables.size() == 1) {
+ return catalogTables.get(0).getTableSchema().toPhysicalRowDataType();
+ } else {
+ Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+ for (CatalogTable catalogTable : catalogTables) {
+ String tableId = catalogTable.getTableId().toTablePath().toString();
+ rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType());
+ }
+ return new MultipleRowType(rowTypeMap);
+ }
+ }
+
public static CatalogTable buildWithConfig(ReadonlyConfig readonlyConfig) {
if (readonlyConfig.get(SCHEMA) == null) {
throw new RuntimeException(
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
index f309002699..48ed785c39 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/FactoryUtil.java
@@ -21,13 +21,20 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.ConfigValidator;
import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.env.ParsingMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceOptions;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.source.SupportParallelism;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSource;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.slf4j.Logger;
@@ -55,10 +62,11 @@ public final class FactoryUtil {
private static final Logger LOG =
LoggerFactory.getLogger(FactoryUtil.class);
+ static final String DEFAULT_ID = "default-identifier";
+
public static <T, SplitT extends SourceSplit, StateT extends Serializable>
List<Tuple2<SeaTunnelSource<T, SplitT, StateT>,
List<CatalogTable>>>
createAndPrepareSource(
- List<CatalogTable> multipleTables,
ReadonlyConfig options,
ClassLoader classLoader,
String factoryIdentifier) {
@@ -67,32 +75,44 @@ public final class FactoryUtil {
final TableSourceFactory factory =
discoverFactory(classLoader, TableSourceFactory.class,
factoryIdentifier);
List<Tuple2<SeaTunnelSource<T, SplitT, StateT>,
List<CatalogTable>>> sources =
- new ArrayList<>(multipleTables.size());
- if (factory instanceof SupportMultipleTable) {
- List<CatalogTable> remainingTables = multipleTables;
- while (!remainingTables.isEmpty()) {
- TableFactoryContext context =
- new TableFactoryContext(remainingTables, options,
classLoader);
- SupportMultipleTable.Result result =
- ((SupportMultipleTable)
factory).applyTables(context);
- List<CatalogTable> acceptedTables =
result.getAcceptedTables();
- sources.add(
- new Tuple2<>(
- createAndPrepareSource(
- factory, acceptedTables, options,
classLoader),
- acceptedTables));
- remainingTables = result.getRemainingTables();
- }
- } else {
- for (CatalogTable catalogTable : multipleTables) {
- List<CatalogTable> acceptedTables =
Collections.singletonList(catalogTable);
- sources.add(
- new Tuple2<>(
- createAndPrepareSource(
- factory, acceptedTables, options,
classLoader),
- acceptedTables));
+ new ArrayList<>();
+ SeaTunnelSource<T, SplitT, StateT> source =
+ createAndPrepareSource(factory, options, classLoader);
+ List<CatalogTable> catalogTables;
+ try {
+ catalogTables = source.getProducedCatalogTables();
+ } catch (UnsupportedOperationException e) {
+ // TODO remove it when all connector use `getProducedCatalogTables`
+ SeaTunnelDataType<T> seaTunnelDataType =
source.getProducedType();
+ final String tableId =
+
options.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID);
+ if (seaTunnelDataType instanceof MultipleRowType) {
+ catalogTables = new ArrayList<>();
+ for (String id : ((MultipleRowType)
seaTunnelDataType).getTableIds()) {
+ catalogTables.add(
+ CatalogTableUtil.getCatalogTable(
+ id, ((MultipleRowType)
seaTunnelDataType).getRowType(id)));
+ }
+ } else {
+ catalogTables =
+ Collections.singletonList(
+ CatalogTableUtil.getCatalogTable(
+ tableId, (SeaTunnelRowType)
seaTunnelDataType));
}
}
+ LOG.info(
+ "get the CatalogTable from source {}: {}",
+ source.getPluginName(),
+ catalogTables.stream()
+ .map(CatalogTable::getTableId)
+ .map(TableIdentifier::toString)
+ .collect(Collectors.joining(",")));
+ if (options.get(SourceOptions.DAG_PARSING_MODE) ==
ParsingMode.SHARDING) {
+ CatalogTable catalogTable = catalogTables.get(0);
+ catalogTables.clear();
+ catalogTables.add(catalogTable);
+ }
+ sources.add(new Tuple2<>(source, catalogTables));
return sources;
} catch (Throwable t) {
throw new FactoryException(
@@ -104,22 +124,13 @@ public final class FactoryUtil {
private static <T, SplitT extends SourceSplit, StateT extends Serializable>
SeaTunnelSource<T, SplitT, StateT> createAndPrepareSource(
- TableSourceFactory factory,
- List<CatalogTable> acceptedTables,
- ReadonlyConfig options,
- ClassLoader classLoader) {
- TableFactoryContext context = new TableFactoryContext(acceptedTables,
options, classLoader);
+ TableSourceFactory factory, ReadonlyConfig options,
ClassLoader classLoader) {
+ TableSourceFactoryContext context = new
TableSourceFactoryContext(options, classLoader);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
TableSource<T, SplitT, StateT> tableSource =
factory.createSource(context);
- validateAndApplyMetadata(acceptedTables, tableSource);
return tableSource.createSource();
}
- private static void validateAndApplyMetadata(
- List<CatalogTable> catalogTables, TableSource<?, ?, ?>
tableSource) {
- // TODO: handle reading metadata
- }
-
public static <IN, StateT, CommitInfoT, AggregatedCommitInfoT>
SeaTunnelSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createAndPrepareSink(
CatalogTable catalogTable,
@@ -129,9 +140,8 @@ public final class FactoryUtil {
try {
TableSinkFactory<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
factory =
discoverFactory(classLoader, TableSinkFactory.class,
factoryIdentifier);
- TableFactoryContext context =
- new TableFactoryContext(
- Collections.singletonList(catalogTable), options,
classLoader);
+ TableSinkFactoryContext context =
+ new TableSinkFactoryContext(catalogTable, options,
classLoader);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
return factory.createSink(context).createSink();
} catch (Throwable t) {
@@ -293,8 +303,8 @@ public final class FactoryUtil {
String factoryIdentifier) {
final TableTransformFactory factory =
discoverFactory(classLoader, TableTransformFactory.class,
factoryIdentifier);
- TableFactoryContext context =
- new TableFactoryContext(
+ TableTransformFactoryContext context =
+ new TableTransformFactoryContext(
Collections.singletonList(catalogTable), options,
classLoader);
ConfigValidator.of(context.getOptions()).validate(factory.optionRule());
return factory.createTransform(context).createTransform();
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
deleted file mode 100644
index a48fd96f74..0000000000
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/SupportMultipleTable.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.api.table.factory;
-
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
-
-import java.util.List;
-
-/**
- * Used to declare that the connector can handle data from multiple tables.
- *
- * <p>The expansion of the {@link TableSourceFactory}.
- */
-public interface SupportMultipleTable {
-
- /** A connector can pick tables and return the accepted and remaining
tables. */
- Result applyTables(TableFactoryContext context);
-
- final class Result {
- private final List<CatalogTable> acceptedTables;
- private final List<CatalogTable> remainingTables;
-
- private Result(List<CatalogTable> acceptedTables, List<CatalogTable>
remainingTables) {
- this.acceptedTables = acceptedTables;
- this.remainingTables = remainingTables;
- }
-
- public static Result of(
- List<CatalogTable> acceptedTables, List<CatalogTable>
remainingTables) {
- return new Result(acceptedTables, remainingTables);
- }
-
- public List<CatalogTable> getAcceptedTables() {
- return acceptedTables;
- }
-
- public List<CatalogTable> getRemainingTables() {
- return remainingTables;
- }
- }
-}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
index 2fda5fc064..10436da09b 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
@@ -18,42 +18,17 @@
package org.apache.seatunnel.api.table.factory;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.table.catalog.CatalogTable;
import lombok.Getter;
-import java.util.List;
+@Getter
+public abstract class TableFactoryContext {
-public class TableFactoryContext {
-
- private final List<CatalogTable> catalogTables;
- @Getter private final ReadonlyConfig options;
+ private final ReadonlyConfig options;
private final ClassLoader classLoader;
- public TableFactoryContext(
- List<CatalogTable> catalogTables, ReadonlyConfig options,
ClassLoader classLoader) {
- this.catalogTables = catalogTables;
+ public TableFactoryContext(ReadonlyConfig options, ClassLoader
classLoader) {
this.options = options;
this.classLoader = classLoader;
}
-
- public ClassLoader getClassLoader() {
- return this.classLoader;
- }
-
- /**
- * Returns a list of tables that need to be processed.
- *
- * <p>By default, return only single table.
- *
- * <p>If you need multiple tables, implement {@link SupportMultipleTable}.
- */
- public List<CatalogTable> getCatalogTables() {
- return catalogTables;
- }
-
- /** @return single table. */
- public CatalogTable getCatalogTable() {
- return catalogTables.get(0);
- }
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
index f0015fa58d..2fca039e7d 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactory.java
@@ -37,7 +37,7 @@ public interface TableSinkFactory<IN, StateT, CommitInfoT,
AggregatedCommitInfoT
* @return
*/
default TableSink<IN, StateT, CommitInfoT, AggregatedCommitInfoT>
createSink(
- TableFactoryContext context) {
+ TableSinkFactoryContext context) {
throw new UnsupportedOperationException(
"The Factory has not been implemented and the deprecated
Plugin will be used.");
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
similarity index 55%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
index 33caf328d6..f579adc416 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSinkFactoryContext.java
@@ -17,23 +17,19 @@
package org.apache.seatunnel.api.table.factory;
-import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
-/**
- * This is an SPI interface, used to create {@link
- * org.apache.seatunnel.api.table.connector.TableTransform}. Each plugin need
to have it own
- * implementation.
- */
-public interface TableTransformFactory extends Factory {
+import lombok.Getter;
+
+@Getter
+public class TableSinkFactoryContext extends TableFactoryContext {
+
+ private final CatalogTable catalogTable;
- /**
- * We will never use this method now. So gave a default implement and
return null.
- *
- * @param context TableFactoryContext
- * @return
- */
- default <T> TableTransform<T> createTransform(TableFactoryContext context)
{
- throw new UnsupportedOperationException(
- "The Factory has not been implemented and the deprecated
Plugin will be used.");
+ public TableSinkFactoryContext(
+ CatalogTable catalogTable, ReadonlyConfig options, ClassLoader
classLoader) {
+ super(options, classLoader);
+ this.catalogTable = catalogTable;
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
index 30f70efdea..132d904958 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactory.java
@@ -35,7 +35,7 @@ public interface TableSourceFactory extends Factory {
* @param context TableFactoryContext
*/
default <T, SplitT extends SourceSplit, StateT extends Serializable>
- TableSource<T, SplitT, StateT> createSource(TableFactoryContext
context) {
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
throw new UnsupportedOperationException(
"The Factory has not been implemented and the deprecated
Plugin will be used.");
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java
similarity index 55%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java
index 33caf328d6..41b2b39c6e 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableSourceFactoryContext.java
@@ -17,23 +17,14 @@
package org.apache.seatunnel.api.table.factory;
-import org.apache.seatunnel.api.table.connector.TableTransform;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-/**
- * This is an SPI interface, used to create {@link
- * org.apache.seatunnel.api.table.connector.TableTransform}. Each plugin need
to have it own
- * implementation.
- */
-public interface TableTransformFactory extends Factory {
+import lombok.Getter;
+
+@Getter
+public class TableSourceFactoryContext extends TableFactoryContext {
- /**
- * We will never use this method now. So gave a default implement and
return null.
- *
- * @param context TableFactoryContext
- * @return
- */
- default <T> TableTransform<T> createTransform(TableFactoryContext context)
{
- throw new UnsupportedOperationException(
- "The Factory has not been implemented and the deprecated
Plugin will be used.");
+ public TableSourceFactoryContext(ReadonlyConfig options, ClassLoader
classLoader) {
+ super(options, classLoader);
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
index 33caf328d6..46c6cfa56f 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactory.java
@@ -32,7 +32,7 @@ public interface TableTransformFactory extends Factory {
* @param context TableFactoryContext
* @return
*/
- default <T> TableTransform<T> createTransform(TableFactoryContext context)
{
+ default <T> TableTransform<T> createTransform(TableTransformFactoryContext
context) {
throw new UnsupportedOperationException(
"The Factory has not been implemented and the deprecated
Plugin will be used.");
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java
similarity index 62%
copy from seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
copy to seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java
index 2fda5fc064..bf8176c7a8 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableFactoryContext.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/factory/TableTransformFactoryContext.java
@@ -24,36 +24,14 @@ import lombok.Getter;
import java.util.List;
-public class TableFactoryContext {
+@Getter
+public class TableTransformFactoryContext extends TableFactoryContext {
private final List<CatalogTable> catalogTables;
- @Getter private final ReadonlyConfig options;
- private final ClassLoader classLoader;
- public TableFactoryContext(
+ public TableTransformFactoryContext(
List<CatalogTable> catalogTables, ReadonlyConfig options,
ClassLoader classLoader) {
+ super(options, classLoader);
this.catalogTables = catalogTables;
- this.options = options;
- this.classLoader = classLoader;
- }
-
- public ClassLoader getClassLoader() {
- return this.classLoader;
- }
-
- /**
- * Returns a list of tables that need to be processed.
- *
- * <p>By default, return only single table.
- *
- * <p>If you need multiple tables, implement {@link SupportMultipleTable}.
- */
- public List<CatalogTable> getCatalogTables() {
- return catalogTables;
- }
-
- /** @return single table. */
- public CatalogTable getCatalogTable() {
- return catalogTables.get(0);
}
}
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index bd05e0808d..299026c407 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -17,8 +17,6 @@
package org.apache.seatunnel.api.table.type;
-import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
-
import java.io.Serializable;
import java.util.Arrays;
import java.util.Map;
@@ -27,7 +25,7 @@ import java.util.Objects;
/** SeaTunnel row type. */
public final class SeaTunnelRow implements Serializable {
private static final long serialVersionUID = -1L;
- /** Table identifier, used for the source connector that {@link
SupportMultipleTable}. */
+ /** Table identifier. */
private String tableId = "";
/** The kind of change that a row describes in a changelog. */
private RowKind kind = RowKind.INSERT;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index ed04fb0f5d..ff13fc30b1 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
import org.apache.seatunnel.api.source.SupportCoordinate;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
@@ -88,14 +89,19 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
protected int incrementalParallelism;
protected StopConfig stopConfig;
+ protected List<CatalogTable> catalogTables;
protected StopMode stopMode;
protected DebeziumDeserializationSchema<T> deserializationSchema;
protected SeaTunnelDataType<SeaTunnelRow> dataType;
- protected IncrementalSource(ReadonlyConfig options,
SeaTunnelDataType<SeaTunnelRow> dataType) {
+ protected IncrementalSource(
+ ReadonlyConfig options,
+ SeaTunnelDataType<SeaTunnelRow> dataType,
+ List<CatalogTable> catalogTables) {
this.dataType = dataType;
+ this.catalogTables = catalogTables;
this.readonlyConfig = options;
this.startupConfig = getStartupConfig(readonlyConfig);
this.stopConfig = getStopConfig(readonlyConfig);
@@ -137,6 +143,11 @@ public abstract class IncrementalSource<T, C extends
SourceConfig>
config.get(SourceOptions.STOP_TIMESTAMP));
}
+ @Override
+ public List<CatalogTable> getProducedCatalogTables() {
+ return catalogTables;
+ }
+
public abstract Option<StartupMode> getStartupModeOption();
public abstract Option<StopMode> getStopModeOption();
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
index 41191cfa52..edcbdea9a9 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
@@ -43,6 +44,7 @@ import lombok.NoArgsConstructor;
import javax.annotation.Nonnull;
+import java.util.List;
import java.util.Optional;
@NoArgsConstructor
@@ -53,8 +55,10 @@ public class MongodbIncrementalSource<T> extends
IncrementalSource<T, MongodbSou
static final String IDENTIFIER = "MongoDB-CDC";
public MongodbIncrementalSource(
- ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
- super(options, dataType);
+ ReadonlyConfig options,
+ SeaTunnelDataType<SeaTunnelRow> dataType,
+ List<CatalogTable> catalogTables) {
+ super(options, dataType, catalogTables);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
index 6215afb74e..7b816ed3eb 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
@@ -21,28 +21,22 @@ import
org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
import com.google.auto.service.AutoService;
-import javax.annotation.Nonnull;
-
import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
@AutoService(Factory.class)
-public class MongodbIncrementalSourceFactory implements TableSourceFactory,
SupportMultipleTable {
+public class MongodbIncrementalSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
return MongodbIncrementalSource.IDENTIFIER;
@@ -77,28 +71,15 @@ public class MongodbIncrementalSourceFactory implements
TableSourceFactory, Supp
@SuppressWarnings("unchecked")
@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
- TableSource<T, SplitT, StateT> createSource(TableFactoryContext
context) {
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () -> {
- SeaTunnelDataType<SeaTunnelRow> dataType;
- if (context.getCatalogTables().size() == 1) {
- dataType =
-
context.getCatalogTables().get(0).getTableSchema().toPhysicalRowDataType();
- } else {
- Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
- for (CatalogTable catalogTable : context.getCatalogTables()) {
- rowTypeMap.put(
- catalogTable.getTableId().toTablePath().toString(),
-
catalogTable.getTableSchema().toPhysicalRowDataType());
- }
- dataType = new MultipleRowType(rowTypeMap);
- }
+ List<CatalogTable> catalogTables =
+ CatalogTableUtil.getCatalogTablesFromConfig(
+ context.getOptions(), context.getClassLoader());
+ SeaTunnelDataType<SeaTunnelRow> dataType =
+ CatalogTableUtil.convertToDataType(catalogTables);
return (SeaTunnelSource<T, SplitT, StateT>)
- new MongodbIncrementalSource<>(context.getOptions(),
dataType);
+ new MongodbIncrementalSource<>(context.getOptions(),
dataType, catalogTables);
};
}
-
- @Override
- public Result applyTables(@Nonnull TableFactoryContext context) {
- return Result.of(context.getCatalogTables(), Collections.emptyList());
- }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 43d8e505d6..270b0d7309 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -50,6 +50,7 @@ import com.google.auto.service.AutoService;
import lombok.NoArgsConstructor;
import java.time.ZoneId;
+import java.util.List;
@NoArgsConstructor
@AutoService(SeaTunnelSource.class)
@@ -58,8 +59,10 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
static final String IDENTIFIER = "MySQL-CDC";
public MySqlIncrementalSource(
- ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
- super(options, dataType);
+ ReadonlyConfig options,
+ SeaTunnelDataType<SeaTunnelRow> dataType,
+ List<CatalogTable> catalogTables) {
+ super(options, dataType, catalogTables);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 6429fa4b52..60e1105e30 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -22,15 +22,13 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
@@ -40,12 +38,10 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
import com.google.auto.service.AutoService;
import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
@AutoService(Factory.class)
-public class MySqlIncrementalSourceFactory implements TableSourceFactory,
SupportMultipleTable {
+public class MySqlIncrementalSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
return MySqlIncrementalSource.IDENTIFIER;
@@ -95,28 +91,15 @@ public class MySqlIncrementalSourceFactory implements
TableSourceFactory, Suppor
@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
- TableSource<T, SplitT, StateT> createSource(TableFactoryContext
context) {
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () -> {
- SeaTunnelDataType<SeaTunnelRow> dataType;
- if (context.getCatalogTables().size() == 1) {
- dataType =
-
context.getCatalogTables().get(0).getTableSchema().toPhysicalRowDataType();
- } else {
- Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
- for (CatalogTable catalogTable : context.getCatalogTables()) {
- rowTypeMap.put(
- catalogTable.getTableId().toTablePath().toString(),
-
catalogTable.getTableSchema().toPhysicalRowDataType());
- }
- dataType = new MultipleRowType(rowTypeMap);
- }
+ List<CatalogTable> catalogTables =
+ CatalogTableUtil.getCatalogTablesFromConfig(
+ context.getOptions(), context.getClassLoader());
+ SeaTunnelDataType<SeaTunnelRow> dataType =
+ CatalogTableUtil.convertToDataType(catalogTables);
return (SeaTunnelSource<T, SplitT, StateT>)
- new MySqlIncrementalSource<>(context.getOptions(),
dataType);
+ new MySqlIncrementalSource<>(context.getOptions(),
dataType, catalogTables);
};
}
-
- @Override
- public Result applyTables(TableFactoryContext context) {
- return Result.of(context.getCatalogTables(), Collections.emptyList());
- }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
index cf9cf84b82..e56fb00423 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSource.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SupportParallelism;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.utils.JdbcUrlUtil;
@@ -50,6 +51,7 @@ import io.debezium.relational.TableId;
import lombok.NoArgsConstructor;
import java.time.ZoneId;
+import java.util.List;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerConnectionUtils.createSqlServerConnection;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.utils.SqlServerTypeUtils.convertFromTable;
@@ -62,8 +64,10 @@ public class SqlServerIncrementalSource<T> extends
IncrementalSource<T, JdbcSour
static final String IDENTIFIER = "SqlServer-CDC";
public SqlServerIncrementalSource(
- ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
- super(options, dataType);
+ ReadonlyConfig options,
+ SeaTunnelDataType<SeaTunnelRow> dataType,
+ List<CatalogTable> catalogTables) {
+ super(options, dataType, catalogTables);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 285d4b7923..7127209aef 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -22,15 +22,13 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogOptions;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
@@ -40,12 +38,10 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions
import com.google.auto.service.AutoService;
import java.io.Serializable;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.List;
@AutoService(Factory.class)
-public class SqlServerIncrementalSourceFactory implements TableSourceFactory,
SupportMultipleTable {
+public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
@Override
public String factoryIdentifier() {
@@ -100,26 +96,14 @@ public class SqlServerIncrementalSourceFactory implements
TableSourceFactory, Su
@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
- TableSource<T, SplitT, StateT> createSource(TableFactoryContext
context) {
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
return () -> {
- SeaTunnelDataType<SeaTunnelRow> dataType;
- if (context.getCatalogTables().size() == 1) {
- dataType =
-
context.getCatalogTables().get(0).getTableSchema().toPhysicalRowDataType();
- } else {
- Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
- for (CatalogTable catalogTable : context.getCatalogTables()) {
- String tableId =
catalogTable.getTableId().toTablePath().toString();
- rowTypeMap.put(tableId,
catalogTable.getTableSchema().toPhysicalRowDataType());
- }
- dataType = new MultipleRowType(rowTypeMap);
- }
- return new SqlServerIncrementalSource(context.getOptions(),
dataType);
+ List<CatalogTable> catalogTables =
+ CatalogTableUtil.getCatalogTablesFromConfig(
+ context.getOptions(), context.getClassLoader());
+ SeaTunnelDataType<SeaTunnelRow> dataType =
+ CatalogTableUtil.convertToDataType(catalogTables);
+ return new SqlServerIncrementalSource(context.getOptions(),
dataType, catalogTables);
};
}
-
- @Override
- public SupportMultipleTable.Result applyTables(TableFactoryContext
context) {
- return SupportMultipleTable.Result.of(context.getCatalogTables(),
Collections.emptyList());
- }
}
diff --git
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
index 5a66493aee..858357d282 100644
---
a/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSinkFactory.java
@@ -23,8 +23,8 @@ import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import com.google.auto.service.AutoService;
@@ -56,7 +56,7 @@ public class ConsoleSinkFactory implements TableSinkFactory {
}
@Override
- public TableSink createSink(TableFactoryContext context) {
+ public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig options = context.getOptions();
return () ->
new ConsoleSink(
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
index afad20c67c..7bd3bca639 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerDataTypeConvertor.java
@@ -80,6 +80,7 @@ public class SqlServerDataTypeConvertor implements
DataTypeConvertor<SqlServerTy
case NTEXT:
case NVARCHAR:
case TEXT:
+ case XML:
return BasicType.STRING_TYPE;
case DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
index d18ff0d7fd..d93115a707 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkFactory.java
@@ -27,8 +27,8 @@ import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions;
import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig;
@@ -74,7 +74,7 @@ public class JdbcSinkFactory implements TableSinkFactory {
}
@Override
- public TableSink createSink(TableFactoryContext context) {
+ public TableSink createSink(TableSinkFactoryContext context) {
ReadonlyConfig config = context.getOptions();
CatalogTable catalogTable = context.getCatalogTable();
Map<String, String> catalogOptions =
config.get(CatalogOptions.CATALOG_OPTIONS);
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
index 8c21a84233..264df5eafa 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/source/JdbcSourceFactory.java
@@ -22,12 +22,13 @@ import
org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.connector.TableSource;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.factory.TableSourceFactoryContext;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -41,6 +42,7 @@ import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.JdbcCo
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.connection.SimpleJdbcConnectionProvider;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;
import com.google.auto.service.AutoService;
import lombok.extern.slf4j.Slf4j;
@@ -49,7 +51,9 @@ import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
@@ -78,20 +82,35 @@ public class JdbcSourceFactory implements
TableSourceFactory {
@Override
@SuppressWarnings("unchecked")
public <T, SplitT extends SourceSplit, StateT extends Serializable>
- TableSource<T, SplitT, StateT> createSource(TableFactoryContext
context) {
- CatalogTable catalogTable = context.getCatalogTable();
+ TableSource<T, SplitT, StateT>
createSource(TableSourceFactoryContext context) {
JdbcSourceConfig config = JdbcSourceConfig.of(context.getOptions());
- JdbcConnectionProvider connectionProvider =
- new
SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig());
- final String querySql = config.getQuery();
JdbcDialect dialect =
JdbcDialectLoader.load(
config.getJdbcConnectionConfig().getUrl(),
config.getJdbcConnectionConfig().getCompatibleMode());
- TableSchema tableSchema = catalogTable.getTableSchema();
- SeaTunnelRowType rowType = tableSchema.toPhysicalRowDataType();
- Optional<PartitionParameter> partitionParameter =
- createPartitionParameter(config, tableSchema,
connectionProvider);
+ JdbcConnectionProvider connectionProvider =
+ new
SimpleJdbcConnectionProvider(config.getJdbcConnectionConfig());
+
+ SeaTunnelRowType rowType;
+ Optional<PartitionParameter> partitionParameter = Optional.empty();
+ try {
+ CatalogTable catalogTable =
+ CatalogTableUtil.getCatalogTablesFromConfig(
+ dialect.dialectName(),
+ context.getOptions(),
+ context.getClassLoader())
+ .get(0);
+ TableSchema tableSchema = catalogTable.getTableSchema();
+ rowType = tableSchema.toPhysicalRowDataType();
+ partitionParameter = createPartitionParameter(config, tableSchema,
connectionProvider);
+ } catch (Exception e) {
+ try (Connection connection =
connectionProvider.getOrEstablishConnection()) {
+ rowType = initTableField(connection, config, dialect);
+ } catch (Exception k) {
+ throw new PrepareFailException("jdbc", PluginType.SOURCE,
k.toString());
+ }
+ }
+ final String querySql = config.getQuery();
JdbcInputFormat inputFormat =
new JdbcInputFormat(
connectionProvider,
@@ -100,18 +119,20 @@ public class JdbcSourceFactory implements
TableSourceFactory {
querySql,
config.getFetchSize(),
config.getJdbcConnectionConfig().isAutoCommit());
+ Optional<PartitionParameter> finalPartitionParameter =
partitionParameter;
+ SeaTunnelRowType finalRowType = rowType;
return () ->
(SeaTunnelSource<T, SplitT, StateT>)
new JdbcSource(
config,
- rowType,
+ finalRowType,
dialect,
inputFormat,
- partitionParameter.orElse(null),
+ finalPartitionParameter.orElse(null),
connectionProvider,
- partitionParameter.isPresent()
+ finalPartitionParameter.isPresent()
? obtainPartitionSql(
- dialect,
partitionParameter.get(), querySql)
+ dialect,
finalPartitionParameter.get(), querySql)
: querySql);
}
@@ -132,6 +153,27 @@ public class JdbcSourceFactory implements
TableSourceFactory {
partitionParameter.getPartitionColumnName());
}
+ private SeaTunnelRowType initTableField(
+ Connection conn, JdbcSourceConfig jdbcSourceConfig, JdbcDialect
jdbcDialect) {
+ JdbcDialectTypeMapper jdbcDialectTypeMapper =
jdbcDialect.getJdbcDialectTypeMapper();
+ ArrayList<SeaTunnelDataType<?>> seaTunnelDataTypes = new ArrayList<>();
+ ArrayList<String> fieldNames = new ArrayList<>();
+ try {
+ ResultSetMetaData resultSetMetaData =
+ jdbcDialect.getResultSetMetaData(conn, jdbcSourceConfig);
+ for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+ // Support AS syntax
+ fieldNames.add(resultSetMetaData.getColumnLabel(i));
+
seaTunnelDataTypes.add(jdbcDialectTypeMapper.mapping(resultSetMetaData, i));
+ }
+ } catch (Exception e) {
+ log.warn("get row type info exception", e);
+ }
+ return new SeaTunnelRowType(
+ fieldNames.toArray(new String[0]),
+ seaTunnelDataTypes.toArray(new SeaTunnelDataType<?>[0]));
+ }
+
public static Optional<PartitionParameter> createPartitionParameter(
JdbcSourceConfig config,
TableSchema tableSchema,
diff --git
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
index 450ba3f1cd..c9a365511d 100644
---
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/sink/KafkaSinkFactory.java
@@ -20,8 +20,8 @@ package org.apache.seatunnel.connectors.seatunnel.kafka.sink;
import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.MessageFormat;
@@ -56,7 +56,7 @@ public class KafkaSinkFactory implements TableSinkFactory {
}
@Override
- public TableSink createSink(TableFactoryContext context) {
+ public TableSink createSink(TableSinkFactoryContext context) {
return () ->
new KafkaSink(
context.getOptions(),
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
index c0159c5fd4..7c3592411a 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableSink;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksOptions;
import
org.apache.seatunnel.connectors.seatunnel.starrocks.config.StarRocksSinkOptions;
@@ -60,7 +60,7 @@ public class StarRocksSinkFactory implements TableSinkFactory
{
}
@Override
- public TableSink createSink(TableFactoryContext context) {
+ public TableSink createSink(TableSinkFactoryContext context) {
SinkConfig sinkConfig = SinkConfig.of(context.getOptions());
CatalogTable catalogTable = context.getCatalogTable();
if (StringUtils.isBlank(sinkConfig.getTable())) {
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
index b5dcdf0534..18fea5fb28 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/dag/actions/ShuffleMultipleRowStrategy.java
@@ -17,11 +17,10 @@
package org.apache.seatunnel.engine.core.dag.actions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.event.SchemaChangeEvent;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import com.hazelcast.collection.IQueue;
import com.hazelcast.core.HazelcastInstance;
@@ -33,6 +32,7 @@ import lombok.experimental.Tolerate;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -44,7 +44,7 @@ import java.util.stream.Stream;
@Setter
@ToString
public class ShuffleMultipleRowStrategy extends ShuffleStrategy {
- private MultipleRowType inputRowType;
+ private List<CatalogTable> catalogTables;
private String targetTableId;
@Tolerate
@@ -54,8 +54,8 @@ public class ShuffleMultipleRowStrategy extends
ShuffleStrategy {
public Map<String, IQueue<Record<?>>> createShuffles(
HazelcastInstance hazelcast, int pipelineId, int inputIndex) {
Map<String, IQueue<Record<?>>> shuffleMap = new HashMap<>();
- for (Map.Entry<String, SeaTunnelRowType> entry : inputRowType) {
- String tableId = entry.getKey();
+ for (CatalogTable entry : catalogTables) {
+ String tableId = entry.getTableId().toTablePath().toString();
String queueName = generateQueueName(pipelineId, inputIndex,
tableId);
IQueue<Record<?>> queue = getIQueue(hazelcast, queueName);
// clear old data when job restore
diff --git
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
index c83ceade12..d38b5aacb7 100644
---
a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
+++
b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java
@@ -23,16 +23,12 @@ import org.apache.seatunnel.api.common.CommonOptions;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.env.EnvCommonOptions;
-import org.apache.seatunnel.api.env.ParsingMode;
import org.apache.seatunnel.api.sink.DataSaveMode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SupportDataSaveMode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.source.SourceOptions;
import org.apache.seatunnel.api.source.SourceSplit;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.factory.Factory;
import org.apache.seatunnel.api.table.factory.FactoryUtil;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
@@ -78,7 +74,6 @@ import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
@@ -303,31 +298,15 @@ public class MultipleTableJobConfigParser {
factoryId,
(factory) -> factory.createSource(null));
- final List<CatalogTable> catalogTables = new ArrayList<>();
- if (!fallback) {
- List<CatalogTable> tables =
- CatalogTableUtil.getCatalogTables(sourceConfig,
classLoader);
- if (!tables.isEmpty()) {
- catalogTables.addAll(tables);
- }
- }
-
- if (fallback || catalogTables.isEmpty()) {
+ if (fallback) {
Tuple2<CatalogTable, Action> tuple =
fallbackParser.parseSource(sourceConfig, jobConfig,
tableId, parallelism);
return new Tuple2<>(tableId, Collections.singletonList(tuple));
}
- if (readonlyConfig.get(SourceOptions.DAG_PARSING_MODE) ==
ParsingMode.SHARDING) {
- CatalogTable shardingTable = catalogTables.get(0);
- catalogTables.clear();
- catalogTables.add(shardingTable);
- }
-
List<Tuple2<SeaTunnelSource<Object, SourceSplit, Serializable>,
List<CatalogTable>>>
sources =
- FactoryUtil.createAndPrepareSource(
- catalogTables, readonlyConfig, classLoader,
factoryId);
+ FactoryUtil.createAndPrepareSource(readonlyConfig,
classLoader, factoryId);
Set<URL> factoryUrls =
getFactoryUrls(readonlyConfig, classLoader,
TableSourceFactory.class, factoryId);
@@ -465,9 +444,26 @@ public class MultipleTableJobConfigParser {
public static SeaTunnelDataType<?> getProducedType(Action action) {
if (action instanceof SourceAction) {
- return ((SourceAction<?, ?, ?>)
action).getSource().getProducedType();
+ try {
+ return ((SourceAction<?, ?, ?>) action)
+ .getSource()
+ .getProducedCatalogTables()
+ .get(0)
+ .getSeaTunnelRowType();
+ } catch (UnsupportedOperationException e) {
+ // TODO remove it when all connector use `getProducedCatalogTables`
+ return ((SourceAction<?, ?, ?>)
action).getSource().getProducedType();
+ }
} else if (action instanceof TransformAction) {
- return ((TransformAction) action).getTransform().getProducedType();
+ try {
+ return ((TransformAction) action)
+ .getTransform()
+ .getProducedCatalogTable()
+ .getSeaTunnelRowType();
+ } catch (UnsupportedOperationException e) {
+ // TODO remove it when all connector use `getProducedCatalogTables`
+ return ((TransformAction)
action).getTransform().getProducedType();
+ }
}
throw new UnsupportedOperationException();
}
@@ -534,13 +530,6 @@ public class MultipleTableJobConfigParser {
return fallbackParser.parseSinks(configIndex, inputVertices,
sinkConfig, jobConfig);
}
- Map<TablePath, CatalogTable> tableMap =
- CatalogTableUtil.getCatalogTables(sinkConfig,
classLoader).stream()
- .collect(
- Collectors.toMap(
- catalogTable ->
catalogTable.getTableId().toTablePath(),
- catalogTable -> catalogTable));
-
// get factory urls
Set<URL> factoryUrls =
getFactoryUrls(readonlyConfig, classLoader,
TableSinkFactory.class, factoryId);
@@ -558,7 +547,6 @@ public class MultipleTableJobConfigParser {
SinkAction<?, ?, ?, ?> sinkAction =
createSinkAction(
inputActionSample._1(),
- tableMap,
inputActions,
readonlyConfig,
classLoader,
@@ -575,7 +563,6 @@ public class MultipleTableJobConfigParser {
SinkAction<?, ?, ?, ?> sinkAction =
createSinkAction(
tuple._1(),
- tableMap,
Collections.singleton(tuple._2()),
readonlyConfig,
classLoader,
@@ -590,7 +577,6 @@ public class MultipleTableJobConfigParser {
private SinkAction<?, ?, ?, ?> createSinkAction(
CatalogTable catalogTable,
- Map<TablePath, CatalogTable> sinkTableMap,
Set<Action> inputActions,
ReadonlyConfig readonlyConfig,
ClassLoader classLoader,
@@ -598,17 +584,6 @@ public class MultipleTableJobConfigParser {
String factoryId,
int parallelism,
int configIndex) {
- Optional<CatalogTable> insteadTable;
- if (sinkTableMap.size() == 1) {
- insteadTable = sinkTableMap.values().stream().findFirst();
- } else {
- // TODO: another table full name map
- insteadTable =
-
Optional.ofNullable(sinkTableMap.get(catalogTable.getTableId().toTablePath()));
- }
- if (insteadTable.isPresent()) {
- catalogTable = insteadTable.get();
- }
SeaTunnelSink<?, ?, ?, ?> sink =
FactoryUtil.createAndPrepareSink(
catalogTable, readonlyConfig, classLoader, factoryId);
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
index d7beaf3a32..7e23c0b123 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/execution/ExecutionPlanGenerator.java
@@ -17,9 +17,7 @@
package org.apache.seatunnel.engine.server.dag.execution;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.engine.common.config.server.CheckpointConfig;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
@@ -216,8 +214,12 @@ public class ExecutionPlanGenerator {
}
ExecutionVertex sourceExecutionVertex =
sourceExecutionVertices.stream().findFirst().get();
SourceAction sourceAction = (SourceAction)
sourceExecutionVertex.getAction();
- SeaTunnelDataType sourceProducedType =
sourceAction.getSource().getProducedType();
- if (!SqlType.MULTIPLE_ROW.equals(sourceProducedType.getSqlType())) {
+ List<CatalogTable> producedCatalogTables = new ArrayList<>();
+ try {
+ producedCatalogTables =
sourceAction.getSource().getProducedCatalogTables();
+ } catch (UnsupportedOperationException e) {
+ }
+ if (producedCatalogTables.size() <= 1) {
return executionEdges;
}
@@ -234,7 +236,7 @@ public class ExecutionPlanGenerator {
ShuffleMultipleRowStrategy.builder()
.jobId(jobImmutableInformation.getJobId())
.inputPartitions(sourceAction.getParallelism())
-
.inputRowType(MultipleRowType.class.cast(sourceProducedType))
+ .catalogTables(producedCatalogTables)
.queueEmptyQueueTtl((int)
(checkpointConfig.getCheckpointInterval() * 3))
.build();
ShuffleConfig shuffleConfig =
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
index 5d0f571d49..25f1e850f5 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/dag/physical/PhysicalPlanGenerator.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.engine.server.dag.physical;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.engine.common.config.server.QueueType;
import org.apache.seatunnel.engine.common.utils.IdGenerator;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
@@ -67,7 +66,6 @@ import lombok.NonNull;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -323,11 +321,6 @@ public class PhysicalPlanGenerator {
SinkAction sinkAction = (SinkAction)
sinkFlow.getAction();
String sinkTableId =
sinkAction.getConfig().getMultipleRowTableId();
- MultipleRowType multipleRowType =
-
shuffleMultipleRowStrategy.getInputRowType();
- int sinkTableIndex =
-
Arrays.asList(multipleRowType.getTableIds())
- .indexOf(sinkTableId);
long taskIDPrefix =
idGenerator.getNextId();
long taskGroupIDPrefix =
idGenerator.getNextId();
diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
index 80a0dff04e..2a4a129adb 100644
--- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
+++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/task/SourceSeaTunnelTask.java
@@ -21,6 +21,9 @@ import org.apache.seatunnel.api.common.metrics.MetricsContext;
import org.apache.seatunnel.api.env.EnvCommonOptions;
import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.core.starter.flowcontrol.FlowControlStrategy;
import org.apache.seatunnel.engine.core.dag.actions.SourceAction;
import org.apache.seatunnel.engine.server.dag.physical.config.SourceConfig;
@@ -73,13 +76,22 @@ public class SourceSeaTunnelTask<T, SplitT extends SourceSplit> extends SeaTunne
"SourceSeaTunnelTask only support SourceFlowLifeCycle, but
get "
+ startFlowLifeCycle.getClass().getName());
} else {
+ SeaTunnelDataType sourceProducedType;
+ try {
+ List<CatalogTable> producedCatalogTables =
+
sourceFlow.getAction().getSource().getProducedCatalogTables();
+ sourceProducedType =
CatalogTableUtil.convertToDataType(producedCatalogTables);
+ } catch (UnsupportedOperationException e) {
+ // TODO remove it when all connector use `getProducedCatalogTables`
+ sourceProducedType =
sourceFlow.getAction().getSource().getProducedType();
+ }
this.collector =
new SeaTunnelSourceCollector<>(
checkpointLock,
outputs,
this.getMetricsContext(),
getFlowControlStrategy(),
-
sourceFlow.getAction().getSource().getProducedType());
+ sourceProducedType);
((SourceFlowLifeCycle<T, SplitT>)
startFlowLifeCycle).setCollector(collector);
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
index 2271fa641b..d8d8a97962 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/copy/CopyFieldTransformFactory.java
@@ -21,11 +21,10 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import com.google.auto.service.AutoService;
-import lombok.NonNull;
@AutoService(Factory.class)
public class CopyFieldTransformFactory implements TableTransformFactory {
@@ -43,9 +42,9 @@ public class CopyFieldTransformFactory implements TableTransformFactory {
}
@Override
- public TableTransform createTransform(@NonNull TableFactoryContext
context) {
+ public TableTransform createTransform(TableTransformFactoryContext
context) {
CopyTransformConfig copyTransformConfig =
CopyTransformConfig.of(context.getOptions());
- CatalogTable catalogTable = context.getCatalogTable();
+ CatalogTable catalogTable = context.getCatalogTables().get(0);
return () -> new CopyFieldTransform(copyTransformConfig, catalogTable);
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java
index 9e6334516b..b7382175ba 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/fieldmapper/FieldMapperTransformFactory.java
@@ -22,8 +22,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import com.google.auto.service.AutoService;
@@ -40,8 +40,8 @@ public class FieldMapperTransformFactory implements TableTransformFactory {
}
@Override
- public TableTransform createTransform(TableFactoryContext context) {
- CatalogTable catalogTable = context.getCatalogTable();
+ public TableTransform createTransform(TableTransformFactoryContext
context) {
+ CatalogTable catalogTable = context.getCatalogTables().get(0);
ReadonlyConfig options = context.getOptions();
FieldMapperTransformConfig fieldMapperTransformConfig =
FieldMapperTransformConfig.of(options);
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
index 14259a0a85..f562a7cc28 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filter/FilterFieldTransformFactory.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import com.google.auto.service.AutoService;
@@ -41,8 +41,8 @@ public class FilterFieldTransformFactory implements TableTransformFactory {
}
@Override
- public TableTransform createTransform(TableFactoryContext context) {
- CatalogTable catalogTable = context.getCatalogTable();
+ public TableTransform createTransform(TableTransformFactoryContext
context) {
+ CatalogTable catalogTable = context.getCatalogTables().get(0);
return () -> new FilterFieldTransform(context.getOptions(),
catalogTable);
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
index 9e89ebe5e2..2191e30bc5 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/filterrowkind/FilterRowKindTransformFactory.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import com.google.auto.service.AutoService;
@@ -43,8 +43,8 @@ public class FilterRowKindTransformFactory implements TableTransformFactory {
}
@Override
- public TableTransform createTransform(TableFactoryContext context) {
- CatalogTable catalogTable = context.getCatalogTable();
+ public TableTransform createTransform(TableTransformFactoryContext
context) {
+ CatalogTable catalogTable = context.getCatalogTables().get(0);
return () -> new FilterRowKindTransform(context.getOptions(),
catalogTable);
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
index 25696ba6e6..c0bed8977d 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/replace/ReplaceTransformFactory.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import com.google.auto.service.AutoService;
@@ -49,8 +49,8 @@ public class ReplaceTransformFactory implements TableTransformFactory {
}
@Override
- public TableTransform createTransform(TableFactoryContext context) {
- CatalogTable catalogTable = context.getCatalogTable();
+ public TableTransform createTransform(TableTransformFactoryContext
context) {
+ CatalogTable catalogTable = context.getCatalogTables().get(0);
return () -> new ReplaceTransform(context.getOptions(), catalogTable);
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
index 91281251e9..32d660860b 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/split/SplitTransformFactory.java
@@ -21,11 +21,10 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import com.google.auto.service.AutoService;
-import lombok.NonNull;
@AutoService(Factory.class)
public class SplitTransformFactory implements TableTransformFactory {
@@ -45,9 +44,9 @@ public class SplitTransformFactory implements TableTransformFactory {
}
@Override
- public TableTransform createTransform(@NonNull TableFactoryContext
context) {
+ public TableTransform createTransform(TableTransformFactoryContext
context) {
SplitTransformConfig splitTransformConfig =
SplitTransformConfig.of(context.getOptions());
- CatalogTable catalogTable = context.getCatalogTable();
+ CatalogTable catalogTable = context.getCatalogTables().get(0);
return () -> new SplitTransform(splitTransformConfig, catalogTable);
}
}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
index f509af832c..5c4abf53c0 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java
@@ -21,8 +21,8 @@ import org.apache.seatunnel.api.configuration.util.OptionRule;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.connector.TableTransform;
import org.apache.seatunnel.api.table.factory.Factory;
-import org.apache.seatunnel.api.table.factory.TableFactoryContext;
import org.apache.seatunnel.api.table.factory.TableTransformFactory;
+import org.apache.seatunnel.api.table.factory.TableTransformFactoryContext;
import com.google.auto.service.AutoService;
@@ -41,8 +41,8 @@ public class SQLTransformFactory implements TableTransformFactory {
}
@Override
- public TableTransform createTransform(TableFactoryContext context) {
- CatalogTable catalogTable = context.getCatalogTable();
+ public TableTransform createTransform(TableTransformFactoryContext
context) {
+ CatalogTable catalogTable = context.getCatalogTables().get(0);
return () -> new SQLTransform(context.getOptions(), catalogTable);
}
}