This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c1517f70 [FLINK-36148][pipeline-connector/mysql] Fix that newly added 
table can not be discovered by adding custom parser for CreateTableEvent
3c1517f70 is described below

commit 3c1517f70f728e1ef8a4b6bed942d87d66d874df
Author: Kunni <[email protected]>
AuthorDate: Tue Aug 27 15:29:52 2024 +0800

    [FLINK-36148][pipeline-connector/mysql] Fix that newly added table can not 
be discovered by adding custom parser for CreateTableEvent
    
    This closes #3570.
---
 .../parser/CustomAlterTableParserListener.java     | 106 ++++++++++++++-
 .../mysql/source/MySqlPipelineITCase.java          | 147 +++++++++++++++++++++
 .../source/MysqlPipelineNewlyAddedTableITCase.java |  15 ++-
 3 files changed, 263 insertions(+), 5 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index cdb5983a5..eb30fe80d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -19,11 +19,13 @@ package org.apache.flink.cdc.connectors.mysql.source.parser;
 
 import org.apache.flink.cdc.common.event.AddColumnEvent;
 import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DropColumnEvent;
 import org.apache.flink.cdc.common.event.DropTableEvent;
 import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TruncateTableEvent;
+import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
 
 import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
@@ -32,6 +34,7 @@ import io.debezium.ddl.parser.mysql.generated.MySqlParser;
 import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
 import io.debezium.relational.Column;
 import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.TableEditor;
 import io.debezium.relational.TableId;
 import org.antlr.v4.runtime.tree.ParseTreeListener;
 import org.slf4j.Logger;
@@ -43,6 +46,7 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import static 
org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn;
 
@@ -58,8 +62,8 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
     private final LinkedList<SchemaChangeEvent> changes;
     private org.apache.flink.cdc.common.event.TableId currentTable;
     private List<ColumnEditor> columnEditors;
-
     private CustomColumnDefinitionParserListener columnDefinitionListener;
+    private TableEditor tableEditor;
 
     private int parsingColumnIndex = STARTING_INDEX;
 
@@ -72,6 +76,106 @@ public class CustomAlterTableParserListener extends 
MySqlParserBaseListener {
         this.changes = changes;
     }
 
