This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 37fcff347e [Bug][CDC] Fix state recovery error when switching a single table to multiple tables (#5784)
37fcff347e is described below
commit 37fcff347e86656ba95b4efdffd95f654c878ce7
Author: hailin0 <[email protected]>
AuthorDate: Tue Nov 7 13:33:22 2023 +0800
[Bug][CDC] Fix state recovery error when switching a single table to multiple tables (#5784)
---
.../api/table/catalog/CatalogTableUtil.java | 16 +++--
.../row/SeaTunnelRowDebeziumDeserializeSchema.java | 15 +++--
.../mongodb/MongodbIncrementalSourceFactory.java | 2 +-
.../MongoDBConnectorDeserializationSchema.java | 70 +++++++++++++++-------
.../source/MySqlIncrementalSourceFactory.java | 2 +-
.../source/SqlServerIncrementalSourceFactory.java | 2 +-
.../connectors/seatunnel/cdc/mysql/MysqlCDCIT.java | 6 ++
7 files changed, 78 insertions(+), 35 deletions(-)
diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
index c6a22e96c5..0439754914 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java
@@ -151,13 +151,17 @@ public class CatalogTableUtil implements Serializable {
if (catalogTables.size() == 1) {
return
catalogTables.get(0).getTableSchema().toPhysicalRowDataType();
} else {
- Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
- for (CatalogTable catalogTable : catalogTables) {
- String tableId =
catalogTable.getTableId().toTablePath().toString();
- rowTypeMap.put(tableId,
catalogTable.getTableSchema().toPhysicalRowDataType());
- }
- return new MultipleRowType(rowTypeMap);
+ return convertToMultipleRowType(catalogTables);
+ }
+ }
+
+ public static MultipleRowType convertToMultipleRowType(List<CatalogTable>
catalogTables) {
+ Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
+ for (CatalogTable catalogTable : catalogTables) {
+ String tableId =
catalogTable.getTableId().toTablePath().toString();
+ rowTypeMap.put(tableId,
catalogTable.getTableSchema().toPhysicalRowDataType());
}
+ return new MultipleRowType(rowTypeMap);
}
// We need to use buildWithConfig(String catalogName, ReadonlyConfig
readonlyConfig);
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
index ea0a3fc13e..3e86a6603d 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
import
org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
@@ -233,12 +234,14 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
@Override
public void restoreCheckpointProducedType(SeaTunnelDataType<SeaTunnelRow>
checkpointDataType) {
- if
(!checkpointDataType.getSqlType().equals(resultTypeInfo.getSqlType())) {
- throw new IllegalStateException(
- String.format(
- "The produced type %s of the SeaTunnel
deserialization schema "
- + "doesn't match the type %s of the
restored snapshot.",
- resultTypeInfo.getSqlType(),
checkpointDataType.getSqlType()));
+ if (SqlType.ROW.equals(checkpointDataType.getSqlType())
+ && SqlType.MULTIPLE_ROW.equals(resultTypeInfo.getSqlType())) {
+ // TODO: Older versions may have this issue
+ log.warn(
+ "Skip incompatible restore type. produced type: {},
checkpoint type: {}",
+ resultTypeInfo,
+ checkpointDataType);
+ return;
}
if (checkpointDataType instanceof MultipleRowType) {
MultipleRowType latestDataType = (MultipleRowType) resultTypeInfo;
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
index 761d36f8a2..07f85fa2a6 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java
@@ -77,7 +77,7 @@ public class MongodbIncrementalSourceFactory implements TableSourceFactory {
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
- CatalogTableUtil.convertToDataType(catalogTables);
+ CatalogTableUtil.convertToMultipleRowType(catalogTables);
return (SeaTunnelSource<T, SplitT, StateT>)
new MongodbIncrementalSource<>(context.getOptions(),
dataType, catalogTables);
};
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
index 4df666d2ad..8a72e0cd4c 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
+import org.apache.seatunnel.api.table.type.MultipleRowType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -41,6 +42,7 @@ import org.bson.json.JsonWriterSettings;
import org.bson.types.Decimal128;
import com.mongodb.client.model.changestream.OperationType;
+import lombok.extern.slf4j.Slf4j;
import javax.annotation.Nonnull;
@@ -67,17 +69,17 @@ import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.Mongo
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
+@Slf4j
public class MongoDBConnectorDeserializationSchema
implements DebeziumDeserializationSchema<SeaTunnelRow> {
-
private final SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
- private final DeserializationRuntimeConverter physicalConverter;
+ private final Map<String, DeserializationRuntimeConverter>
tableRowConverters;
public MongoDBConnectorDeserializationSchema(
SeaTunnelDataType<SeaTunnelRow> physicalDataType,
SeaTunnelDataType<SeaTunnelRow> resultTypeInfo) {
- this.physicalConverter = createConverter(physicalDataType);
+ this.tableRowConverters = createConverter(physicalDataType);
this.resultTypeInfo = resultTypeInfo;
}
@@ -92,29 +94,44 @@ public class MongoDBConnectorDeserializationSchema
Objects.requireNonNull(
extractBsonDocument(value, valueSchema,
DOCUMENT_KEY)));
BsonDocument fullDocument = extractBsonDocument(value, valueSchema,
FULL_DOCUMENT);
+ String tableId = extractTableId(record);
+ DeserializationRuntimeConverter tableRowConverter;
+ if (tableId == null && tableRowConverters.size() == 1) {
+ tableRowConverter = tableRowConverters.values().iterator().next();
+ } else {
+ tableRowConverter = tableRowConverters.get(tableId);
+ }
+ if (tableRowConverter == null) {
+ log.debug("Ignore newly added table {}", tableId);
+ return;
+ }
switch (op) {
case INSERT:
- SeaTunnelRow insert = extractRowData(fullDocument);
+ SeaTunnelRow insert = extractRowData(tableRowConverter,
fullDocument);
insert.setRowKind(RowKind.INSERT);
+ insert.setTableId(tableId);
emit(record, insert, out);
break;
case DELETE:
- SeaTunnelRow delete = extractRowData(documentKey);
+ SeaTunnelRow delete = extractRowData(tableRowConverter,
documentKey);
delete.setRowKind(RowKind.DELETE);
+ delete.setTableId(tableId);
emit(record, delete, out);
break;
case UPDATE:
if (fullDocument == null) {
break;
}
- SeaTunnelRow updateAfter = extractRowData(fullDocument);
+ SeaTunnelRow updateAfter = extractRowData(tableRowConverter,
fullDocument);
updateAfter.setRowKind(RowKind.UPDATE_AFTER);
+ updateAfter.setTableId(tableId);
emit(record, updateAfter, out);
break;
case REPLACE:
- SeaTunnelRow replaceAfter = extractRowData(fullDocument);
+ SeaTunnelRow replaceAfter = extractRowData(tableRowConverter,
fullDocument);
replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
+ replaceAfter.setTableId(tableId);
emit(record, replaceAfter, out);
break;
case INVALIDATE:
@@ -145,9 +162,15 @@ public class MongoDBConnectorDeserializationSchema
collector.collect(physicalRow);
}
- private SeaTunnelRow extractRowData(BsonDocument document) {
+ private SeaTunnelRow extractRowData(
+ DeserializationRuntimeConverter tableRowConverter, BsonDocument
document) {
checkNotNull(document);
- return (SeaTunnelRow) physicalConverter.convert(document);
+ return (SeaTunnelRow) tableRowConverter.convert(document);
+ }
+
+ private String extractTableId(SourceRecord record) {
+ // TODO extract table id from record
+ return null;
}
//
-------------------------------------------------------------------------------------
@@ -159,17 +182,24 @@ public class MongoDBConnectorDeserializationSchema
Object convert(BsonValue bsonValue);
}
- public DeserializationRuntimeConverter
createConverter(SeaTunnelDataType<?> type) {
- SerializableFunction<BsonValue, Object> internalRowConverter =
- createNullSafeInternalConverter(type);
- return new DeserializationRuntimeConverter() {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Object convert(BsonValue bsonValue) {
- return internalRowConverter.apply(bsonValue);
- }
- };
+ public Map<String, DeserializationRuntimeConverter> createConverter(
+ SeaTunnelDataType<?> inputDataType) {
+ Map<String, DeserializationRuntimeConverter> tableRowConverters = new
HashMap<>();
+ for (Map.Entry<String, SeaTunnelRowType> item : (MultipleRowType)
inputDataType) {
+ SerializableFunction<BsonValue, Object> internalRowConverter =
+ createNullSafeInternalConverter(item.getValue());
+ DeserializationRuntimeConverter itemRowConverter =
+ new DeserializationRuntimeConverter() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object convert(BsonValue bsonValue) {
+ return internalRowConverter.apply(bsonValue);
+ }
+ };
+ tableRowConverters.put(item.getKey(), itemRowConverter);
+ }
+ return tableRowConverters;
}
private static SerializableFunction<BsonValue, Object>
createNullSafeInternalConverter(
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
index 90e76e835c..1ec94c3cfc 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java
@@ -97,7 +97,7 @@ public class MySqlIncrementalSourceFactory implements TableSourceFactory {
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
- CatalogTableUtil.convertToDataType(catalogTables);
+ CatalogTableUtil.convertToMultipleRowType(catalogTables);
return (SeaTunnelSource<T, SplitT, StateT>)
new MySqlIncrementalSource<>(context.getOptions(),
dataType, catalogTables);
};
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
index 41623937af..6338d85aa2 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java
@@ -102,7 +102,7 @@ public class SqlServerIncrementalSourceFactory implements TableSourceFactory {
CatalogTableUtil.getCatalogTables(
context.getOptions(), context.getClassLoader());
SeaTunnelDataType<SeaTunnelRow> dataType =
- CatalogTableUtil.convertToDataType(catalogTables);
+ CatalogTableUtil.convertToMultipleRowType(catalogTables);
return new SqlServerIncrementalSource(context.getOptions(),
dataType, catalogTables);
};
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
index a035ec4caa..6b3519f536 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java
@@ -324,6 +324,12 @@ public class MysqlCDCIT extends TestSuiteBase implements TestResource {
getSourceQuerySQL(
MYSQL_DATABASE2,
SOURCE_TABLE_2)))));
+
+ log.info("****************** container logs start ******************");
+ String containerLogs = container.getServerLogs();
+ log.info(containerLogs);
+ Assertions.assertFalse(containerLogs.contains("ERROR"));
+ log.info("****************** container logs end ******************");
}
private Connection getJdbcConnection() throws SQLException {