This is an automated email from the ASF dual-hosted git repository.

ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 21fade0e2 [FLINK-35432][pipeline-connector][mysql] Support catch modify event in mysql to send AlterColumnTypeEvent. (#3352)
21fade0e2 is described below

commit 21fade0e22155b82fa9560afde8a55dfd3c65a53
Author: hk__lrzy <[email protected]>
AuthorDate: Thu Aug 8 14:38:17 2024 +0800

    [FLINK-35432][pipeline-connector][mysql] Support catch modify event in mysql to send AlterColumnTypeEvent. (#3352)
    
    
    Co-authored-by: haoke <[email protected]>
---
 .../parser/CustomAlterTableParserListener.java     | 26 ++++++++++++++++++++++
 .../mysql/source/MySqlPipelineITCase.java          |  8 +++++++
 2 files changed, 34 insertions(+)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index cde5aa0c5..79583d83e 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -234,6 +234,32 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
         super.enterAlterByRenameColumn(ctx);
     }
 
+    @Override
+    public void 
enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) {
+        String oldColumnName = parser.parseName(ctx.uid(0));
+        ColumnEditor columnEditor = Column.editor().name(oldColumnName);
+        columnEditor.unsetDefaultValueExpression();
+
+        columnDefinitionListener =
+                new CustomColumnDefinitionParserListener(columnEditor, parser, 
listeners);
+        listeners.add(columnDefinitionListener);
+        super.enterAlterByModifyColumn(ctx);
+    }
+
+    @Override
+    public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext 
ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    Column column = columnDefinitionListener.getColumn();
+                    Map<String, DataType> typeMapping = new HashMap<>();
+                    typeMapping.put(column.name(), fromDbzColumn(column));
+                    changes.add(new AlterColumnTypeEvent(currentTable, 
typeMapping));
+                    listeners.remove(columnDefinitionListener);
+                },
+                columnDefinitionListener);
+        super.exitAlterByModifyColumn(ctx);
+    }
+
     @Override
     public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext 
ctx) {
         parser.runIfNotNull(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 076e57364..bd2c059bb 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -554,6 +554,14 @@ public class MySqlPipelineITCase extends MySqlSourceTestBase {
                         inventoryDatabase.getDatabaseName()));
         expected.add(new RenameColumnEvent(tableId, 
Collections.singletonMap("desc1", "desc3")));
 
+        statement.execute(
+                String.format(
+                        "ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3` 
VARCHAR(255) NULL DEFAULT NULL;",
+                        inventoryDatabase.getDatabaseName()));
+        expected.add(
+                new AlterColumnTypeEvent(
+                        tableId, Collections.singletonMap("DESC3", 
DataTypes.VARCHAR(255))));
+
         statement.execute(
                 String.format(
                         "ALTER TABLE `%s`.`products` DROP COLUMN `DESC3`;",

Reply via email to