This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 32ff20461 [cdc] cdc schema change should add column comment (#3667)
32ff20461 is described below

commit 32ff2046161b8b02c7993bc5717f29dffd1be3a7
Author: wangkang <[email protected]>
AuthorDate: Mon Aug 12 08:15:43 2024 +0800

    [cdc] cdc schema change should add column comment (#3667)
---
 .../flink/action/cdc/mysql/MySqlRecordParser.java  | 17 ++++++-
 .../cdc/mysql/MySqlSyncTableActionITCase.java      | 59 ++++++++++++++++++++++
 .../src/test/resources/mysql/sync_table_setup.sql  | 14 +++++
 3 files changed, 88 insertions(+), 2 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
index 3f54db541..b87a92806 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/mysql/MySqlRecordParser.java
@@ -38,8 +38,10 @@ import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceOptions;
+import com.ververica.cdc.debezium.table.DebeziumOptions;
 import io.debezium.connector.AbstractSourceInfo;
 import io.debezium.relational.Column;
+import io.debezium.relational.RelationalDatabaseConnectorConfig;
 import io.debezium.relational.Table;
 import io.debezium.relational.history.TableChanges;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -74,7 +76,7 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
     private final ZoneId serverTimeZone;
     private final List<ComputedColumn> computedColumns;
     private final TypeMapping typeMapping;
-
+    private final boolean isDebeziumSchemaCommentsEnabled;
     private DebeziumEvent root;
 
     // NOTE: current table name is not converted by tableNameConverter
@@ -96,6 +98,12 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
                 .configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, 
true)
                 .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false);
         String stringifyServerTimeZone = 
mySqlConfig.get(MySqlSourceOptions.SERVER_TIME_ZONE);
+
+        this.isDebeziumSchemaCommentsEnabled =
+                mySqlConfig.getBoolean(
+                        DebeziumOptions.DEBEZIUM_OPTIONS_PREFIX
+                                + 
RelationalDatabaseConnectorConfig.INCLUDE_SCHEMA_COMMENTS.name(),
+                        false);
         this.serverTimeZone =
                 stringifyServerTimeZone == null
                         ? ZoneId.systemDefault()
@@ -174,7 +182,12 @@ public class MySqlRecordParser implements 
FlatMapFunction<CdcSourceRecord, RichC
                             typeMapping);
             dataType = dataType.copy(typeMapping.containsMode(TO_NULLABLE) || 
column.isOptional());
 
-            rowType.field(column.name(), dataType);
+            // add column comment when we upgrade flink cdc to 2.4
+            if (isDebeziumSchemaCommentsEnabled) {
+                rowType.field(column.name(), dataType, column.comment());
+            } else {
+                rowType.field(column.name(), dataType);
+            }
         }
         return rowType.build().getFields();
     }
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 9568b1c3b..7e11b541d 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -325,6 +325,65 @@ public class MySqlSyncTableActionITCase extends 
MySqlActionITCaseBase {
         waitForResult(expected, table, rowType, primaryKeys);
     }
 
+    @Test
+    @Timeout(60)
+    public void testSchemaEvolutionWithComment() throws Exception {
+        Map<String, String> mySqlConfig = getBasicMySqlConfig();
+        mySqlConfig.put("database-name", DATABASE_NAME);
+        mySqlConfig.put("table-name", "schema_evolution_comment");
+        mySqlConfig.put("debezium.include.schema.comments", "true");
+
+        MySqlSyncTableAction action =
+                syncTableActionBuilder(mySqlConfig)
+                        .withCatalogConfig(
+                                Collections.singletonMap(
+                                        CatalogOptions.METASTORE.key(), 
"test-alter-table"))
+                        .withTableConfig(getBasicTableConfig())
+                        .withPrimaryKeys("_id")
+                        .build();
+        runActionWithDefaultEnv(action);
+
+        try (Statement statement = getStatement()) {
+            testSchemaEvolutionWithCommentImpl(statement);
+        }
+    }
+
+    private void testSchemaEvolutionWithCommentImpl(Statement statement) 
throws Exception {
+        FileStoreTable table = getFileStoreTable();
+        statement.executeUpdate("USE " + DATABASE_NAME);
+        statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES 
(1, 'one')");
+
+        RowType rowType =
+                RowType.of(
+                        new DataType[] {DataTypes.INT().notNull(), 
DataTypes.VARCHAR(10)},
+                        new String[] {"_id", "v1"});
+        List<String> primaryKeys = Collections.singletonList("_id");
+        List<String> expected = Collections.singletonList("+I[1, one]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        statement.executeUpdate(
+                "ALTER TABLE schema_evolution_comment MODIFY COLUMN v1 
VARCHAR(20) COMMENT 'v1-new'");
+        statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES 
(2, 'two')");
+
+        statement.executeUpdate(
+                "ALTER TABLE schema_evolution_comment ADD COLUMN v2 INT 
COMMENT 'v2'");
+
+        statement.executeUpdate("INSERT INTO schema_evolution_comment VALUES 
(3, 'three', 30)");
+        rowType =
+                RowType.of(
+                        new DataType[] {
+                            DataTypes.INT().notNull(), DataTypes.VARCHAR(20), 
DataTypes.INT()
+                        },
+                        new String[] {"_id", "v1", "v2"});
+        expected = Arrays.asList("+I[1, one, NULL]", "+I[2, two, NULL]", 
"+I[3, three, 30]");
+        waitForResult(expected, table, rowType, primaryKeys);
+
+        checkTableSchema(
+                "[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT 
NULL\",\"description\":\"primary\"},"
+                        + 
"{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(20)\",\"description\":\"v1-new\"},"
+                        + 
"{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"}]");
+    }
+
     @Test
     @Timeout(90)
     public void testAllTypes() throws Exception {
diff --git 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
index 676185fb9..965f884ec 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
+++ 
b/paimon-flink/paimon-flink-cdc/src/test/resources/mysql/sync_table_setup.sql
@@ -49,6 +49,20 @@ CREATE TABLE schema_evolution_multiple (
     PRIMARY KEY (_id)
 );
 
+-- 
################################################################################
+--  MySqlSyncTableActionITCase
+-- 
################################################################################
+
+CREATE TABLE schema_evolution_comment (
+    _id INT comment 'primary',
+    v1 VARCHAR(10) comment 'v1',
+    PRIMARY KEY (_id)
+);
+
+-- 
################################################################################
+--  testAllTypes
+-- 
################################################################################
+
 CREATE TABLE all_types_table (
     _id INT,
     pt DECIMAL(2, 1),

Reply via email to