This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 3c1517f70 [FLINK-36148][pipeline-connector/mysql] Fix that newly added
table can not be discovered by adding custom parser for CreateTableEvent
3c1517f70 is described below
commit 3c1517f70f728e1ef8a4b6bed942d87d66d874df
Author: Kunni <[email protected]>
AuthorDate: Tue Aug 27 15:29:52 2024 +0800
[FLINK-36148][pipeline-connector/mysql] Fix that newly added table can not
be discovered by adding custom parser for CreateTableEvent
This closes #3570.
---
.../parser/CustomAlterTableParserListener.java | 106 ++++++++++++++-
.../mysql/source/MySqlPipelineITCase.java | 147 +++++++++++++++++++++
.../source/MysqlPipelineNewlyAddedTableITCase.java | 15 ++-
3 files changed, 263 insertions(+), 5 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index cdb5983a5..eb30fe80d 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -19,11 +19,13 @@ package org.apache.flink.cdc.connectors.mysql.source.parser;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.DropTableEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TruncateTableEvent;
+import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
@@ -32,6 +34,7 @@ import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import org.slf4j.Logger;
@@ -43,6 +46,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static
org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn;
@@ -58,8 +62,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
private final LinkedList<SchemaChangeEvent> changes;
private org.apache.flink.cdc.common.event.TableId currentTable;
private List<ColumnEditor> columnEditors;
-
private CustomColumnDefinitionParserListener columnDefinitionListener;
+ private TableEditor tableEditor;
private int parsingColumnIndex = STARTING_INDEX;
@@ -72,6 +76,106 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
this.changes = changes;
}
+    /**
+     * Called when the walker enters a {@code CREATE TABLE} statement with column definitions.
+     *
+     * <p>A table editor is opened only when the parser's table registry has no entry for the
+     * table yet; when the table is already registered this method does nothing.
+     *
+     * @param ctx parse-tree context of the {@code CREATE TABLE} statement
+     */
+    @Override
+    public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
+        final TableId createdTableId = parser.parseQualifiedTableId(ctx.tableName().fullId());
+        final boolean alreadyRegistered =
+                parser.databaseTables().forTable(createdTableId) != null;
+        if (alreadyRegistered) {
+            return;
+        }
+        tableEditor = parser.databaseTables().editOrCreateTable(createdTableId);
+        super.enterColumnCreateTable(ctx);
+    }
+
+    /**
+     * Called when the walker leaves a {@code CREATE TABLE} statement.
+     *
+     * <p>When a table editor was opened for this statement, the method fills in the table's
+     * default charset, propagates it to columns that have no table charset, stores the table in
+     * the parser's table registry, signals the creation to the parser and appends a
+     * {@link CreateTableEvent} to {@code changes}.
+     *
+     * <p>All of that work, including emitting the event, runs inside the
+     * {@code runIfNotNull} guard on {@code tableEditor}. When no editor was opened (the table
+     * was already registered, see {@code enterColumnCreateTable}) nothing is emitted; building
+     * the event outside the guard would dereference a {@code null} editor.
+     *
+     * <p>The editor is cleared afterwards so that a later {@code CREATE TABLE} for an already
+     * registered table cannot reuse the previous table's editor.
+     *
+     * @param ctx parse-tree context of the {@code CREATE TABLE} statement
+     */
+    @Override
+    public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    // Make sure that the table's character set has been set ...
+                    if (!tableEditor.hasDefaultCharsetName()) {
+                        tableEditor.setDefaultCharsetName(
+                                parser.charsetForTable(tableEditor.tableId()));
+                    }
+                    // remove column definition parser listener
+                    listeners.remove(columnDefinitionListener);
+                    columnDefinitionListener = null;
+                    final String defaultCharsetName = tableEditor.create().defaultCharsetName();
+                    // Columns without a table charset inherit the table's default charset.
+                    tableEditor.setColumns(
+                            tableEditor.columns().stream()
+                                    .map(
+                                            column -> {
+                                                final ColumnEditor columnEditor = column.edit();
+                                                if (columnEditor.charsetNameOfTable() == null) {
+                                                    columnEditor.charsetNameOfTable(
+                                                            defaultCharsetName);
+                                                }
+                                                return columnEditor;
+                                            })
+                                    .map(ColumnEditor::create)
+                                    .collect(Collectors.toList()));
+                    parser.databaseTables().overwriteTable(tableEditor.create());
+                    parser.signalCreateTable(tableEditor.tableId(), ctx);
+
+                    // Emit the event only for a table that was actually built by this listener.
+                    Schema.Builder builder = Schema.newBuilder();
+                    tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column)));
+                    if (tableEditor.hasPrimaryKey()) {
+                        builder.primaryKey(tableEditor.primaryKeyColumnNames());
+                    }
+                    changes.add(
+                            new CreateTableEvent(
+                                    toCdcTableId(tableEditor.tableId()), builder.build()));
+                },
+                tableEditor);
+        // Drop the editor so the next CREATE TABLE statement starts from a clean state.
+        tableEditor = null;
+        super.exitColumnCreateTable(ctx);
+    }
+
+    /**
+     * Called when the walker enters a column declaration.
+     *
+     * <p>While a table editor is open, a new column editor named after the declared column is
+     * handed to the column-definition listener. The listener is created and registered in
+     * {@code listeners} on first use and reused for later columns.
+     *
+     * @param ctx parse-tree context of the column declaration
+     */
+    @Override
+    public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    final String declaredName = parser.parseName(ctx.uid());
+                    final ColumnEditor freshEditor = Column.editor().name(declaredName);
+                    if (columnDefinitionListener != null) {
+                        // Reuse the listener that is already registered.
+                        columnDefinitionListener.setColumnEditor(freshEditor);
+                        return;
+                    }
+                    columnDefinitionListener =
+                            new CustomColumnDefinitionParserListener(
+                                    freshEditor, parser, listeners);
+                    listeners.add(columnDefinitionListener);
+                },
+                tableEditor);
+        super.enterColumnDeclaration(ctx);
+    }
+
+    /**
+     * Called when the walker leaves a column declaration; adds the column held by the
+     * column-definition listener to the open table editor. Nothing happens unless both the
+     * table editor and the listener are set.
+     *
+     * @param ctx parse-tree context of the column declaration
+     */
+    @Override
+    public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
+        parser.runIfNotNull(
+                () -> tableEditor.addColumn(columnDefinitionListener.getColumn()),
+                tableEditor,
+                columnDefinitionListener);
+        super.exitColumnDeclaration(ctx);
+    }
+
+ @Override
+ public void
enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext
ctx) {
+ parser.runIfNotNull(
+ () -> {
+
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
+ },
+ tableEditor);
+ super.enterPrimaryKeyTableConstraint(ctx);
+ }
+
+    /**
+     * Called when the walker enters a {@code UNIQUE KEY} table constraint.
+     *
+     * <p>While a table editor is open and it has no primary key yet, the constraint's index
+     * column names are passed to {@code parsePrimaryIndexColumnNames} (presumably so the unique
+     * key stands in as the primary key; confirm against the parser implementation).
+     *
+     * @param ctx parse-tree context of the unique-key constraint
+     */
+    @Override
+    public void enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) {
+        parser.runIfNotNull(
+                () -> {
+                    if (tableEditor.hasPrimaryKey()) {
+                        return;
+                    }
+                    parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
+                },
+                tableEditor);
+        super.enterUniqueKeyTableConstraint(ctx);
+    }
+
@Override
public void enterAlterTable(MySqlParser.AlterTableContext ctx) {
this.currentTable =
toCdcTableId(parser.parseQualifiedTableId(ctx.tableName().fullId()));
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 108a3d36d..09b2a308a 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -501,6 +501,153 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
expected.add(
new DropTableEvent(
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers")));
+
+ // Test create table DDL
+ statement.execute(
+ String.format(
+ "CREATE TABLE `%s`.`newlyAddedTable1`("
+ + " id SERIAL,\n"
+ + " tiny_c TINYINT,\n"
+ + " tiny_un_c TINYINT
UNSIGNED,\n"
+ + " tiny_un_z_c TINYINT
UNSIGNED ZEROFILL,\n"
+ + " small_c SMALLINT,\n"
+ + " small_un_c SMALLINT
UNSIGNED,\n"
+ + " small_un_z_c SMALLINT
UNSIGNED ZEROFILL,\n"
+ + " medium_c MEDIUMINT,\n"
+ + " medium_un_c MEDIUMINT
UNSIGNED,\n"
+ + " medium_un_z_c MEDIUMINT
UNSIGNED ZEROFILL,\n"
+ + " int_c INTEGER,\n"
+ + " int_un_c INTEGER
UNSIGNED,\n"
+ + " int_un_z_c INTEGER
UNSIGNED ZEROFILL,\n"
+ + " int11_c INT(11),\n"
+ + " big_c BIGINT,\n"
+ + " big_un_c BIGINT
UNSIGNED,\n"
+ + " big_un_z_c BIGINT
UNSIGNED ZEROFILL,\n"
+ + " varchar_c
VARCHAR(255),\n"
+ + " char_c CHAR(3),\n"
+ + " real_c REAL,\n"
+ + " float_c FLOAT,\n"
+ + " float_un_c FLOAT
UNSIGNED,\n"
+ + " float_un_z_c FLOAT UNSIGNED
ZEROFILL,\n"
+ + " double_c DOUBLE,\n"
+ + " double_un_c DOUBLE
UNSIGNED,\n"
+ + " double_un_z_c DOUBLE
UNSIGNED ZEROFILL,\n"
+ + " decimal_c DECIMAL(8,
4),\n"
+ + " decimal_un_c DECIMAL(8, 4)
UNSIGNED,\n"
+ + " decimal_un_z_c DECIMAL(8, 4)
UNSIGNED ZEROFILL,\n"
+ + " numeric_c NUMERIC(6,
0),\n"
+ + " big_decimal_c DECIMAL(65,
1),\n"
+ + " bit1_c BIT,\n"
+ + " bit3_c BIT(3),\n"
+ + " tiny1_c TINYINT(1),\n"
+ + " boolean_c BOOLEAN,\n"
+ + " file_uuid BINARY(16),\n"
+ + " bit_c BIT(64),\n"
+ + " text_c TEXT,\n"
+ + " tiny_blob_c TINYBLOB,\n"
+ + " blob_c BLOB,\n"
+ + " medium_blob_c MEDIUMBLOB,\n"
+ + " long_blob_c LONGBLOB,\n"
+ + " enum_c enum('red',
'white'),\n"
+ + " json_c JSON,\n"
+ + " point_c POINT,\n"
+ + " geometry_c GEOMETRY,\n"
+ + " linestring_c LINESTRING,\n"
+ + " polygon_c POLYGON,\n"
+ + " multipoint_c MULTIPOINT,\n"
+ + " multiline_c
MULTILINESTRING,\n"
+ + " multipolygon_c
MULTIPOLYGON,\n"
+ + " geometrycollection_c
GEOMETRYCOLLECTION,"
+ + " year_c YEAR,\n"
+ + " date_c DATE,\n"
+ + " time_c TIME(0),\n"
+ + " time_3_c TIME(3),\n"
+ + " time_6_c TIME(6),\n"
+ + " datetime_c DATETIME(0),\n"
+ + " datetime3_c DATETIME(3),\n"
+ + " datetime6_c DATETIME(6),\n"
+ + " decimal_c0 DECIMAL(6,
2),\n"
+ + " decimal_c1 DECIMAL(9,
4),\n"
+ + " decimal_c2 DECIMAL(20,
4),\n"
+ + " timestamp_c TIMESTAMP(0)
NULL,\n"
+ + " timestamp3_c TIMESTAMP(3)
NULL,\n"
+ + " timestamp6_c TIMESTAMP(6)
NULL,"
+ + "primary key(id));",
+ inventoryDatabase.getDatabaseName()));
+
+ expected.add(
+ new CreateTableEvent(
+ TableId.tableId(
+ inventoryDatabase.getDatabaseName(),
"newlyAddedTable1"),
+ Schema.newBuilder()
+ .physicalColumn("id",
DataTypes.DECIMAL(20, 0).notNull())
+ .physicalColumn("tiny_c",
DataTypes.TINYINT())
+ .physicalColumn("tiny_un_c",
DataTypes.SMALLINT())
+ .physicalColumn("tiny_un_z_c",
DataTypes.SMALLINT())
+ .physicalColumn("small_c",
DataTypes.SMALLINT())
+ .physicalColumn("small_un_c",
DataTypes.INT())
+ .physicalColumn("small_un_z_c",
DataTypes.INT())
+ .physicalColumn("medium_c",
DataTypes.INT())
+ .physicalColumn("medium_un_c",
DataTypes.INT())
+ .physicalColumn("medium_un_z_c",
DataTypes.INT())
+ .physicalColumn("int_c", DataTypes.INT())
+ .physicalColumn("int_un_c",
DataTypes.BIGINT())
+ .physicalColumn("int_un_z_c",
DataTypes.BIGINT())
+ .physicalColumn("int11_c", DataTypes.INT())
+ .physicalColumn("big_c",
DataTypes.BIGINT())
+ .physicalColumn("big_un_c",
DataTypes.DECIMAL(20, 0))
+ .physicalColumn("big_un_z_c",
DataTypes.DECIMAL(20, 0))
+ .physicalColumn("varchar_c",
DataTypes.VARCHAR(255))
+ .physicalColumn("char_c",
DataTypes.CHAR(3))
+ .physicalColumn("real_c",
DataTypes.DOUBLE())
+ .physicalColumn("float_c",
DataTypes.FLOAT())
+ .physicalColumn("float_un_c",
DataTypes.FLOAT())
+ .physicalColumn("float_un_z_c",
DataTypes.FLOAT())
+ .physicalColumn("double_c",
DataTypes.DOUBLE())
+ .physicalColumn("double_un_c",
DataTypes.DOUBLE())
+ .physicalColumn("double_un_z_c",
DataTypes.DOUBLE())
+ .physicalColumn("decimal_c",
DataTypes.DECIMAL(8, 4))
+ .physicalColumn("decimal_un_c",
DataTypes.DECIMAL(8, 4))
+ .physicalColumn("decimal_un_z_c",
DataTypes.DECIMAL(8, 4))
+ .physicalColumn("numeric_c",
DataTypes.DECIMAL(6, 0))
+ .physicalColumn("big_decimal_c",
DataTypes.STRING())
+ .physicalColumn("bit1_c",
DataTypes.BOOLEAN())
+ .physicalColumn("bit3_c",
DataTypes.BINARY(1))
+ .physicalColumn("tiny1_c",
DataTypes.BOOLEAN())
+ .physicalColumn("boolean_c",
DataTypes.BOOLEAN())
+ .physicalColumn("file_uuid",
DataTypes.BINARY(16))
+ .physicalColumn("bit_c",
DataTypes.BINARY(8))
+ .physicalColumn("text_c",
DataTypes.STRING())
+ .physicalColumn("tiny_blob_c",
DataTypes.BYTES())
+ .physicalColumn("blob_c",
DataTypes.BYTES())
+ .physicalColumn("medium_blob_c",
DataTypes.BYTES())
+ .physicalColumn("long_blob_c",
DataTypes.BYTES())
+ .physicalColumn("enum_c",
DataTypes.STRING())
+ .physicalColumn("json_c",
DataTypes.STRING())
+ .physicalColumn("point_c",
DataTypes.STRING())
+ .physicalColumn("geometry_c",
DataTypes.STRING())
+ .physicalColumn("linestring_c",
DataTypes.STRING())
+ .physicalColumn("polygon_c",
DataTypes.STRING())
+ .physicalColumn("multipoint_c",
DataTypes.STRING())
+ .physicalColumn("multiline_c",
DataTypes.STRING())
+ .physicalColumn("multipolygon_c",
DataTypes.STRING())
+ .physicalColumn("geometrycollection_c",
DataTypes.STRING())
+ .physicalColumn("year_c", DataTypes.INT())
+ .physicalColumn("date_c", DataTypes.DATE())
+ .physicalColumn("time_c",
DataTypes.TIME(0))
+ .physicalColumn("time_3_c",
DataTypes.TIME(3))
+ .physicalColumn("time_6_c",
DataTypes.TIME(6))
+ .physicalColumn("datetime_c",
DataTypes.TIMESTAMP(0))
+ .physicalColumn("datetime3_c",
DataTypes.TIMESTAMP(3))
+ .physicalColumn("datetime6_c",
DataTypes.TIMESTAMP(6))
+ .physicalColumn("decimal_c0",
DataTypes.DECIMAL(6, 2))
+ .physicalColumn("decimal_c1",
DataTypes.DECIMAL(9, 4))
+ .physicalColumn("decimal_c2",
DataTypes.DECIMAL(20, 4))
+ .physicalColumn("timestamp_c",
DataTypes.TIMESTAMP_LTZ(0))
+ .physicalColumn("timestamp3_c",
DataTypes.TIMESTAMP_LTZ(3))
+ .physicalColumn("timestamp6_c",
DataTypes.TIMESTAMP_LTZ(6))
+ .primaryKey("id")
+ .build()));
}
List<Event> actual = fetchResults(events, expected.size());
assertEqualsInAnyOrder(
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
index 4cc1e952a..fcd80440d 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MysqlPipelineNewlyAddedTableITCase.java
@@ -21,10 +21,10 @@ import
org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
-import org.apache.flink.cdc.common.event.ChangeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.factories.Factory;
import org.apache.flink.cdc.common.factories.FactoryHelper;
@@ -182,9 +182,16 @@ public class MysqlPipelineNewlyAddedTableITCase extends
MySqlSourceTestBase {
addCollector(env, source, resultBuffer, serializer,
accumulatorName);
env.executeAsync("AddNewlyTablesWhenReadingBinlog");
initialAddressTables(getConnection(),
Collections.singletonList("address_beijing"));
- List<Event> actual = fetchResults(iterator, 4);
- assertThat(((ChangeEvent) actual.get(0)).tableId())
- .isEqualTo(TableId.tableId(customDatabase.getDatabaseName(),
"address_beijing"));
+ initialAddressTables(getConnection(),
Collections.singletonList("address_shanghai"));
+ List<Event> actual = fetchResults(iterator, 8);
+ List<String> tableNames =
+ actual.stream()
+ .filter((event) -> event instanceof CreateTableEvent)
+ .map((event) -> ((SchemaChangeEvent)
event).tableId().getTableName())
+ .collect(Collectors.toList());
+ assertThat(tableNames.size()).isEqualTo(2);
+ assertThat(tableNames.get(0)).isEqualTo("address_beijing");
+ assertThat(tableNames.get(1)).isEqualTo("address_shanghai");
}
@Test