This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 21fade0e2 [FLINK-35432][pipeline-connector][mysql] Support catch
modify event in mysql to send AlterColumnTypeEvent. (#3352)
21fade0e2 is described below
commit 21fade0e22155b82fa9560afde8a55dfd3c65a53
Author: hk__lrzy <[email protected]>
AuthorDate: Thu Aug 8 14:38:17 2024 +0800
[FLINK-35432][pipeline-connector][mysql] Support catch modify event in
mysql to send AlterColumnTypeEvent. (#3352)
Co-authored-by: haoke <[email protected]>
---
.../parser/CustomAlterTableParserListener.java | 26 ++++++++++++++++++++++
.../mysql/source/MySqlPipelineITCase.java | 8 +++++++
2 files changed, 34 insertions(+)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index cde5aa0c5..79583d83e 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -234,6 +234,32 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
super.enterAlterByRenameColumn(ctx);
}
+ @Override
+ public void
enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx) {
+ String oldColumnName = parser.parseName(ctx.uid(0));
+ ColumnEditor columnEditor = Column.editor().name(oldColumnName);
+ columnEditor.unsetDefaultValueExpression();
+
+ columnDefinitionListener =
+ new CustomColumnDefinitionParserListener(columnEditor, parser,
listeners);
+ listeners.add(columnDefinitionListener);
+ super.enterAlterByModifyColumn(ctx);
+ }
+
+ @Override
+ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext
ctx) {
+ parser.runIfNotNull(
+ () -> {
+ Column column = columnDefinitionListener.getColumn();
+ Map<String, DataType> typeMapping = new HashMap<>();
+ typeMapping.put(column.name(), fromDbzColumn(column));
+ changes.add(new AlterColumnTypeEvent(currentTable,
typeMapping));
+ listeners.remove(columnDefinitionListener);
+ },
+ columnDefinitionListener);
+ super.exitAlterByModifyColumn(ctx);
+ }
+
@Override
public void exitAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext
ctx) {
parser.runIfNotNull(
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 076e57364..bd2c059bb 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -554,6 +554,14 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
inventoryDatabase.getDatabaseName()));
expected.add(new RenameColumnEvent(tableId,
Collections.singletonMap("desc1", "desc3")));
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`products` MODIFY COLUMN `DESC3`
VARCHAR(255) NULL DEFAULT NULL;",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new AlterColumnTypeEvent(
+ tableId, Collections.singletonMap("DESC3",
DataTypes.VARCHAR(255))));
+
statement.execute(
String.format(
"ALTER TABLE `%s`.`products` DROP COLUMN `DESC3`;",