This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 37fcff347e [Bug][CDC] Fix state recovery error when switching a single table to multiple tables (#5784)
37fcff347e is described below

commit 37fcff347e86656ba95b4efdffd95f654c878ce7
Author: hailin0 <[email protected]>
AuthorDate: Tue Nov 7 13:33:22 2023 +0800

    [Bug][CDC] Fix state recovery error when switching a single table to multiple tables (#5784)
---
 .../api/table/catalog/CatalogTableUtil.java        | 16 +++--
 .../row/SeaTunnelRowDebeziumDeserializeSchema.java | 15 +++--
 .../mongodb/MongodbIncrementalSourceFactory.java   |  2 +-
 .../MongoDBConnectorDeserializationSchema.java     | 70 +++++++++++++++-------
 .../source/MySqlIncrementalSourceFactory.java      |  2 +-
 .../source/SqlServerIncrementalSourceFactory.java  |  2 +-
 .../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java |  6 ++
 7 files changed, 78 insertions(+), 35 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index c6a22e96c5..0439754914 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -151,13 +151,17 @@ public class CatalogTableUtil implements Serializable {
         if (catalogTables.size() == 1) {
             return 
catalogTables.get(0).getTableSchema().toPhysicalRowDataType();
         } else {
-            Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
-            for (CatalogTable catalogTable : catalogTables) {
-                String tableId = 
catalogTable.getTableId().toTablePath().toString();
-                rowTypeMap.put(tableId, 
catalogTable.getTableSchema().toPhysicalRowDataType());
-            }
-            return new MultipleRowType(rowTypeMap);
+            return convertToMultipleRowType(catalogTables);
+        }
+    }
+
+    public static MultipleRowType convertToMultipleRowType(List<CatalogTable> 
catalogTables) {
+        Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+        for (CatalogTable catalogTable : catalogTables) {
+            String tableId = 
catalogTable.getTableId().toTablePath().toString();
+            rowTypeMap.put(tableId, 
catalogTable.getTableSchema().toPhysicalRowDataType());
         }
+        return new MultipleRowType(rowTypeMap);
     }
 
     // We need to use buildWithConfig(String catalogName, ReadonlyConfig 
readonlyConfig);
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index ea0a3fc13e..3e86a6603d 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
 import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
 import 
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
@@ -233,12 +234,14 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
 
     @Override
     public void restoreCheckpointProducedType(SeaTunnelDataType<SeaTunnelRow> 
checkpointDataType) {
-        if 
(!checkpointDataType.getSqlType().equals(resultTypeInfo.getSqlType())) {
-            throw new IllegalStateException(
-                    String.format(
-                            "The produced type %s of the SeaTunnel 
deserialization schema "
-                                    + "doesn't match the type %s of the 
restored snapshot.",
-                            resultTypeInfo.getSqlType(), 
checkpointDataType.getSqlType()));
+        if (SqlType.ROW.equals(checkpointDataType.getSqlType())
+                && SqlType.MULTIPLE_ROW.equals(resultTypeInfo.getSqlType())) {
+            // TODO: Older versions may have this issue
+            log.warn(
+                    "Skip incompatible restore type. produced type: {}, 
checkpoint type: {}",
+                    resultTypeInfo,
+                    checkpointDataType);
+            return;
         }
         if (checkpointDataType instanceof MultipleRowType) {
             MultipleRowType latestDataType = (MultipleRowType) resultTypeInfo;
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
index 761d36f8a2..07f85fa2a6 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
@@ -77,7 +77,7 @@ public class MongodbIncrementalSourceFactory implements 
TableSourceFactory {
                     CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
             SeaTunnelDataType<SeaTunnelRow> dataType =
-                    CatalogTableUtil.convertToDataType(catalogTables);
+                    CatalogTableUtil.convertToMultipleRowType(catalogTables);
             return (SeaTunnelSource<T, SplitT, StateT>)
                     new MongodbIncrementalSource<>(context.getOptions(), 
dataType, catalogTables);
         };
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
index 4df666d2ad..8a72e0cd4c 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
 import org.apache.seatunnel.api.table.type.RowKind;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -41,6 +42,7 @@ import org.bson.json.JsonWriterSettings;
 import org.bson.types.Decimal128;
 
 import com.mongodb.client.model.changestream.OperationType;
+import lombok.extern.slf4j.Slf4j;
 
 import javax.annotation.Nonnull;
 
@@ -67,17 +69,17 @@ import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.Mongo
 import static 
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
+@Slf4j
 public class MongoDBConnectorDeserializationSchema
         implements DebeziumDeserializationSchema<SeaTunnelRow> {
-
     private final SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
 
-    private final DeserializationRuntimeConverter physicalConverter;
+    private final Map<String, DeserializationRuntimeConverter> 
tableRowConverters;
 
     public MongoDBConnectorDeserializationSchema(
             SeaTunnelDataType<SeaTunnelRow> physicalDataType,
             SeaTunnelDataType<SeaTunnelRow> resultTypeInfo) {
-        this.physicalConverter = createConverter(physicalDataType);
+        this.tableRowConverters = createConverter(physicalDataType);
         this.resultTypeInfo = resultTypeInfo;
     }
 
@@ -92,29 +94,44 @@ public class MongoDBConnectorDeserializationSchema
                         Objects.requireNonNull(
                                 extractBsonDocument(value, valueSchema, 
DOCUMENT_KEY)));
         BsonDocument fullDocument = extractBsonDocument(value, valueSchema, 
FULL_DOCUMENT);
+        String tableId = extractTableId(record);
+        DeserializationRuntimeConverter tableRowConverter;
+        if (tableId == null && tableRowConverters.size() == 1) {
+            tableRowConverter = tableRowConverters.values().iterator().next();
+        } else {
+            tableRowConverter = tableRowConverters.get(tableId);
+        }
+        if (tableRowConverter == null) {
+            log.debug("Ignore newly added table {}", tableId);
+            return;
+        }
 
         switch (op) {
             case INSERT:
-                SeaTunnelRow insert = extractRowData(fullDocument);
+                SeaTunnelRow insert = extractRowData(tableRowConverter, 
fullDocument);
                 insert.setRowKind(RowKind.INSERT);
+                insert.setTableId(tableId);
                 emit(record, insert, out);
                 break;
             case DELETE:
-                SeaTunnelRow delete = extractRowData(documentKey);
+                SeaTunnelRow delete = extractRowData(tableRowConverter, 
documentKey);
                 delete.setRowKind(RowKind.DELETE);
+                delete.setTableId(tableId);
                 emit(record, delete, out);
                 break;
             case UPDATE:
                 if (fullDocument == null) {
                     break;
                 }
-                SeaTunnelRow updateAfter = extractRowData(fullDocument);
+                SeaTunnelRow updateAfter = extractRowData(tableRowConverter, 
fullDocument);
                 updateAfter.setRowKind(RowKind.UPDATE_AFTER);
+                updateAfter.setTableId(tableId);
                 emit(record, updateAfter, out);
                 break;
             case REPLACE:
-                SeaTunnelRow replaceAfter = extractRowData(fullDocument);
+                SeaTunnelRow replaceAfter = extractRowData(tableRowConverter, 
fullDocument);
                 replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
+                replaceAfter.setTableId(tableId);
                 emit(record, replaceAfter, out);
                 break;
             case INVALIDATE:
@@ -145,9 +162,15 @@ public class MongoDBConnectorDeserializationSchema
         collector.collect(physicalRow);
     }
 
-    private SeaTunnelRow extractRowData(BsonDocument document) {
+    private SeaTunnelRow extractRowData(
+            DeserializationRuntimeConverter tableRowConverter, BsonDocument 
document) {
         checkNotNull(document);
-        return (SeaTunnelRow) physicalConverter.convert(document);
+        return (SeaTunnelRow) tableRowConverter.convert(document);
+    }
+
+    private String extractTableId(SourceRecord record) {
+        // TODO extract table id from record
+        return null;
     }
 
     // 
-------------------------------------------------------------------------------------
@@ -159,17 +182,24 @@ public class MongoDBConnectorDeserializationSchema
         Object convert(BsonValue bsonValue);
     }
 
-    public DeserializationRuntimeConverter 
createConverter(SeaTunnelDataType<?> type) {
-        SerializableFunction<BsonValue, Object> internalRowConverter =
-                createNullSafeInternalConverter(type);
-        return new DeserializationRuntimeConverter() {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            public Object convert(BsonValue bsonValue) {
-                return internalRowConverter.apply(bsonValue);
-            }
-        };
+    public Map<String, DeserializationRuntimeConverter> createConverter(
+            SeaTunnelDataType<?> inputDataType) {
+        Map<String, DeserializationRuntimeConverter> tableRowConverters = new 
HashMap<>();
+        for (Map.Entry<String, SeaTunnelRowType> item : (MultipleRowType) 
inputDataType) {
+            SerializableFunction<BsonValue, Object> internalRowConverter =
+                    createNullSafeInternalConverter(item.getValue());
+            DeserializationRuntimeConverter itemRowConverter =
+                    new DeserializationRuntimeConverter() {
+                        private static final long serialVersionUID = 1L;
+
+                        @Override
+                        public Object convert(BsonValue bsonValue) {
+                            return internalRowConverter.apply(bsonValue);
+                        }
+                    };
+            tableRowConverters.put(item.getKey(), itemRowConverter);
+        }
+        return tableRowConverters;
     }
 
     private static SerializableFunction<BsonValue, Object> 
createNullSafeInternalConverter(
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 90e76e835c..1ec94c3cfc 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -97,7 +97,7 @@ public class MySqlIncrementalSourceFactory implements 
TableSourceFactory {
                     CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
             SeaTunnelDataType<SeaTunnelRow> dataType =
-                    CatalogTableUtil.convertToDataType(catalogTables);
+                    CatalogTableUtil.convertToMultipleRowType(catalogTables);
             return (SeaTunnelSource<T, SplitT, StateT>)
                     new MySqlIncrementalSource<>(context.getOptions(), 
dataType, catalogTables);
         };
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 41623937af..6338d85aa2 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -102,7 +102,7 @@ public class SqlServerIncrementalSourceFactory implements 
TableSourceFactory {
                     CatalogTableUtil.getCatalogTables(
                             context.getOptions(), context.getClassLoader());
             SeaTunnelDataType<SeaTunnelRow> dataType =
-                    CatalogTableUtil.convertToDataType(catalogTables);
+                    CatalogTableUtil.convertToMultipleRowType(catalogTables);
             return new SqlServerIncrementalSource(context.getOptions(), 
dataType, catalogTables);
         };
     }
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index a035ec4caa..6b3519f536 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -324,6 +324,12 @@ public class MysqlCDCIT extends TestSuiteBase implements 
TestResource {
                                                                 
getSourceQuerySQL(
                                                                         
MYSQL_DATABASE2,
                                                                         
SOURCE_TABLE_2)))));
+
+        log.info("****************** container logs start ******************");
+        String containerLogs = container.getServerLogs();
+        log.info(containerLogs);
+        Assertions.assertFalse(containerLogs.contains("ERROR"));
+        log.info("****************** container logs end ******************");
     }
 
     private Connection getJdbcConnection() throws SQLException {

Reply via email to