This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 32ff20461 [cdc] cdc schema change should add column comment (#3667)
32ff20461 is described below
commit 32ff2046161b8b02c7993bc5717f29dffd1be3a7
Author: wangkang <[email protected]>
AuthorDate: Mon Aug 12 08:15:43 2024 +0800
[cdc] cdc schema change should add column comment (#3667)
---
.../flink/action/cdc/mysql/MySqlRecordParser.java | 17 ++++++-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 59 ++++++++++++++++++++++
.../src/test/resources/mysql/sync_table_setup.sql | 14 +++++
3 files changed, 88 insertions(+), 2 deletions(-)
diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 3f54db541..b87a92806 100644
--- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -38,8 +38,10 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.relational.Column;
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.Table;
import io.debezium.relational.history.TableChanges;
import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -74,7 +76,7 @@ public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord, RichC
private final ZoneId serverTimeZone;
private final List<ComputedColumn> computedColumns;
private final TypeMapping typeMapping;
-
+ private final boolean isDebeziumSchemaCommentsEnabled;
private DebeziumEvent root;
// NOTE: current table name is not converted by tableNameConverter
@@ -96,6 +98,12 @@ public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord, RichC
.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
String stringifyServerTimeZone = mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
+
+ this.isDebeziumSchemaCommentsEnabled =
+ mySqlConfig.getBoolean(
+ DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+ + RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
+ false);
this.serverTimeZone =
stringifyServerTimeZone == null
? ZoneId.systemDefault()
@@ -174,7 +182,12 @@ public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord, RichC
typeMapping);
dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) || column.isOptional());
- rowType.field(column.name(), dataType);
+ // add the column comment (available since flink cdc 2.4) only when enabled
+ if (isDebeziumSchemaCommentsEnabled) {
+ rowType.field(column.name(), dataType, column.comment());
+ } else {
+ rowType.field(column.name(), dataType);
+ }
}
return rowType.build().getFields();
}
diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 9568b1c3b..7e11b541d 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -325,6 +325,65 @@ public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
waitForResult(expected, table, rowType, primaryKeys);
}
+ @Test
+ @Timeout(60)
+ public void testSchemaEvolutionWithComment() throws Exception {
+ Map<String, String> mySqlConfig = getBasicMySqlConfig();
+ mySqlConfig.put("database-name", DATABASE_NAME);
+ mySqlConfig.put("table-name", "schema_evolution_comment");
+ mySqlConfig.put("debezium.include.schema.comments", "true");
+
+ MySqlSyncTableAction action =
+ syncTableActionBuilder(mySqlConfig)
+ .withCatalogConfig(
+ Collections.singletonMap(
+ CatalogOptions.METASTORE.key(), "test-alter-table"))
+ .withTableConfig(getBasicTableConfig())
+ .withPrimaryKeys("_id")
+ .build();
+ runActionWithDefaultEnv(action);
+
+ try (Statement statement = getStatement()) {
+ testSchemaEvolutionWithCommentImpl(statement);
+ }
+ }
+
+ private void testSchemaEvolutionWithCommentImpl(Statement statement) throws Exception {
+ FileStoreTable table = getFileStoreTable();
+ statement.executeUpdate("USE " + DATABASE_NAME);
+ statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES (1, 'one')");
+
+ RowType rowType =
+ RowType.of(
+ new DataType[] {DataTypes.INT().notNull(), DataTypes.VARCHAR(10)},
+ new String[] {"_id", "v1"});
+ List<String> primaryKeys = Collections.singletonList("_id");
+ List<String> expected = Collections.singletonList("+I[1, one]");
+ waitForResult(expected, table, rowType, primaryKeys);
+
+ statement.executeUpdate(
+ "ALTER TABLE schema_evolution_comment MODIFY COLUMN v1 VARCHAR(20) COMMENT 'v1-new'");
+ statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES (2, 'two')");
+
+ statement.executeUpdate(
+ "ALTER TABLE schema_evolution_comment ADD COLUMN v2 INT COMMENT 'v2'");
+
+ statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES (3, 'three', 30)");
+ rowType =
+ RowType.of(
+ new DataType[] {
+ DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.INT()
+ },
+ new String[] {"_id", "v1", "v2"});
+ expected = Arrays.asList("+I[1, one, NULL]", "+I[2, two, NULL]", "+I[3, three, 30]");
+ waitForResult(expected, table, rowType, primaryKeys);
+
+ checkTableSchema(
+ "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},"
+ + "{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(20)\",\"description\":\"v1-new\"},"
+ + "{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"}]");
+ }
+
@Test
@Timeout(90)
public void testAllTypes() throws Exception {
diff --git a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index 676185fb9..965f884ec 100644
--- a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -49,6 +49,20 @@ CREATE TABLE schema_evolution_multiple (
PRIMARY KEY (_id)
);
+-- ################################################################################
+-- MySqlSyncTableActionITCase
+-- ################################################################################
+
+CREATE TABLE schema_evolution_comment (
+ _id INT comment 'primary',
+ v1 VARCHAR(10) comment 'v1',
+ PRIMARY KEY (_id)
+);
+
+-- ################################################################################
+-- testAllTypes
+-- ################################################################################
+
CREATE TABLE all_types_table (
_id INT,
pt DECIMAL(2, 1),