This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e9a176b0 [hotfix][pipeline-connector/mysql] Fix primary key constraints missing when using inline `PRIMARY KEY` declaration syntax
0e9a176b0 is described below

commit 0e9a176b086629a87e5bb72505382d076dc8b754
Author: yuxiqian <[email protected]>
AuthorDate: Wed Aug 28 18:24:48 2024 +0800

    [hotfix][pipeline-connector/mysql] Fix primary key constraints missing when using inline `PRIMARY KEY` declaration syntax
    
    This closes #3579.
---
 .../parser/CustomAlterTableParserListener.java     | 34 +++++++++++++---------
 .../CustomColumnDefinitionParserListener.java      |  6 ++++
 .../mysql/source/MySqlPipelineITCase.java          | 34 ++++++++++++++++++++++
 3 files changed, 61 insertions(+), 13 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index eb30fe80d..3b30b3c49 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -81,8 +81,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         TableId tableId = 
parser.parseQualifiedTableId(ctx.tableName().fullId());
         if (parser.databaseTables().forTable(tableId) == null) {
             tableEditor = parser.databaseTables().editOrCreateTable(tableId);
-            super.enterColumnCreateTable(ctx);
         }
+        super.enterColumnCreateTable(ctx);
     }
 
     @Override
@@ -113,14 +113,17 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
                                     .collect(Collectors.toList()));
                     
parser.databaseTables().overwriteTable(tableEditor.create());
                     parser.signalCreateTable(tableEditor.tableId(), ctx);
+
+                    Schema.Builder builder = Schema.newBuilder();
+                    tableEditor.columns().forEach(column -> 
builder.column(toCdcColumn(column)));
+                    if (tableEditor.hasPrimaryKey()) {
+                        
builder.primaryKey(tableEditor.primaryKeyColumnNames());
+                    }
+                    changes.add(
+                            new CreateTableEvent(
+                                    toCdcTableId(tableEditor.tableId()), 
builder.build()));
                 },
                 tableEditor);
-        Schema.Builder builder = Schema.newBuilder();
-        tableEditor.columns().forEach(column -> 
builder.column(toCdcColumn(column)));
-        if (tableEditor.hasPrimaryKey()) {
-            builder.primaryKey(tableEditor.primaryKeyColumnNames());
-        }
-        changes.add(new CreateTableEvent(toCdcTableId(tableEditor.tableId()), 
builder.build()));
         super.exitColumnCreateTable(ctx);
     }
 
@@ -133,7 +136,7 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
                     if (columnDefinitionListener == null) {
                         columnDefinitionListener =
                                 new CustomColumnDefinitionParserListener(
-                                        columnEditor, parser, listeners);
+                                        tableEditor, columnEditor, parser, 
listeners);
                         listeners.add(columnDefinitionListener);
                     } else {
                         columnDefinitionListener.setColumnEditor(columnEditor);
@@ -194,7 +197,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         String columnName = parser.parseName(ctx.uid(0));
         ColumnEditor columnEditor = Column.editor().name(columnName);
         columnDefinitionListener =
-                new CustomColumnDefinitionParserListener(columnEditor, parser, 
listeners);
+                new CustomColumnDefinitionParserListener(
+                        tableEditor, columnEditor, parser, listeners);
         listeners.add(columnDefinitionListener);
         super.exitAlterByAddColumn(ctx);
     }
@@ -246,7 +250,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
             columnEditors.add(Column.editor().name(columnName));
         }
         columnDefinitionListener =
-                new CustomColumnDefinitionParserListener(columnEditors.get(0), 
parser, listeners);
+                new CustomColumnDefinitionParserListener(
+                        tableEditor, columnEditors.get(0), parser, listeners);
         listeners.add(columnDefinitionListener);
         super.enterAlterByAddColumns(ctx);
     }
@@ -296,7 +301,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         columnEditor.unsetDefaultValueExpression();
 
         columnDefinitionListener =
-                new CustomColumnDefinitionParserListener(columnEditor, parser, 
listeners);
+                new CustomColumnDefinitionParserListener(
+                        tableEditor, columnEditor, parser, listeners);
         listeners.add(columnDefinitionListener);
         super.enterAlterByChangeColumn(ctx);
     }
