This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/release-3.2 by this push:
     new 6ef4734e2 [hotfix][mysql] Fix primary key restraints missing when 
using inline `PRIMARY KEY` declaration syntax
6ef4734e2 is described below

commit 6ef4734e2e6999df20f491901c7c71d34fedbe31
Author: yuxiqian <[email protected]>
AuthorDate: Wed Aug 28 21:44:48 2024 +0800

    [hotfix][mysql] Fix primary key restraints missing when using inline 
`PRIMARY KEY` declaration syntax
    
    This closes #3582
---
 .../parser/CustomAlterTableParserListener.java     | 123 +++++++-
 .../CustomColumnDefinitionParserListener.java      |   6 +
 .../mysql/source/MySqlPipelineITCase.java          | 316 +++++++++++++++++++++
 3 files changed, 440 insertions(+), 5 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index 79583d83e..8d86895b2 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -19,9 +19,11 @@ package org.apache.flink.cdc.connectors.mysql.source.parser;
 
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DropColumnEvent;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
 
 import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
@@ -30,6 +32,7 @@ import io.debezium.ddl.parser.mysql.generated.MySqlParser;
 import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
 import io.debezium.relational.Column;
 import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.TableEditor;
 import io.debezium.relational.TableId;
 import org.antlr.v4.runtime.tree.ParseTreeListener;
 import org.slf4j.Logger;
@@ -41,6 +44,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn;
 
@@ -58,6 +62,7 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
     private List<ColumnEditor> columnEditors;
 
     private CustomColumnDefinitionParserListener columnDefinitionListener;
+    private TableEditor tableEditor;
 
     private int parsingColumnIndex = STARTING_INDEX;
 
@@ -70,6 +75,109 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         this.changes = changes;
     }
 
+    @Override
+    public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext 
ctx) {
+        TableId tableId = 
parser.parseQualifiedTableId(ctx.tableName().fullId());
+        if (parser.databaseTables().forTable(tableId) == null) {
+            tableEditor = parser.databaseTables().editOrCreateTable(tableId);
+        }
+        super.enterColumnCreateTable(ctx);
+    }
+
+    @Override
+    public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext 
ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    // Make sure that the table's character set has been set 
...
+                    if (!tableEditor.hasDefaultCharsetName()) {
+                        tableEditor.setDefaultCharsetName(
+                                parser.charsetForTable(tableEditor.tableId()));
+                    }
+                    listeners.remove(columnDefinitionListener);
+                    columnDefinitionListener = null;
+                    // remove column definition parser listener
+                    final String defaultCharsetName = 
tableEditor.create().defaultCharsetName();
+                    tableEditor.setColumns(
+                            tableEditor.columns().stream()
+                                    .map(
+                                            column -> {
+                                                final ColumnEditor 
columnEditor = column.edit();
+                                                if 
(columnEditor.charsetNameOfTable() == null) {
+                                                    
columnEditor.charsetNameOfTable(
+                                                            
defaultCharsetName);
+                                                }
+                                                return columnEditor;
+                                            })
+                                    .map(ColumnEditor::create)
+                                    .collect(Collectors.toList()));
+                    
parser.databaseTables().overwriteTable(tableEditor.create());
+                    parser.signalCreateTable(tableEditor.tableId(), ctx);
+
+                    Schema.Builder builder = Schema.newBuilder();
+                    tableEditor.columns().forEach(column -> 
builder.column(toCdcColumn(column)));
+                    if (tableEditor.hasPrimaryKey()) {
+                        
builder.primaryKey(tableEditor.primaryKeyColumnNames());
+                    }
+                    changes.add(
+                            new CreateTableEvent(
+                                    toCdcTableId(tableEditor.tableId()), 
builder.build()));
+                },
+                tableEditor);
+        super.exitColumnCreateTable(ctx);
+    }
+
+    @Override
+    public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext 
ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    String columnName = parser.parseName(ctx.uid());
+                    ColumnEditor columnEditor = 
Column.editor().name(columnName);
+                    if (columnDefinitionListener == null) {
+                        columnDefinitionListener =
+                                new CustomColumnDefinitionParserListener(
+                                        tableEditor, columnEditor, parser, 
listeners);
+                        listeners.add(columnDefinitionListener);
+                    } else {
+                        columnDefinitionListener.setColumnEditor(columnEditor);
+                    }
+                },
+                tableEditor);
+        super.enterColumnDeclaration(ctx);
+    }
+
+    @Override
+    public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext 
ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    
tableEditor.addColumn(columnDefinitionListener.getColumn());
+                },
+                tableEditor,
+                columnDefinitionListener);
+        super.exitColumnDeclaration(ctx);
+    }
+
+    @Override
+    public void 
enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext 
ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
+                },
+                tableEditor);
+        super.enterPrimaryKeyTableConstraint(ctx);
+    }
+
+    @Override
+    public void 
enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    if (!tableEditor.hasPrimaryKey()) {
+                        
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
+                    }
+                },
+                tableEditor);
+        super.enterUniqueKeyTableConstraint(ctx);
+    }
+
     @Override
     public void enterAlterTable(MySqlParser.AlterTableContext ctx) {
         this.currentTable = 
toCdcTableId(parser.parseQualifiedTableId(ctx.tableName().fullId()));
@@ -88,7 +196,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         String columnName = parser.parseName(ctx.uid(0));
         ColumnEditor columnEditor = Column.editor().name(columnName);
         columnDefinitionListener =
-                new CustomColumnDefinitionParserListener(columnEditor, parser, 
listeners);
+                new CustomColumnDefinitionParserListener(
+                        tableEditor, columnEditor, parser, listeners);
         listeners.add(columnDefinitionListener);
         super.exitAlterByAddColumn(ctx);
     }
