This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 8815f2b87 [FLINK-34865][pipeline-connector/mysql] Support sync newly added table's comment
8815f2b87 is described below

commit 8815f2b879fd48ad6211141b7f627eb1d143a02c
Author: North Lin <37775475+qg-...@users.noreply.github.com>
AuthorDate: Fri Jan 17 10:52:32 2025 +0800

    [FLINK-34865][pipeline-connector/mysql] Support sync newly added table's comment
    
    This closes #3869
---
 .../parser/CustomAlterTableParserListener.java     | 16 +++++
 .../mysql/source/MySqlPipelineITCase.java          | 70 ++++++++++++++++++++++
 2 files changed, 86 insertions(+)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index 31ee183e5..af20c531c 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -122,6 +122,7 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
                     if (tableEditor.hasPrimaryKey()) {
                         
builder.primaryKey(tableEditor.primaryKeyColumnNames());
                     }
+                    builder.comment(tableEditor.create().comment());
                     changes.add(
                             new CreateTableEvent(
                                     toCdcTableId(tableEditor.tableId()), 
builder.build()));
@@ -413,6 +414,21 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         super.exitDropTable(ctx);
     }
 
+    @Override
+    public void enterTableOptionComment(MySqlParser.TableOptionCommentContext 
ctx) {
+        if (!parser.skipComments()) {
+            parser.runIfNotNull(
+                    () -> {
+                        if (ctx.COMMENT() != null) {
+                            tableEditor.setComment(
+                                    
parser.withoutQuotes(ctx.STRING_LITERAL().getText()));
+                        }
+                    },
+                    tableEditor);
+        }
+        super.enterTableOptionComment(ctx);
+    }
+
     private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column 
dbzColumn) {
         return org.apache.flink.cdc.common.schema.Column.physicalColumn(
                 dbzColumn.name(),
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 3e338a68c..7ef6cbeb1 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -79,6 +79,7 @@ import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOption
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.INCLUDE_COMMENTS_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
+import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SERVER_TIME_ZONE;
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
@@ -1076,6 +1077,75 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
                 
actual.stream().map(Object::toString).collect(Collectors.toList()));
     }
 
+    @Test
+    public void testIncludeCommentsForScanBinlogNewlyAddedTableEnabled() 
throws Exception {
+        env.setParallelism(1);
+        inventoryDatabase.createAndInitialize();
+        TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), 
"products");
+        TableId newTableId =
+                TableId.tableId(inventoryDatabase.getDatabaseName(), 
"products_with_comments2");
+
+        Map<String, String> options = new HashMap<>();
+        options.put(HOSTNAME.key(), MYSQL8_CONTAINER.getHost());
+        options.put(PORT.key(), 
String.valueOf(MYSQL8_CONTAINER.getDatabasePort()));
+        options.put(USERNAME.key(), TEST_USER);
+        options.put(PASSWORD.key(), TEST_PASSWORD);
+        options.put(SERVER_TIME_ZONE.key(), "UTC");
+        options.put(INCLUDE_COMMENTS_ENABLED.key(), "true");
+        options.put(SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED.key(), "true");
+        options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + 
".products\\.*");
+        Factory.Context context =
+                new FactoryHelper.DefaultContext(
+                        Configuration.fromMap(options), null, 
this.getClass().getClassLoader());
+
+        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
+        MySqlDataSource dataSource = (MySqlDataSource) 
factory.createDataSource(context);
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) dataSource.getEventSourceProvider();
+
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+        Thread.sleep(5_000);
+
+        String createTableSql =
+                String.format(
+                        "CREATE TABLE IF NOT EXISTS `%s`.`%s` (\n"
+                                + "  id INTEGER NOT NULL AUTO_INCREMENT 
COMMENT 'column comment of id' PRIMARY KEY,\n"
+                                + "  name VARCHAR(255) NOT NULL DEFAULT 
'flink' COMMENT 'column comment of name',\n"
+                                + "  weight FLOAT COMMENT 'column comment of 
weight'\n"
+                                + ")\n"
+                                + "COMMENT 'table comment of products';",
+                        inventoryDatabase.getDatabaseName(), 
"products_with_comments2");
+        executeSql(inventoryDatabase, createTableSql);
+
+        // add some column
+        String addColumnSql =
+                String.format(
+                        "ALTER TABLE `%s`.`products_with_comments2` ADD COLUMN 
`description` VARCHAR(512) comment 'column comment of description';",
+                        inventoryDatabase.getDatabaseName());
+        executeSql(inventoryDatabase, addColumnSql);
+
+        List<Event> expectedEvents = new ArrayList<>();
+        CreateTableEvent productCreateTableEvent = 
getProductsCreateTableEvent(tableId);
+        expectedEvents.add(productCreateTableEvent);
+        // generate snapshot data
+        List<Event> productExpectedSnapshot = getSnapshotExpected(tableId);
+        expectedEvents.addAll(productExpectedSnapshot);
+
+        List<Event> newTableExpectedEvents = getEventsWithComments(newTableId);
+        expectedEvents.addAll(newTableExpectedEvents);
+
+        List<Event> actual = fetchResults(events, expectedEvents.size());
+        assertEqualsInAnyOrder(
+                
expectedEvents.stream().map(Object::toString).collect(Collectors.toList()),
+                
actual.stream().map(Object::toString).collect(Collectors.toList()));
+    }
+
     private void executeSql(UniqueDatabase database, String sql) throws 
SQLException {
         try (Connection connection = database.getJdbcConnection();
                 Statement statement = connection.createStatement()) {

Reply via email to