@@ -335,7 +341,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         String oldColumnName = parser.parseName(ctx.oldColumn);
         ColumnEditor columnEditor = Column.editor().name(oldColumnName);
         columnDefinitionListener =
-                new CustomColumnDefinitionParserListener(columnEditor, parser, 
listeners);
+                new CustomColumnDefinitionParserListener(
+                        tableEditor, columnEditor, parser, listeners);
         listeners.add(columnDefinitionListener);
         super.enterAlterByRenameColumn(ctx);
     }
@@ -347,7 +354,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         columnEditor.unsetDefaultValueExpression();
 
         columnDefinitionListener =
-                new CustomColumnDefinitionParserListener(columnEditor, parser, 
listeners);
+                new CustomColumnDefinitionParserListener(
+                        tableEditor, columnEditor, parser, listeners);
         listeners.add(columnDefinitionListener);
         super.enterAlterByModifyColumn(ctx);
     }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
index 591836418..e886580b3 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
@@ -25,6 +25,7 @@ import io.debezium.ddl.parser.mysql.generated.MySqlParser;
 import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
 import io.debezium.relational.Column;
 import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.TableEditor;
 import io.debezium.relational.ddl.DataType;
 import io.debezium.util.Strings;
 import org.antlr.v4.runtime.tree.ParseTreeListener;
@@ -50,13 +51,16 @@ public class CustomColumnDefinitionParserListener extends 
MySqlParserBaseListene
     private boolean uniqueColumn;
     private AtomicReference<Boolean> optionalColumn = new AtomicReference<>();
     private DefaultValueParserListener defaultValueListener;
+    private final TableEditor tableEditor;
 
     private final List<ParseTreeListener> listeners;
 
     public CustomColumnDefinitionParserListener(
+            TableEditor tableEditor,
             ColumnEditor columnEditor,
             MySqlAntlrDdlParser parser,
             List<ParseTreeListener> listeners) {
+        this.tableEditor = tableEditor;
         this.columnEditor = columnEditor;
         this.parser = parser;
         this.dataTypeResolver = parser.dataTypeResolver();
@@ -106,6 +110,8 @@ public class CustomColumnDefinitionParserListener extends 
MySqlParserBaseListene
         // this rule will be parsed only if no primary key is set in a table
         // otherwise the statement can't be executed due to multiple primary key error
         optionalColumn.set(Boolean.FALSE);
+        tableEditor.addColumn(columnEditor.create());
+        tableEditor.setPrimaryKeyNames(columnEditor.name());
         super.enterPrimaryKeyColumnConstraint(ctx);
     }
 
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 09b2a308a..82f7b5e7b 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -648,6 +648,40 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
                                     .physicalColumn("timestamp6_c", 
DataTypes.TIMESTAMP_LTZ(6))
                                     .primaryKey("id")
                                     .build()));
+
+            // Test create table DDL with inline primary key
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE `%s`.`newlyAddedTable2`(id SERIAL 
PRIMARY KEY);",
+                            inventoryDatabase.getDatabaseName()));
+            expected.add(
+                    new CreateTableEvent(
+                            TableId.tableId(
+                                    inventoryDatabase.getDatabaseName(), 
"newlyAddedTable2"),
+                            Schema.newBuilder()
+                                    .physicalColumn("id", 
DataTypes.DECIMAL(20, 0).notNull())
+                                    .primaryKey("id")
+                                    .build()));
+
+            // Test create table DDL with multiple primary keys
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE `%s`.`newlyAddedTable3`("
+                                    + "id SERIAL,"
+                                    + "name VARCHAR(17),"
+                                    + "notes TEXT,"
+                                    + "PRIMARY KEY (id, name));",
+                            inventoryDatabase.getDatabaseName()));
+            expected.add(
+                    new CreateTableEvent(
+                            TableId.tableId(
+                                    inventoryDatabase.getDatabaseName(), 
"newlyAddedTable3"),
+                            Schema.newBuilder()
+                                    .physicalColumn("id", 
DataTypes.DECIMAL(20, 0).notNull())
+                                    .physicalColumn("name", 
DataTypes.VARCHAR(17).notNull())
+                                    .physicalColumn("notes", 
DataTypes.STRING())
+                                    .primaryKey("id", "name")
+                                    .build()));
         }
         List<Event> actual = fetchResults(events, expected.size());
         assertEqualsInAnyOrder(

Reply via email to