@@ -140,7 +249,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
             columnEditors.add(Column.editor().name(columnName));
         }
         columnDefinitionListener =
-                new CustomColumnDefinitionParserListener(columnEditors.get(0), 
parser, listeners);
+                new CustomColumnDefinitionParserListener(
+                        tableEditor, columnEditors.get(0), parser, listeners);
         listeners.add(columnDefinitionListener);
         super.enterAlterByAddColumns(ctx);
     }
@@ -190,7 +300,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         columnEditor.unsetDefaultValueExpression();
 
         columnDefinitionListener =
-                new CustomColumnDefinitionParserListener(columnEditor, parser, 
listeners);
+                new CustomColumnDefinitionParserListener(
+                        tableEditor, columnEditor, parser, listeners);
         listeners.add(columnDefinitionListener);
         super.enterAlterByChangeColumn(ctx);
     }
@@ -229,7 +340,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         String oldColumnName = parser.parseName(ctx.oldColumn);
         ColumnEditor columnEditor = Column.editor().name(oldColumnName);
         columnDefinitionListener =
-                new CustomColumnDefinitionParserListener(columnEditor, parser, 
listeners);
+                new CustomColumnDefinitionParserListener(
+                        tableEditor, columnEditor, parser, listeners);
         listeners.add(columnDefinitionListener);
         super.enterAlterByRenameColumn(ctx);
     }
@@ -241,7 +353,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         columnEditor.unsetDefaultValueExpression();
 
         columnDefinitionListener =
-                new CustomColumnDefinitionParserListener(columnEditor, parser, 
listeners);
+                new CustomColumnDefinitionParserListener(
+                        tableEditor, columnEditor, parser, listeners);
         listeners.add(columnDefinitionListener);
         super.enterAlterByModifyColumn(ctx);
     }
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
index 591836418..e886580b3 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
@@ -25,6 +25,7 @@ import io.debezium.ddl.parser.mysql.generated.MySqlParser;
 import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
 import io.debezium.relational.Column;
 import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.TableEditor;
 import io.debezium.relational.ddl.DataType;
 import io.debezium.util.Strings;
 import org.antlr.v4.runtime.tree.ParseTreeListener;