+    @Override
+    public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext 
ctx) {
+        TableId tableId = 
parser.parseQualifiedTableId(ctx.tableName().fullId());
+        if (parser.databaseTables().forTable(tableId) == null) {
+            tableEditor = parser.databaseTables().editOrCreateTable(tableId);
+            super.enterColumnCreateTable(ctx);
+        }
+    }
+
+    @Override
+    public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext 
ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    // Make sure that the table's character set has been set 
...
+                    if (!tableEditor.hasDefaultCharsetName()) {
+                        tableEditor.setDefaultCharsetName(
+                                parser.charsetForTable(tableEditor.tableId()));
+                    }
+                    listeners.remove(columnDefinitionListener);
+                    columnDefinitionListener = null;
+                    // remove column definition parser listener
+                    final String defaultCharsetName = 
tableEditor.create().defaultCharsetName();
+                    tableEditor.setColumns(
+                            tableEditor.columns().stream()
+                                    .map(
+                                            column -> {
+                                                final ColumnEditor 
columnEditor = column.edit();
+                                                if 
(columnEditor.charsetNameOfTable() == null) {
+                                                    
columnEditor.charsetNameOfTable(
+                                                            
defaultCharsetName);
+                                                }
+                                                return columnEditor;
+                                            })
+                                    .map(ColumnEditor::create)
+                                    .collect(Collectors.toList()));
+                    
parser.databaseTables().overwriteTable(tableEditor.create());
+                    parser.signalCreateTable(tableEditor.tableId(), ctx);
+                },
+                tableEditor);
+        Schema.Builder builder = Schema.newBuilder();
+        tableEditor.columns().forEach(column -> 
builder.column(toCdcColumn(column)));
+        if (tableEditor.hasPrimaryKey()) {
+            builder.primaryKey(tableEditor.primaryKeyColumnNames());
+        }
+        changes.add(new CreateTableEvent(toCdcTableId(tableEditor.tableId()), 
builder.build()));
+        super.exitColumnCreateTable(ctx);
+    }
+
+    @Override
+    public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext 
ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    String columnName = parser.parseName(ctx.uid());
+                    ColumnEditor columnEditor = 
Column.editor().name(columnName);
+                    if (columnDefinitionListener == null) {
+                        columnDefinitionListener =
+                                new CustomColumnDefinitionParserListener(
+                                        columnEditor, parser, listeners);
+                        listeners.add(columnDefinitionListener);
+                    } else {
+                        columnDefinitionListener.setColumnEditor(columnEditor);
+                    }
+                },
+                tableEditor);
+        super.enterColumnDeclaration(ctx);
+    }
+
+    @Override
+    public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext 
ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    
tableEditor.addColumn(columnDefinitionListener.getColumn());
+                },
+                tableEditor,
+                columnDefinitionListener);
+        super.exitColumnDeclaration(ctx);
+    }
+
+    @Override
+    public void 
enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext 
ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
+                },
+                tableEditor);
+        super.enterPrimaryKeyTableConstraint(ctx);
+    }
+
+    @Override
+    public void 
enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    if (!tableEditor.hasPrimaryKey()) {
+                        
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
+                    }
+                },
+                tableEditor);
+        super.enterUniqueKeyTableConstraint(ctx);
+    }
+
     @Override
     public void enterAlterTable(MySqlParser.AlterTableContext ctx) {
         this.currentTable = 
toCdcTableId(parser.parseQualifiedTableId(ctx.tableName().fullId()));
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 108a3d36d..09b2a308a 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -501,6 +501,153 @@ public class MySqlPipelineITCase extends 
MySqlSourceTestBase {
             expected.add(
                     new DropTableEvent(
                             
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers")));
+
+            // Test create table DDL
+            statement.execute(
+                    String.format(
+                            "CREATE TABLE `%s`.`newlyAddedTable1`("
+                                    + "    id                   SERIAL,\n"
+                                    + "    tiny_c               TINYINT,\n"
+                                    + "    tiny_un_c            TINYINT 
UNSIGNED,\n"
+                                    + "    tiny_un_z_c          TINYINT 
UNSIGNED ZEROFILL,\n"
+                                    + "    small_c              SMALLINT,\n"
+                                    + "    small_un_c           SMALLINT 
UNSIGNED,\n"
+                                    + "    small_un_z_c         SMALLINT 
UNSIGNED ZEROFILL,\n"
+                                    + "    medium_c             MEDIUMINT,\n"
+                                    + "    medium_un_c          MEDIUMINT 
UNSIGNED,\n"
+                                    + "    medium_un_z_c        MEDIUMINT 
UNSIGNED ZEROFILL,\n"
+                                    + "    int_c                INTEGER,\n"
+                                    + "    int_un_c             INTEGER 
UNSIGNED,\n"
+                                    + "    int_un_z_c           INTEGER 
UNSIGNED ZEROFILL,\n"
+                                    + "    int11_c              INT(11),\n"
+                                    + "    big_c                BIGINT,\n"
+                                    + "    big_un_c             BIGINT 
UNSIGNED,\n"
+                                    + "    big_un_z_c           BIGINT 
UNSIGNED ZEROFILL,\n"
+                                    + "    varchar_c            
VARCHAR(255),\n"
+                                    + "    char_c               CHAR(3),\n"
+                                    + "    real_c               REAL,\n"
+                                    + "    float_c              FLOAT,\n"
+                                    + "    float_un_c           FLOAT 
UNSIGNED,\n"
+                                    + "    float_un_z_c         FLOAT UNSIGNED 
ZEROFILL,\n"
+                                    + "    double_c             DOUBLE,\n"
+                                    + "    double_un_c          DOUBLE 
UNSIGNED,\n"
+                                    + "    double_un_z_c        DOUBLE 
UNSIGNED ZEROFILL,\n"
+                                    + "    decimal_c            DECIMAL(8, 
4),\n"
+                                    + "    decimal_un_c         DECIMAL(8, 4) 
UNSIGNED,\n"
+                                    + "    decimal_un_z_c       DECIMAL(8, 4) 
UNSIGNED ZEROFILL,\n"
+                                    + "    numeric_c            NUMERIC(6, 
0),\n"
+                                    + "    big_decimal_c        DECIMAL(65, 
1),\n"
+                                    + "    bit1_c               BIT,\n"
+                                    + "    bit3_c               BIT(3),\n"
+                                    + "    tiny1_c              TINYINT(1),\n"
+                                    + "    boolean_c            BOOLEAN,\n"
+                                    + "    file_uuid            BINARY(16),\n"
+                                    + "    bit_c                BIT(64),\n"
+                                    + "    text_c               TEXT,\n"
+                                    + "    tiny_blob_c          TINYBLOB,\n"
+                                    + "    blob_c               BLOB,\n"
+                                    + "    medium_blob_c        MEDIUMBLOB,\n"
+                                    + "    long_blob_c          LONGBLOB,\n"
+                                    + "    enum_c               enum('red', 
'white'),\n"
+                                    + "    json_c               JSON,\n"
+                                    + "    point_c              POINT,\n"
+                                    + "    geometry_c           GEOMETRY,\n"
+                                    + "    linestring_c         LINESTRING,\n"
+                                    + "    polygon_c            POLYGON,\n"
+                                    + "    multipoint_c         MULTIPOINT,\n"
+                                    + "    multiline_c          
MULTILINESTRING,\n"
+                                    + "    multipolygon_c       
MULTIPOLYGON,\n"
+                                    + "    geometrycollection_c 
GEOMETRYCOLLECTION,"
+                                    + "    year_c               YEAR,\n"
+                                    + "    date_c               DATE,\n"
+                                    + "    time_c               TIME(0),\n"
+                                    + "    time_3_c             TIME(3),\n"
+                                    + "    time_6_c             TIME(6),\n"
+                                    + "    datetime_c           DATETIME(0),\n"
+                                    + "    datetime3_c          DATETIME(3),\n"
+                                    + "    datetime6_c          DATETIME(6),\n"
+                                    + "    decimal_c0           DECIMAL(6, 
2),\n"
+                                    + "    decimal_c1           DECIMAL(9, 
4),\n"
+                                    + "    decimal_c2           DECIMAL(20, 
4),\n"
+                                    + "    timestamp_c          TIMESTAMP(0) 
NULL,\n"
+                                    + "    timestamp3_c         TIMESTAMP(3) 
NULL,\n"
+                                    + "    timestamp6_c         TIMESTAMP(6) 
NULL,"
+                                    + "primary key(id));",
+                            inventoryDatabase.getDatabaseName()));
+
+            expected.add(
+                    new CreateTableEvent(
+                            TableId.tableId(
+                                    inventoryDatabase.getDatabaseName(), 
"newlyAddedTable1"),
+                            Schema.newBuilder()
+                                    .physicalColumn("id", 
DataTypes.DECIMAL(20, 0).notNull())
+                                    .physicalColumn("tiny_c", 
DataTypes.TINYINT())
+                                    .physicalColumn("tiny_un_c", 
DataTypes.SMALLINT())
+                                    .physicalColumn("tiny_un_z_c", 
DataTypes.SMALLINT())
+                                    .physicalColumn("small_c", 
DataTypes.SMALLINT())
+                                    .physicalColumn("small_un_c", 
DataTypes.INT())
+                                    .physicalColumn("small_un_z_c", 
DataTypes.INT())
+                                    .physicalColumn("medium_c", 
DataTypes.INT())
+                                    .physicalColumn("medium_un_c", 
DataTypes.INT())
+                                    .physicalColumn("medium_un_z_c", 
DataTypes.INT())
+                                    .physicalColumn("int_c", DataTypes.INT())
+                                    .physicalColumn("int_un_c", 
DataTypes.BIGINT())
+                                    .physicalColumn("int_un_z_c", 
DataTypes.BIGINT())
+                                    .physicalColumn("int11_c", DataTypes.INT())
+                                    .physicalColumn("big_c", 
DataTypes.BIGINT())
+                                    .physicalColumn("big_un_c", 
DataTypes.DECIMAL(20, 0))
+                                    .physicalColumn("big_un_z_c", 
DataTypes.DECIMAL(20, 0))
+                                    .physicalColumn("varchar_c", 
DataTypes.VARCHAR(255))
+                                    .physicalColumn("char_c", 
DataTypes.CHAR(3))
+                                    .physicalColumn("real_c", 
DataTypes.DOUBLE())
+                                    .physicalColumn("float_c", 
DataTypes.FLOAT())
+                                    .physicalColumn("float_un_c", 
DataTypes.FLOAT())
+                                    .physicalColumn("float_un_z_c", 
DataTypes.FLOAT())
+                                    .physicalColumn("double_c", 
DataTypes.DOUBLE())
+                                    .physicalColumn("double_un_c", 
DataTypes.DOUBLE())
+                                    .physicalColumn("double_un_z_c", 
DataTypes.DOUBLE())
+                                    .physicalColumn("decimal_c", 
DataTypes.DECIMAL(8, 4))
+                                    .physicalColumn("decimal_un_c", 
DataTypes.DECIMAL(8, 4))
+                                    .physicalColumn("decimal_un_z_c", 
DataTypes.DECIMAL(8, 4))
+                                    .physicalColumn("numeric_c", 
DataTypes.DECIMAL(6, 0))
+                                    .physicalColumn("big_decimal_c", 
DataTypes.STRING())
+                                    .physicalColumn("bit1_c", 
DataTypes.BOOLEAN())
+                                    .physicalColumn("bit3_c", 
DataTypes.BINARY(1))
+                                    .physicalColumn("tiny1_c", 
DataTypes.BOOLEAN())
+                                    .physicalColumn("boolean_c", 
DataTypes.BOOLEAN())
+                                    .physicalColumn("file_uuid", 
DataTypes.BINARY(16))
+                                    .physicalColumn("bit_c", 
DataTypes.BINARY(8))
+                                    .physicalColumn("text_c", 
DataTypes.STRING())
+                                    .physicalColumn("tiny_blob_c", 
DataTypes.BYTES())
+                                    .physicalColumn("blob_c", 
DataTypes.BYTES())
+                                    .physicalColumn("medium_blob_c", 
DataTypes.BYTES())
+                                    .physicalColumn("long_blob_c", 
DataTypes.BYTES())
+                                    .physicalColumn("enum_c", 
DataTypes.STRING())
+                                    .physicalColumn("json_c", 
DataTypes.STRING())
+                                    .physicalColumn("point_c", 
DataTypes.STRING())
+                                    .physicalColumn("geometry_c", 
DataTypes.STRING())
+                                    .physicalColumn("linestring_c", 
DataTypes.STRING())
+                                    .physicalColumn("polygon_c", 
DataTypes.STRING())
+                                    .physicalColumn("multipoint_c", 
DataTypes.STRING())
+                                    .physicalColumn("multiline_c", 
DataTypes.STRING())
+                                    .physicalColumn("multipolygon_c", 
DataTypes.STRING())
+                                    .physicalColumn("geometrycollection_c", 
DataTypes.STRING())
+                                    .physicalColumn("year_c", DataTypes.INT())
+                                    .physicalColumn("date_c", DataTypes.DATE())
+                                    .physicalColumn("time_c", 
DataTypes.TIME(0))
+                                    .physicalColumn("time_3_c", 
DataTypes.TIME(3))
+                                    .physicalColumn("time_6_c", 
DataTypes.TIME(6))
+                                    .physicalColumn("datetime_c", 
DataTypes.TIMESTAMP(0))
+                                    .physicalColumn("datetime3_c", 
DataTypes.TIMESTAMP(3))
+                                    .physicalColumn("datetime6_c", 
DataTypes.TIMESTAMP(6))
+                                    .physicalColumn("decimal_c0", 
DataTypes.DECIMAL(6, 2))
+                                    .physicalColumn("decimal_c1", 
DataTypes.DECIMAL(9, 4))
+                                    .physicalColumn("decimal_c2", 
DataTypes.DECIMAL(20, 4))
+                                    .physicalColumn("timestamp_c", 
DataTypes.TIMESTAMP_LTZ(0))
+                                    .physicalColumn("timestamp3_c", 
DataTypes.TIMESTAMP_LTZ(3))
+                                    .physicalColumn("timestamp6_c", 
DataTypes.TIMESTAMP_LTZ(6))
+                                    .primaryKey("id")
+                                    .build()));
         }
         List<Event> actual = fetchResults(events, expected.size());
         assertEqualsInAnyOrder(
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
index 4cc1e952a..fcd80440d 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
@@ -21,10 +21,10 @@ import 
org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
-import org.apache.flink.cdc.common.event.ChangeEvent;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.factories.Factory;
 import org.apache.flink.cdc.common.factories.FactoryHelper;
@@ -182,9 +182,16 @@ public class MysqlPipelineNewlyAddedTableITCase extends 
MySqlSourceTestBase {
                 addCollector(env, source, resultBuffer, serializer, 
accumulatorName);
         env.executeAsync("AddNewlyTablesWhenReadingBinlog");
         initialAddressTables(getConnection(), 
Collections.singletonList("address_beijing"));
-        List<Event> actual = fetchResults(iterator, 4);
-        assertThat(((ChangeEvent) actual.get(0)).tableId())
-                .isEqualTo(TableId.tableId(customDatabase.getDatabaseName(), 
"address_beijing"));
+        initialAddressTables(getConnection(), 
Collections.singletonList("address_shanghai"));
+        List<Event> actual = fetchResults(iterator, 8);
+        List<String> tableNames =
+                actual.stream()
+                        .filter((event) -> event instanceof CreateTableEvent)
+                        .map((event) -> ((SchemaChangeEvent) 
event).tableId().getTableName())
+                        .collect(Collectors.toList());
+        assertThat(tableNames.size()).isEqualTo(2);
+        assertThat(tableNames.get(0)).isEqualTo("address_beijing");
+        assertThat(tableNames.get(1)).isEqualTo("address_shanghai");
     }
 
     @Test

Reply via email to