This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this 
push:
     new 21ef45fcc [Feature][CDC] MySQL CDC supports deserialization of 
multi-tables (#4067)
21ef45fcc is described below

commit 21ef45fcca04b0244994d3a4e45a19cc08966603
Author: hailin0 <[email protected]>
AuthorDate: Tue Feb 7 10:40:30 2023 +0800

    [Feature][CDC] MySQL CDC supports deserialization of multi-tables (#4067)
---
 .../api/table/catalog/TableIdentifier.java         |  2 +-
 .../seatunnel/api/table/type/MultipleRowType.java  | 56 +++++++++++++++++++++
 .../seatunnel/api/table/type/SeaTunnelRow.java     |  6 +--
 .../apache/seatunnel/api/table/type/SqlType.java   |  3 +-
 .../cdc/base/source/IncrementalSource.java         | 18 +++++++
 .../row/SeaTunnelRowDebeziumDeserializeSchema.java | 58 ++++++++++++++++------
 .../cdc/mysql/source/MySqlIncrementalSource.java   | 21 +++++---
 .../source/MySqlIncrementalSourceFactory.java      | 42 +++++++++++++++-
 .../transform/common/SeaTunnelRowAccessor.java     |  2 +-
 9 files changed, 181 insertions(+), 27 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
index e3c60ada1..66ae7df2d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java
@@ -47,7 +47,7 @@ public final class TableIdentifier implements Serializable {
         return databaseName;
     }
 
-    public String gettableName() {
+    public String getTableName() {
         return tableName;
     }
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
new file mode 100644
index 000000000..5d7fe330f
--- /dev/null
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/MultipleRowType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+import lombok.RequiredArgsConstructor;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+@RequiredArgsConstructor
+public class MultipleRowType implements SeaTunnelDataType<SeaTunnelRow>, 
Iterable<Map.Entry<String, SeaTunnelRowType>> {
+    private final Map<String, SeaTunnelRowType> rowTypeMap;
+
+    public MultipleRowType(String[] tableIds, SeaTunnelRowType[] rowTypes) {
+        Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+        for (int i = 0; i < tableIds.length; i++) {
+            rowTypeMap.put(tableIds[i], rowTypes[i]);
+        }
+        this.rowTypeMap = rowTypeMap;
+    }
+
+    public SeaTunnelRowType getRowType(String tableId) {
+        return rowTypeMap.get(tableId);
+    }
+
+    @Override
+    public Class<SeaTunnelRow> getTypeClass() {
+        return SeaTunnelRow.class;
+    }
+
+    @Override
+    public SqlType getSqlType() {
+        return SqlType.MULTIPLE_ROW;
+    }
+
+    @Override
+    public Iterator<Map.Entry<String, SeaTunnelRowType>> iterator() {
+        return rowTypeMap.entrySet().iterator();
+    }
+}
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
index e78d246d9..2d48c6cb1 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java
@@ -29,7 +29,7 @@ import java.util.Objects;
 public final class SeaTunnelRow implements Serializable {
     private static final long serialVersionUID = -1L;
     /** Table identifier, used for the source connector that {@link 
SupportMultipleTable}. */
-    private int tableId = -1;
+    private String tableId;
     /** The kind of change that a row describes in a changelog. */
     private RowKind kind = RowKind.INSERT;
     /** The array to store the actual internal format values. */
@@ -47,7 +47,7 @@ public final class SeaTunnelRow implements Serializable {
         this.fields[pos] = value;
     }
 
-    public void setTableId(int tableId) {
+    public void setTableId(String tableId) {
         this.tableId = tableId;
     }
 
@@ -59,7 +59,7 @@ public final class SeaTunnelRow implements Serializable {
         return fields.length;
     }
 
-    public int getTableId() {
+    public String getTableId() {
         return tableId;
     }
 
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
index ad8b3c347..1e5c05b90 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java
@@ -37,5 +37,6 @@ public enum SqlType {
     DATE,
     TIME,
     TIMESTAMP,
-    ROW;
+    ROW,
+    MULTIPLE_ROW;
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
index 41e416aa5..8be224ea4 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
 import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
@@ -53,6 +54,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceRead
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import io.debezium.relational.TableId;
+import lombok.NoArgsConstructor;
 
 import java.util.HashMap;
 import java.util.HashSet;
@@ -61,6 +63,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Supplier;
 
+@NoArgsConstructor
 public abstract class IncrementalSource<T, C extends SourceConfig> implements 
SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
 
     protected ReadonlyConfig readonlyConfig;
@@ -76,6 +79,21 @@ public abstract class IncrementalSource<T, C extends 
SourceConfig> implements Se
     protected StopMode stopMode;
     protected DebeziumDeserializationSchema<T> deserializationSchema;
 
+    protected SeaTunnelDataType<SeaTunnelRow> dataType;
+
+    protected IncrementalSource(ReadonlyConfig options, 
SeaTunnelDataType<SeaTunnelRow> dataType) {
+        this.dataType = dataType;
+        this.readonlyConfig = options;
+        this.startupConfig = getStartupConfig(readonlyConfig);
+        this.stopConfig = getStopConfig(readonlyConfig);
+        this.stopMode = stopConfig.getStopMode();
+        this.incrementalParallelism = 
readonlyConfig.get(SourceOptions.INCREMENTAL_PARALLELISM);
+        this.configFactory = createSourceConfigFactory(readonlyConfig);
+        this.dataSourceDialect = createDataSourceDialect(readonlyConfig);
+        this.deserializationSchema = 
createDebeziumDeserializationSchema(readonlyConfig);
+        this.offsetFactory = createOffsetFactory(readonlyConfig);
+    }
+
     @Override
     public final void prepare(Config pluginConfig) throws PrepareFailException 
{
         this.readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index 8bcee31a5..c94d59064 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -18,8 +18,10 @@
 package org.apache.seatunnel.connectors.cdc.debezium.row;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -36,6 +38,9 @@ import org.apache.kafka.connect.source.SourceRecord;
 
 import java.io.Serializable;
 import java.time.ZoneId;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Deserialization schema from Debezium object to {@link SeaTunnelRow}.
@@ -52,7 +57,9 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
     /**
      * Runtime converter that converts Kafka {@link SourceRecord}s into {@link 
SeaTunnelRow} consisted of
      */
-    private final SeaTunnelRowDebeziumDeserializationConverters converters;
+    private final SeaTunnelRowDebeziumDeserializationConverters 
singleRowConverter;
+
+    private final Map<String, SeaTunnelRowDebeziumDeserializationConverters> 
multipleRowConverters;
 
     /**
      * Validator to validate the row value.
@@ -67,18 +74,35 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
     }
 
     SeaTunnelRowDebeziumDeserializeSchema(
-        SeaTunnelRowType physicalDataType,
+        SeaTunnelDataType<SeaTunnelRow> physicalDataType,
         MetadataConverter[] metadataConverters,
-        SeaTunnelRowType resultType,
+        SeaTunnelDataType<SeaTunnelRow> resultType,
         ValueValidator validator,
         ZoneId serverTimeZone,
         DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
-        this.converters = new SeaTunnelRowDebeziumDeserializationConverters(
-            physicalDataType,
-            metadataConverters,
-            serverTimeZone,
-            userDefinedConverterFactory
-        );
+
+        SeaTunnelRowDebeziumDeserializationConverters singleRowConverter = 
null;
+        Map<String, SeaTunnelRowDebeziumDeserializationConverters> 
multipleRowConverters = Collections.emptyMap();
+        if (physicalDataType instanceof MultipleRowType) {
+            multipleRowConverters = new HashMap<>();
+            for (Map.Entry<String, SeaTunnelRowType> item : (MultipleRowType) 
physicalDataType) {
+                SeaTunnelRowDebeziumDeserializationConverters itemRowConverter 
= new SeaTunnelRowDebeziumDeserializationConverters(
+                    item.getValue(),
+                    metadataConverters,
+                    serverTimeZone,
+                    userDefinedConverterFactory);
+                multipleRowConverters.put(item.getKey(), itemRowConverter);
+            }
+        } else {
+            singleRowConverter = new 
SeaTunnelRowDebeziumDeserializationConverters(
+                (SeaTunnelRowType) physicalDataType,
+                metadataConverters,
+                serverTimeZone,
+                userDefinedConverterFactory
+            );
+        }
+        this.singleRowConverter = singleRowConverter;
+        this.multipleRowConverters = multipleRowConverters;
         this.resultTypeInfo = checkNotNull(resultType);
         this.validator = checkNotNull(validator);
     }
@@ -90,28 +114,34 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
         Schema valueSchema = record.valueSchema();
 
         Struct sourceStruct = 
messageStruct.getStruct(Envelope.FieldName.SOURCE);
-        // TODO: multi-table
+        String database = sourceStruct.getString(DATABASE_NAME_KEY);
         String tableName = 
sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
+        String tableId = database + ":" + tableName;
+        SeaTunnelRowDebeziumDeserializationConverters converters = 
multipleRowConverters.getOrDefault(tableId, singleRowConverter);
 
         if (operation == Envelope.Operation.CREATE || operation == 
Envelope.Operation.READ) {
             SeaTunnelRow insert = extractAfterRow(converters, record, 
messageStruct, valueSchema);
             insert.setRowKind(RowKind.INSERT);
+            insert.setTableId(tableId);
             validator.validate(insert, RowKind.INSERT);
             collector.collect(insert);
         } else if (operation == Envelope.Operation.DELETE) {
             SeaTunnelRow delete = extractBeforeRow(converters, record, 
messageStruct, valueSchema);
             validator.validate(delete, RowKind.DELETE);
             delete.setRowKind(RowKind.DELETE);
+            delete.setTableId(tableId);
             collector.collect(delete);
         } else {
             SeaTunnelRow before = extractBeforeRow(converters, record, 
messageStruct, valueSchema);
             validator.validate(before, RowKind.UPDATE_BEFORE);
             before.setRowKind(RowKind.UPDATE_BEFORE);
+            before.setTableId(tableId);
             collector.collect(before);
 
             SeaTunnelRow after = extractAfterRow(converters, record, 
messageStruct, valueSchema);
             validator.validate(after, RowKind.UPDATE_AFTER);
             after.setRowKind(RowKind.UPDATE_AFTER);
+            after.setTableId(tableId);
             collector.collect(after);
         }
     }
@@ -159,8 +189,8 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
      * Builder of {@link SeaTunnelRowDebeziumDeserializeSchema}.
      */
     public static class Builder {
-        private SeaTunnelRowType physicalRowType;
-        private SeaTunnelRowType resultTypeInfo;
+        private SeaTunnelDataType<SeaTunnelRow> physicalRowType;
+        private SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
         private MetadataConverter[] metadataConverters = new 
MetadataConverter[0];
         private ValueValidator validator = (rowData, rowKind) -> {
         };
@@ -168,7 +198,7 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
         private DebeziumDeserializationConverterFactory 
userDefinedConverterFactory =
             DebeziumDeserializationConverterFactory.DEFAULT;
 
-        public Builder setPhysicalRowType(SeaTunnelRowType physicalRowType) {
+        public Builder setPhysicalRowType(SeaTunnelDataType<SeaTunnelRow> 
physicalRowType) {
             this.physicalRowType = physicalRowType;
             return this;
         }
@@ -178,7 +208,7 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
             return this;
         }
 
-        public Builder setResultTypeInfo(SeaTunnelRowType resultTypeInfo) {
+        public Builder setResultTypeInfo(SeaTunnelDataType<SeaTunnelRow> 
resultTypeInfo) {
             this.resultTypeInfo = resultTypeInfo;
             return this;
         }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 0d5d5e694..97f8dca85 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -22,7 +22,8 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SupportParallelism;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
 import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
@@ -45,6 +46,10 @@ import java.time.ZoneId;
 public class MySqlIncrementalSource<T> extends IncrementalSource<T, 
JdbcSourceConfig> implements SupportParallelism {
     static final String IDENTIFIER = "MySQL-CDC";
 
+    public MySqlIncrementalSource(ReadonlyConfig options, 
SeaTunnelDataType<SeaTunnelRow> dataType) {
+        super(options, dataType);
+    }
+
     @Override
     public String getPluginName() {
         return IDENTIFIER;
@@ -65,11 +70,15 @@ public class MySqlIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceCo
     public DebeziumDeserializationSchema<T> 
createDebeziumDeserializationSchema(ReadonlyConfig config) {
         JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
         String baseUrl = config.get(JdbcCatalogOptions.BASE_URL);
-        // TODO: support multi-table
-        // TODO: support metadata keys
-        MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", 
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(), 
jdbcSourceConfig.getPassword(), baseUrl);
-        CatalogTable table = 
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0), 
config.get(JdbcSourceOptions.TABLE_NAME)));
-        SeaTunnelRowType physicalRowType = 
table.getTableSchema().toPhysicalRowDataType();
+        SeaTunnelDataType<SeaTunnelRow> physicalRowType;
+        if (dataType == null) {
+            // TODO: support metadata keys
+            MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", 
jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(), 
jdbcSourceConfig.getPassword(), baseUrl);
+            CatalogTable table = 
mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0), 
config.get(JdbcSourceOptions.TABLE_NAME)));
+            physicalRowType = table.getTableSchema().toPhysicalRowDataType();
+        } else {
+            physicalRowType = dataType;
+        }
         String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
         return (DebeziumDeserializationSchema<T>) 
SeaTunnelRowDebeziumDeserializeSchema.builder()
             .setPhysicalRowType(physicalRowType)
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index b4fa5467f..f97894522 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -19,15 +19,29 @@ package 
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.source;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.connector.TableSource;
 import org.apache.seatunnel.api.table.factory.Factory;
+import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
+import org.apache.seatunnel.api.table.factory.TableFactoryContext;
 import org.apache.seatunnel.api.table.factory.TableSourceFactory;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
 
 import com.google.auto.service.AutoService;
 
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 @AutoService(Factory.class)
-public class MySqlIncrementalSourceFactory implements TableSourceFactory {
+public class MySqlIncrementalSourceFactory implements TableSourceFactory, 
SupportMultipleTable {
     @Override
     public String factoryIdentifier() {
         return MySqlIncrementalSource.IDENTIFIER;
@@ -57,4 +71,30 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory {
     public Class<? extends SeaTunnelSource> getSourceClass() {
         return MySqlIncrementalSource.class;
     }
+
+    @Override
+    public <T, SplitT extends SourceSplit, StateT extends Serializable> 
TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
+        return () -> {
+            SeaTunnelDataType<SeaTunnelRow> dataType;
+            if (context.getCatalogTables().size() == 1) {
+                dataType = context.getCatalogTables()
+                    .get(0)
+                    .getTableSchema()
+                    .toPhysicalRowDataType();
+            } else {
+                Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+                for (CatalogTable catalogTable : context.getCatalogTables()) {
+                    String tableId = 
catalogTable.getTableId().getDatabaseName() + ":" + 
catalogTable.getTableId().getTableName();
+                    rowTypeMap.put(tableId, 
catalogTable.getTableSchema().toPhysicalRowDataType());
+                }
+                dataType = new MultipleRowType(rowTypeMap);
+            }
+            return (SeaTunnelSource<T, SplitT, StateT>) new 
MySqlIncrementalSource<>(context.getOptions(), dataType);
+        };
+    }
+
+    @Override
+    public Result applyTables(TableFactoryContext context) {
+        return Result.of(context.getCatalogTables(), Collections.emptyList());
+    }
 }
diff --git 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
index 81b2f50f0..0224ef4b8 100644
--- 
a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
+++ 
b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/SeaTunnelRowAccessor.java
@@ -30,7 +30,7 @@ public class SeaTunnelRowAccessor {
         return row.getArity();
     }
 
-    public int getTableId() {
+    public String getTableId() {
         return row.getTableId();
     }
 

Reply via email to