@@ -50,13 +51,16 @@ public class CustomColumnDefinitionParserListener extends 
MySqlParserBaseListene
     private boolean uniqueColumn;
     private AtomicReference<Boolean> optionalColumn = new AtomicReference<>();
     private DefaultValueParserListener defaultValueListener;
+    private final TableEditor tableEditor;
 
     private final List<ParseTreeListener> listeners;
 
     public CustomColumnDefinitionParserListener(
+            TableEditor tableEditor,
             ColumnEditor columnEditor,
             MySqlAntlrDdlParser parser,
             List<ParseTreeListener> listeners) {
+        this.tableEditor = tableEditor;
         this.columnEditor = columnEditor;
         this.parser = parser;
         this.dataTypeResolver = parser.dataTypeResolver();
@@ -106,6 +110,8 @@ public class CustomColumnDefinitionParserListener extends 
MySqlParserBaseListene
         // this rule will be parsed only if no primary key is set in a table
         // otherwise the statement can't be executed due to multiple primary 
key error
         optionalColumn.set(Boolean.FALSE);
+        tableEditor.addColumn(columnEditor.create());
+        tableEditor.setPrimaryKeyNames(columnEditor.name());
         super.enterPrimaryKeyColumnConstraint(ctx);
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index f019e98d8..8f613e0ef 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -62,6 +62,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
@@ -372,6 +373,281 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
         assertThat(actual).isEqualTo(expected);
     }
 
+    @Test
+    public void testSchemaChangeEvents() throws Exception {
+        env.setParallelism(1);
+        inventoryDatabase.createAndInitialize();
+        MySqlSourceConfigFactory configFactory =
+                new MySqlSourceConfigFactory()
+                        .hostname(MYSQL8_CONTAINER.getHost())
+                        .port(MYSQL8_CONTAINER.getDatabasePort())
+                        .username(TEST_USER)
+                        .password(TEST_PASSWORD)
+                        .databaseList(inventoryDatabase.getDatabaseName())
+                        .tableList(inventoryDatabase.getDatabaseName() + ".*")
+                        .startupOptions(StartupOptions.latest())
+                        .serverId(getServerId(env.getParallelism()))
+                        .serverTimeZone("UTC")
+                        
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) new 
MySqlDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+        Thread.sleep(5_000);
+
+        List<Event> expected =
+                new ArrayList<>(
+                        
getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName()));
+
+        try (Connection connection = inventoryDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+
+            statement.execute(
+                    String.format(
+                            "ALTER TABLE `%s`.`customers` ADD COLUMN `newcol1` 
INT NULL;",
+                            inventoryDatabase.getDatabaseName()));
+            expected.add(
+                    new AddColumnEvent(
+                            
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+                            Collections.singletonList(
+                                    new AddColumnEvent.ColumnWithPosition(
+                                            Column.physicalColumn("newcol1", 
DataTypes.INT())))));
+
+            // Test MODIFY COLUMN DDL
+            statement.execute(
+                    String.format(
+                            "ALTER TABLE `%s`.`customers` MODIFY COLUMN 
`newcol1` DOUBLE;",
+                            inventoryDatabase.getDatabaseName()));
+
+            expected.add(
+                    new AlterColumnTypeEvent(
+                            
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+                            Collections.singletonMap("newcol1", 
DataTypes.DOUBLE())));
+
+            // Test CHANGE COLUMN DDL
+            statement.execute(
+                    String.format(
+                            "ALTER TABLE `%s`.`customers` CHANGE COLUMN 
`newcol1` `newcol2` INT;",
+                            inventoryDatabase.getDatabaseName()));
+
+            expected.add(
+                    new AlterColumnTypeEvent(
+                            
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+                            Collections.singletonMap("newcol1", 
DataTypes.INT())));
+
+            expected.add(
+                    new RenameColumnEvent(
+                            
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+                            Collections.singletonMap("newcol1", "newcol2")));
+
+            statement.execute(
+                    String.format(
+                            "ALTER TABLE `%s`.`customers` CHANGE COLUMN 
`newcol2` `newcol1` DOUBLE;",
+                            inventoryDatabase.getDatabaseName()));
+
+            expected.add(
+                    new AlterColumnTypeEvent(
+                            
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+                            Collections.singletonMap("newcol2", 
DataTypes.DOUBLE())));
+
+            expected.add(
+                    new RenameColumnEvent(
+                            
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+                            Collections.singletonMap("newcol2", "newcol1")));
+
+            // Test create table DDL
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE `%s`.`newlyAddedTable1`("
+                                    + "    id                   SERIAL,\n"
+                                    + "    tiny_c               TINYINT,\n"
+                                    + "    tiny_un_c            TINYINT 
UNSIGNED,\n"
+                                    + "    tiny_un_z_c          TINYINT 
UNSIGNED ZEROFILL,\n"
+                                    + "    small_c              SMALLINT,\n"
+                                    + "    small_un_c           SMALLINT 
UNSIGNED,\n"
+                                    + "    small_un_z_c         SMALLINT 
UNSIGNED ZEROFILL,\n"
+                                    + "    medium_c             MEDIUMINT,\n"
+                                    + "    medium_un_c          MEDIUMINT 
UNSIGNED,\n"
+                                    + "    medium_un_z_c        MEDIUMINT 
UNSIGNED ZEROFILL,\n"
+                                    + "    int_c                INTEGER,\n"
+                                    + "    int_un_c             INTEGER 
UNSIGNED,\n"
+                                    + "    int_un_z_c           INTEGER 
UNSIGNED ZEROFILL,\n"
+                                    + "    int11_c              INT(11),\n"
+                                    + "    big_c                BIGINT,\n"
+                                    + "    big_un_c             BIGINT 
UNSIGNED,\n"
+                                    + "    big_un_z_c           BIGINT 
UNSIGNED ZEROFILL,\n"
+                                    + "    varchar_c            
VARCHAR(255),\n"
+                                    + "    char_c               CHAR(3),\n"
+                                    + "    real_c               REAL,\n"
+                                    + "    float_c              FLOAT,\n"
+                                    + "    float_un_c           FLOAT 
UNSIGNED,\n"
+                                    + "    float_un_z_c         FLOAT UNSIGNED 
ZEROFILL,\n"
+                                    + "    double_c             DOUBLE,\n"
+                                    + "    double_un_c          DOUBLE 
UNSIGNED,\n"
+                                    + "    double_un_z_c        DOUBLE 
UNSIGNED ZEROFILL,\n"
+                                    + "    decimal_c            DECIMAL(8, 
4),\n"
+                                    + "    decimal_un_c         DECIMAL(8, 4) 
UNSIGNED,\n"
+                                    + "    decimal_un_z_c       DECIMAL(8, 4) 
UNSIGNED ZEROFILL,\n"
+                                    + "    numeric_c            NUMERIC(6, 
0),\n"
+                                    + "    big_decimal_c        DECIMAL(65, 
1),\n"
+                                    + "    bit1_c               BIT,\n"
+                                    + "    bit3_c               BIT(3),\n"
+                                    + "    tiny1_c              TINYINT(1),\n"
+                                    + "    boolean_c            BOOLEAN,\n"
+                                    + "    file_uuid            BINARY(16),\n"
+                                    + "    bit_c                BIT(64),\n"
+                                    + "    text_c               TEXT,\n"
+                                    + "    tiny_blob_c          TINYBLOB,\n"
+                                    + "    blob_c               BLOB,\n"
+                                    + "    medium_blob_c        MEDIUMBLOB,\n"
+                                    + "    long_blob_c          LONGBLOB,\n"
+                                    + "    enum_c               enum('red', 
'white'),\n"
+                                    + "    json_c               JSON,\n"
+                                    + "    point_c              POINT,\n"
+                                    + "    geometry_c           GEOMETRY,\n"
+                                    + "    linestring_c         LINESTRING,\n"
+                                    + "    polygon_c            POLYGON,\n"
+                                    + "    multipoint_c         MULTIPOINT,\n"
+                                    + "    multiline_c          
MULTILINESTRING,\n"
+                                    + "    multipolygon_c       
MULTIPOLYGON,\n"
+                                    + "    geometrycollection_c 
GEOMETRYCOLLECTION,"
+                                    + "    year_c               YEAR,\n"
+                                    + "    date_c               DATE,\n"
+                                    + "    time_c               TIME(0),\n"
+                                    + "    time_3_c             TIME(3),\n"
+                                    + "    time_6_c             TIME(6),\n"
+                                    + "    datetime_c           DATETIME(0),\n"
+                                    + "    datetime3_c          DATETIME(3),\n"
+                                    + "    datetime6_c          DATETIME(6),\n"
+                                    + "    decimal_c0           DECIMAL(6, 
2),\n"
+                                    + "    decimal_c1           DECIMAL(9, 
4),\n"
+                                    + "    decimal_c2           DECIMAL(20, 
4),\n"
+                                    + "    timestamp_c          TIMESTAMP(0) 
NULL,\n"
+                                    + "    timestamp3_c         TIMESTAMP(3) 
NULL,\n"
+                                    + "    timestamp6_c         TIMESTAMP(6) 
NULL,"
+                                    + "primary key(id));",
+                            inventoryDatabase.getDatabaseName()));
+
+            expected.add(
+                    new CreateTableEvent(
+                            TableId.tableId(
+                                    inventoryDatabase.getDatabaseName(), 
"newlyAddedTable1"),
+                            Schema.newBuilder()
+                                    .physicalColumn("id", 
DataTypes.DECIMAL(20, 0).notNull())
+                                    .physicalColumn("tiny_c", 
DataTypes.TINYINT())
+                                    .physicalColumn("tiny_un_c", 
DataTypes.SMALLINT())
+                                    .physicalColumn("tiny_un_z_c", 
DataTypes.SMALLINT())
+                                    .physicalColumn("small_c", 
DataTypes.SMALLINT())
+                                    .physicalColumn("small_un_c", 
DataTypes.INT())
+                                    .physicalColumn("small_un_z_c", 
DataTypes.INT())
+                                    .physicalColumn("medium_c", 
DataTypes.INT())
+                                    .physicalColumn("medium_un_c", 
DataTypes.INT())
+                                    .physicalColumn("medium_un_z_c", 
DataTypes.INT())
+                                    .physicalColumn("int_c", DataTypes.INT())
+                                    .physicalColumn("int_un_c", 
DataTypes.BIGINT())
+                                    .physicalColumn("int_un_z_c", 
DataTypes.BIGINT())
+                                    .physicalColumn("int11_c", DataTypes.INT())
+                                    .physicalColumn("big_c", 
DataTypes.BIGINT())
+                                    .physicalColumn("big_un_c", 
DataTypes.DECIMAL(20, 0))
+                                    .physicalColumn("big_un_z_c", 
DataTypes.DECIMAL(20, 0))
+                                    .physicalColumn("varchar_c", 
DataTypes.VARCHAR(255))
+                                    .physicalColumn("char_c", 
DataTypes.CHAR(3))
+                                    .physicalColumn("real_c", 
DataTypes.DOUBLE())
+                                    .physicalColumn("float_c", 
DataTypes.FLOAT())
+                                    .physicalColumn("float_un_c", 
DataTypes.FLOAT())
+                                    .physicalColumn("float_un_z_c", 
DataTypes.FLOAT())
+                                    .physicalColumn("double_c", 
DataTypes.DOUBLE())
+                                    .physicalColumn("double_un_c", 
DataTypes.DOUBLE())
+                                    .physicalColumn("double_un_z_c", 
DataTypes.DOUBLE())
+                                    .physicalColumn("decimal_c", 
DataTypes.DECIMAL(8, 4))
+                                    .physicalColumn("decimal_un_c", 
DataTypes.DECIMAL(8, 4))
+                                    .physicalColumn("decimal_un_z_c", 
DataTypes.DECIMAL(8, 4))
+                                    .physicalColumn("numeric_c", 
DataTypes.DECIMAL(6, 0))
+                                    .physicalColumn("big_decimal_c", 
DataTypes.STRING())
+                                    .physicalColumn("bit1_c", 
DataTypes.BOOLEAN())
+                                    .physicalColumn("bit3_c", 
DataTypes.BINARY(1))
+                                    .physicalColumn("tiny1_c", 
DataTypes.BOOLEAN())
+                                    .physicalColumn("boolean_c", 
DataTypes.BOOLEAN())
+                                    .physicalColumn("file_uuid", 
DataTypes.BINARY(16))
+                                    .physicalColumn("bit_c", 
DataTypes.BINARY(8))
+                                    .physicalColumn("text_c", 
DataTypes.STRING())
+                                    .physicalColumn("tiny_blob_c", 
DataTypes.BYTES())
+                                    .physicalColumn("blob_c", 
DataTypes.BYTES())
+                                    .physicalColumn("medium_blob_c", 
DataTypes.BYTES())
+                                    .physicalColumn("long_blob_c", 
DataTypes.BYTES())
+                                    .physicalColumn("enum_c", 
DataTypes.STRING())
+                                    .physicalColumn("json_c", 
DataTypes.STRING())
+                                    .physicalColumn("point_c", 
DataTypes.STRING())
+                                    .physicalColumn("geometry_c", 
DataTypes.STRING())
+                                    .physicalColumn("linestring_c", 
DataTypes.STRING())
+                                    .physicalColumn("polygon_c", 
DataTypes.STRING())
+                                    .physicalColumn("multipoint_c", 
DataTypes.STRING())
+                                    .physicalColumn("multiline_c", 
DataTypes.STRING())
+                                    .physicalColumn("multipolygon_c", 
DataTypes.STRING())
+                                    .physicalColumn("geometrycollection_c", 
DataTypes.STRING())
+                                    .physicalColumn("year_c", DataTypes.INT())
+                                    .physicalColumn("date_c", DataTypes.DATE())
+                                    .physicalColumn("time_c", 
DataTypes.TIME(0))
+                                    .physicalColumn("time_3_c", 
DataTypes.TIME(3))
+                                    .physicalColumn("time_6_c", 
DataTypes.TIME(6))
+                                    .physicalColumn("datetime_c", 
DataTypes.TIMESTAMP(0))
+                                    .physicalColumn("datetime3_c", 
DataTypes.TIMESTAMP(3))
+                                    .physicalColumn("datetime6_c", 
DataTypes.TIMESTAMP(6))
+                                    .physicalColumn("decimal_c0", 
DataTypes.DECIMAL(6, 2))
+                                    .physicalColumn("decimal_c1", 
DataTypes.DECIMAL(9, 4))
+                                    .physicalColumn("decimal_c2", 
DataTypes.DECIMAL(20, 4))
+                                    .physicalColumn("timestamp_c", 
DataTypes.TIMESTAMP_LTZ(0))
+                                    .physicalColumn("timestamp3_c", 
DataTypes.TIMESTAMP_LTZ(3))
+                                    .physicalColumn("timestamp6_c", 
DataTypes.TIMESTAMP_LTZ(6))
+                                    .primaryKey("id")
+                                    .build()));
+
+            // Test create table DDL with inline primary key
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE `%s`.`newlyAddedTable2`(id SERIAL 
PRIMARY KEY);",
+                            inventoryDatabase.getDatabaseName()));
+            expected.add(
+                    new CreateTableEvent(
+                            TableId.tableId(
+                                    inventoryDatabase.getDatabaseName(), 
"newlyAddedTable2"),
+                            Schema.newBuilder()
+                                    .physicalColumn("id", 
DataTypes.DECIMAL(20, 0).notNull())
+                                    .primaryKey("id")
+                                    .build()));
+
+            // Test create table DDL with multiple primary keys
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE `%s`.`newlyAddedTable3`("
+                                    + "id SERIAL,"
+                                    + "name VARCHAR(17),"
+                                    + "notes TEXT,"
+                                    + "PRIMARY KEY (id, name));",
+                            inventoryDatabase.getDatabaseName()));
+            expected.add(
+                    new CreateTableEvent(
+                            TableId.tableId(
+                                    inventoryDatabase.getDatabaseName(), 
"newlyAddedTable3"),
+                            Schema.newBuilder()
+                                    .physicalColumn("id", 
DataTypes.DECIMAL(20, 0).notNull())
+                                    .physicalColumn("name", 
DataTypes.VARCHAR(17).notNull())
+                                    .physicalColumn("notes", 
DataTypes.STRING())
+                                    .primaryKey("id", "name")
+                                    .build()));
+        }
+        List<Event> actual = fetchResults(events, expected.size());
+        assertEqualsInAnyOrder(
+                
expected.stream().map(Object::toString).collect(Collectors.toList()),
+                
actual.stream().map(Object::toString).collect(Collectors.toList()));
+    }
+
     private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
         return new CreateTableEvent(
                 tableId,
@@ -384,6 +660,46 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
                         .build());
     }
 
+    private List<CreateTableEvent> getInventoryCreateAllTableEvents(String 
databaseName) {
+        return Arrays.asList(
+                new CreateTableEvent(
+                        TableId.tableId(databaseName, "products"),
+                        Schema.newBuilder()
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
+                                .physicalColumn("name", 
DataTypes.VARCHAR(255).notNull(), "flink")
+                                .physicalColumn("description", 
DataTypes.VARCHAR(512))
+                                .physicalColumn("weight", DataTypes.FLOAT())
+                                .primaryKey(Collections.singletonList("id"))
+                                .build()),
+                new CreateTableEvent(
+                        TableId.tableId(databaseName, "customers"),
+                        Schema.newBuilder()
+                                .physicalColumn("id", 
DataTypes.INT().notNull())
+                                .physicalColumn("first_name", 
DataTypes.VARCHAR(255).notNull())
+                                .physicalColumn("last_name", 
DataTypes.VARCHAR(255).notNull())
+                                .physicalColumn("email", 
DataTypes.VARCHAR(255).notNull())
+                                .primaryKey(Collections.singletonList("id"))
+                                .build()),
+                new CreateTableEvent(
+                        TableId.tableId(databaseName, "orders"),
+                        Schema.newBuilder()
+                                .physicalColumn("order_number", 
DataTypes.INT().notNull())
+                                .physicalColumn("order_date", 
DataTypes.DATE().notNull())
+                                .physicalColumn("purchaser", 
DataTypes.INT().notNull())
+                                .physicalColumn("quantity", 
DataTypes.INT().notNull())
+                                .physicalColumn("product_id", 
DataTypes.INT().notNull())
+                                
.primaryKey(Collections.singletonList("order_number"))
+                                .build()),
+                new CreateTableEvent(
+                        TableId.tableId(databaseName, "multi_max_table"),
+                        Schema.newBuilder()
+                                .physicalColumn("order_id", 
DataTypes.VARCHAR(128).notNull())
+                                .physicalColumn("index", 
DataTypes.INT().notNull())
+                                .physicalColumn("desc", 
DataTypes.VARCHAR(512).notNull())
+                                .primaryKey(Arrays.asList("order_id", "index"))
+                                .build()));
+    }
+
     private List<Event> getSnapshotExpected(TableId tableId) {
         RowType rowType =
                 RowType.of(


Reply via email to