This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch release-3.2
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/release-3.2 by this push:
new 6ef4734e2 [hotfix][mysql] Fix primary key restraints missing when
using inline `PRIMARY KEY` declaration syntax
6ef4734e2 is described below
commit 6ef4734e2e6999df20f491901c7c71d34fedbe31
Author: yuxiqian <[email protected]>
AuthorDate: Wed Aug 28 21:44:48 2024 +0800
[hotfix][mysql] Fix primary key restraints missing when using inline
`PRIMARY KEY` declaration syntax
This closes #3582
---
.../parser/CustomAlterTableParserListener.java | 123 +++++++-
.../CustomColumnDefinitionParserListener.java | 6 +
.../mysql/source/MySqlPipelineITCase.java | 316 +++++++++++++++++++++
3 files changed, 440 insertions(+), 5 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index 79583d83e..8d86895b2 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -19,9 +19,11 @@ package org.apache.flink.cdc.connectors.mysql.source.parser;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.DataType;
import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
@@ -30,6 +32,7 @@ import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.TableEditor;
import io.debezium.relational.TableId;
import org.antlr.v4.runtime.tree.ParseTreeListener;
import org.slf4j.Logger;
@@ -41,6 +44,7 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
import static
org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils.fromDbzColumn;
@@ -58,6 +62,7 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
private List<ColumnEditor> columnEditors;
private CustomColumnDefinitionParserListener columnDefinitionListener;
+ private TableEditor tableEditor;
private int parsingColumnIndex = STARTING_INDEX;
@@ -70,6 +75,109 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
this.changes = changes;
}
+ @Override
+ public void enterColumnCreateTable(MySqlParser.ColumnCreateTableContext
ctx) {
+ TableId tableId =
parser.parseQualifiedTableId(ctx.tableName().fullId());
+ if (parser.databaseTables().forTable(tableId) == null) {
+ tableEditor = parser.databaseTables().editOrCreateTable(tableId);
+ }
+ super.enterColumnCreateTable(ctx);
+ }
+
+ @Override
+ public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext
ctx) {
+ parser.runIfNotNull(
+ () -> {
+ // Make sure that the table's character set has been set
...
+ if (!tableEditor.hasDefaultCharsetName()) {
+ tableEditor.setDefaultCharsetName(
+ parser.charsetForTable(tableEditor.tableId()));
+ }
+ listeners.remove(columnDefinitionListener);
+ columnDefinitionListener = null;
+ // remove column definition parser listener
+ final String defaultCharsetName =
tableEditor.create().defaultCharsetName();
+ tableEditor.setColumns(
+ tableEditor.columns().stream()
+ .map(
+ column -> {
+ final ColumnEditor
columnEditor = column.edit();
+ if
(columnEditor.charsetNameOfTable() == null) {
+
columnEditor.charsetNameOfTable(
+
defaultCharsetName);
+ }
+ return columnEditor;
+ })
+ .map(ColumnEditor::create)
+ .collect(Collectors.toList()));
+
parser.databaseTables().overwriteTable(tableEditor.create());
+ parser.signalCreateTable(tableEditor.tableId(), ctx);
+
+ Schema.Builder builder = Schema.newBuilder();
+ tableEditor.columns().forEach(column ->
builder.column(toCdcColumn(column)));
+ if (tableEditor.hasPrimaryKey()) {
+
builder.primaryKey(tableEditor.primaryKeyColumnNames());
+ }
+ changes.add(
+ new CreateTableEvent(
+ toCdcTableId(tableEditor.tableId()),
builder.build()));
+ },
+ tableEditor);
+ super.exitColumnCreateTable(ctx);
+ }
+
+ @Override
+ public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext
ctx) {
+ parser.runIfNotNull(
+ () -> {
+ String columnName = parser.parseName(ctx.uid());
+ ColumnEditor columnEditor =
Column.editor().name(columnName);
+ if (columnDefinitionListener == null) {
+ columnDefinitionListener =
+ new CustomColumnDefinitionParserListener(
+ tableEditor, columnEditor, parser,
listeners);
+ listeners.add(columnDefinitionListener);
+ } else {
+ columnDefinitionListener.setColumnEditor(columnEditor);
+ }
+ },
+ tableEditor);
+ super.enterColumnDeclaration(ctx);
+ }
+
+ @Override
+ public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext
ctx) {
+ parser.runIfNotNull(
+ () -> {
+
tableEditor.addColumn(columnDefinitionListener.getColumn());
+ },
+ tableEditor,
+ columnDefinitionListener);
+ super.exitColumnDeclaration(ctx);
+ }
+
+ @Override
+ public void
enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext
ctx) {
+ parser.runIfNotNull(
+ () -> {
+
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
+ },
+ tableEditor);
+ super.enterPrimaryKeyTableConstraint(ctx);
+ }
+
+ @Override
+ public void
enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) {
+ parser.runIfNotNull(
+ () -> {
+ if (!tableEditor.hasPrimaryKey()) {
+
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
+ }
+ },
+ tableEditor);
+ super.enterUniqueKeyTableConstraint(ctx);
+ }
+
@Override
public void enterAlterTable(MySqlParser.AlterTableContext ctx) {
this.currentTable =
toCdcTableId(parser.parseQualifiedTableId(ctx.tableName().fullId()));
@@ -88,7 +196,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
String columnName = parser.parseName(ctx.uid(0));
ColumnEditor columnEditor = Column.editor().name(columnName);
columnDefinitionListener =
- new CustomColumnDefinitionParserListener(columnEditor, parser,
listeners);
+ new CustomColumnDefinitionParserListener(
+ tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.exitAlterByAddColumn(ctx);
}
@@ -140,7 +249,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
columnEditors.add(Column.editor().name(columnName));
}
columnDefinitionListener =
- new CustomColumnDefinitionParserListener(columnEditors.get(0),
parser, listeners);
+ new CustomColumnDefinitionParserListener(
+ tableEditor, columnEditors.get(0), parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByAddColumns(ctx);
}
@@ -190,7 +300,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
columnEditor.unsetDefaultValueExpression();
columnDefinitionListener =
- new CustomColumnDefinitionParserListener(columnEditor, parser,
listeners);
+ new CustomColumnDefinitionParserListener(
+ tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByChangeColumn(ctx);
}
@@ -229,7 +340,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
String oldColumnName = parser.parseName(ctx.oldColumn);
ColumnEditor columnEditor = Column.editor().name(oldColumnName);
columnDefinitionListener =
- new CustomColumnDefinitionParserListener(columnEditor, parser,
listeners);
+ new CustomColumnDefinitionParserListener(
+ tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByRenameColumn(ctx);
}
@@ -241,7 +353,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
columnEditor.unsetDefaultValueExpression();
columnDefinitionListener =
- new CustomColumnDefinitionParserListener(columnEditor, parser,
listeners);
+ new CustomColumnDefinitionParserListener(
+ tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByModifyColumn(ctx);
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
index 591836418..e886580b3 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
@@ -25,6 +25,7 @@ import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.TableEditor;
import io.debezium.relational.ddl.DataType;
import io.debezium.util.Strings;
import org.antlr.v4.runtime.tree.ParseTreeListener;
@@ -50,13 +51,16 @@ public class CustomColumnDefinitionParserListener extends
MySqlParserBaseListene
private boolean uniqueColumn;
private AtomicReference<Boolean> optionalColumn = new AtomicReference<>();
private DefaultValueParserListener defaultValueListener;
+ private final TableEditor tableEditor;
private final List<ParseTreeListener> listeners;
public CustomColumnDefinitionParserListener(
+ TableEditor tableEditor,
ColumnEditor columnEditor,
MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners) {
+ this.tableEditor = tableEditor;
this.columnEditor = columnEditor;
this.parser = parser;
this.dataTypeResolver = parser.dataTypeResolver();
@@ -106,6 +110,8 @@ public class CustomColumnDefinitionParserListener extends
MySqlParserBaseListene
// this rule will be parsed only if no primary key is set in a table
// otherwise the statement can't be executed due to multiple primary
key error
optionalColumn.set(Boolean.FALSE);
+ tableEditor.addColumn(columnEditor.create());
+ tableEditor.setPrimaryKeyNames(columnEditor.name());
super.enterPrimaryKeyColumnConstraint(ctx);
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index f019e98d8..8f613e0ef 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -62,6 +62,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED;
@@ -372,6 +373,281 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
assertThat(actual).isEqualTo(expected);
}
+ @Test
+ public void testSchemaChangeEvents() throws Exception {
+ env.setParallelism(1);
+ inventoryDatabase.createAndInitialize();
+ MySqlSourceConfigFactory configFactory =
+ new MySqlSourceConfigFactory()
+ .hostname(MYSQL8_CONTAINER.getHost())
+ .port(MYSQL8_CONTAINER.getDatabasePort())
+ .username(TEST_USER)
+ .password(TEST_PASSWORD)
+ .databaseList(inventoryDatabase.getDatabaseName())
+ .tableList(inventoryDatabase.getDatabaseName() + ".*")
+ .startupOptions(StartupOptions.latest())
+ .serverId(getServerId(env.getParallelism()))
+ .serverTimeZone("UTC")
+
.includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
+
+ FlinkSourceProvider sourceProvider =
+ (FlinkSourceProvider) new
MySqlDataSource(configFactory).getEventSourceProvider();
+ CloseableIterator<Event> events =
+ env.fromSource(
+ sourceProvider.getSource(),
+ WatermarkStrategy.noWatermarks(),
+ MySqlDataSourceFactory.IDENTIFIER,
+ new EventTypeInfo())
+ .executeAndCollect();
+ Thread.sleep(5_000);
+
+ List<Event> expected =
+ new ArrayList<>(
+
getInventoryCreateAllTableEvents(inventoryDatabase.getDatabaseName()));
+
+ try (Connection connection = inventoryDatabase.getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`customers` ADD COLUMN `newcol1`
INT NULL;",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new AddColumnEvent(
+
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn("newcol1",
DataTypes.INT())))));
+
+ // Test MODIFY COLUMN DDL
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`customers` MODIFY COLUMN
`newcol1` DOUBLE;",
+ inventoryDatabase.getDatabaseName()));
+
+ expected.add(
+ new AlterColumnTypeEvent(
+
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+ Collections.singletonMap("newcol1",
DataTypes.DOUBLE())));
+
+ // Test CHANGE COLUMN DDL
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`customers` CHANGE COLUMN
`newcol1` `newcol2` INT;",
+ inventoryDatabase.getDatabaseName()));
+
+ expected.add(
+ new AlterColumnTypeEvent(
+
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+ Collections.singletonMap("newcol1",
DataTypes.INT())));
+
+ expected.add(
+ new RenameColumnEvent(
+
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+ Collections.singletonMap("newcol1", "newcol2")));
+
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`customers` CHANGE COLUMN
`newcol2` `newcol1` DOUBLE;",
+ inventoryDatabase.getDatabaseName()));
+
+ expected.add(
+ new AlterColumnTypeEvent(
+
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+ Collections.singletonMap("newcol2",
DataTypes.DOUBLE())));
+
+ expected.add(
+ new RenameColumnEvent(
+
TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+ Collections.singletonMap("newcol2", "newcol1")));
+
+ // Test create table DDL
+ statement.execute(
+ String.format(
+ "CREATE TABLE `%s`.`newlyAddedTable1`("
+ + " id SERIAL,\n"
+ + " tiny_c TINYINT,\n"
+ + " tiny_un_c TINYINT
UNSIGNED,\n"
+ + " tiny_un_z_c TINYINT
UNSIGNED ZEROFILL,\n"
+ + " small_c SMALLINT,\n"
+ + " small_un_c SMALLINT
UNSIGNED,\n"
+ + " small_un_z_c SMALLINT
UNSIGNED ZEROFILL,\n"
+ + " medium_c MEDIUMINT,\n"
+ + " medium_un_c MEDIUMINT
UNSIGNED,\n"
+ + " medium_un_z_c MEDIUMINT
UNSIGNED ZEROFILL,\n"
+ + " int_c INTEGER,\n"
+ + " int_un_c INTEGER
UNSIGNED,\n"
+ + " int_un_z_c INTEGER
UNSIGNED ZEROFILL,\n"
+ + " int11_c INT(11),\n"
+ + " big_c BIGINT,\n"
+ + " big_un_c BIGINT
UNSIGNED,\n"
+ + " big_un_z_c BIGINT
UNSIGNED ZEROFILL,\n"
+ + " varchar_c
VARCHAR(255),\n"
+ + " char_c CHAR(3),\n"
+ + " real_c REAL,\n"
+ + " float_c FLOAT,\n"
+ + " float_un_c FLOAT
UNSIGNED,\n"
+ + " float_un_z_c FLOAT UNSIGNED
ZEROFILL,\n"
+ + " double_c DOUBLE,\n"
+ + " double_un_c DOUBLE
UNSIGNED,\n"
+ + " double_un_z_c DOUBLE
UNSIGNED ZEROFILL,\n"
+ + " decimal_c DECIMAL(8,
4),\n"
+ + " decimal_un_c DECIMAL(8, 4)
UNSIGNED,\n"
+ + " decimal_un_z_c DECIMAL(8, 4)
UNSIGNED ZEROFILL,\n"
+ + " numeric_c NUMERIC(6,
0),\n"
+ + " big_decimal_c DECIMAL(65,
1),\n"
+ + " bit1_c BIT,\n"
+ + " bit3_c BIT(3),\n"
+ + " tiny1_c TINYINT(1),\n"
+ + " boolean_c BOOLEAN,\n"
+ + " file_uuid BINARY(16),\n"
+ + " bit_c BIT(64),\n"
+ + " text_c TEXT,\n"
+ + " tiny_blob_c TINYBLOB,\n"
+ + " blob_c BLOB,\n"
+ + " medium_blob_c MEDIUMBLOB,\n"
+ + " long_blob_c LONGBLOB,\n"
+ + " enum_c enum('red',
'white'),\n"
+ + " json_c JSON,\n"
+ + " point_c POINT,\n"
+ + " geometry_c GEOMETRY,\n"
+ + " linestring_c LINESTRING,\n"
+ + " polygon_c POLYGON,\n"
+ + " multipoint_c MULTIPOINT,\n"
+ + " multiline_c
MULTILINESTRING,\n"
+ + " multipolygon_c
MULTIPOLYGON,\n"
+ + " geometrycollection_c
GEOMETRYCOLLECTION,"
+ + " year_c YEAR,\n"
+ + " date_c DATE,\n"
+ + " time_c TIME(0),\n"
+ + " time_3_c TIME(3),\n"
+ + " time_6_c TIME(6),\n"
+ + " datetime_c DATETIME(0),\n"
+ + " datetime3_c DATETIME(3),\n"
+ + " datetime6_c DATETIME(6),\n"
+ + " decimal_c0 DECIMAL(6,
2),\n"
+ + " decimal_c1 DECIMAL(9,
4),\n"
+ + " decimal_c2 DECIMAL(20,
4),\n"
+ + " timestamp_c TIMESTAMP(0)
NULL,\n"
+ + " timestamp3_c TIMESTAMP(3)
NULL,\n"
+ + " timestamp6_c TIMESTAMP(6)
NULL,"
+ + "primary key(id));",
+ inventoryDatabase.getDatabaseName()));
+
+ expected.add(
+ new CreateTableEvent(
+ TableId.tableId(
+ inventoryDatabase.getDatabaseName(),
"newlyAddedTable1"),
+ Schema.newBuilder()
+ .physicalColumn("id",
DataTypes.DECIMAL(20, 0).notNull())
+ .physicalColumn("tiny_c",
DataTypes.TINYINT())
+ .physicalColumn("tiny_un_c",
DataTypes.SMALLINT())
+ .physicalColumn("tiny_un_z_c",
DataTypes.SMALLINT())
+ .physicalColumn("small_c",
DataTypes.SMALLINT())
+ .physicalColumn("small_un_c",
DataTypes.INT())
+ .physicalColumn("small_un_z_c",
DataTypes.INT())
+ .physicalColumn("medium_c",
DataTypes.INT())
+ .physicalColumn("medium_un_c",
DataTypes.INT())
+ .physicalColumn("medium_un_z_c",
DataTypes.INT())
+ .physicalColumn("int_c", DataTypes.INT())
+ .physicalColumn("int_un_c",
DataTypes.BIGINT())
+ .physicalColumn("int_un_z_c",
DataTypes.BIGINT())
+ .physicalColumn("int11_c", DataTypes.INT())
+ .physicalColumn("big_c",
DataTypes.BIGINT())
+ .physicalColumn("big_un_c",
DataTypes.DECIMAL(20, 0))
+ .physicalColumn("big_un_z_c",
DataTypes.DECIMAL(20, 0))
+ .physicalColumn("varchar_c",
DataTypes.VARCHAR(255))
+ .physicalColumn("char_c",
DataTypes.CHAR(3))
+ .physicalColumn("real_c",
DataTypes.DOUBLE())
+ .physicalColumn("float_c",
DataTypes.FLOAT())
+ .physicalColumn("float_un_c",
DataTypes.FLOAT())
+ .physicalColumn("float_un_z_c",
DataTypes.FLOAT())
+ .physicalColumn("double_c",
DataTypes.DOUBLE())
+ .physicalColumn("double_un_c",
DataTypes.DOUBLE())
+ .physicalColumn("double_un_z_c",
DataTypes.DOUBLE())
+ .physicalColumn("decimal_c",
DataTypes.DECIMAL(8, 4))
+ .physicalColumn("decimal_un_c",
DataTypes.DECIMAL(8, 4))
+ .physicalColumn("decimal_un_z_c",
DataTypes.DECIMAL(8, 4))
+ .physicalColumn("numeric_c",
DataTypes.DECIMAL(6, 0))
+ .physicalColumn("big_decimal_c",
DataTypes.STRING())
+ .physicalColumn("bit1_c",
DataTypes.BOOLEAN())
+ .physicalColumn("bit3_c",
DataTypes.BINARY(1))
+ .physicalColumn("tiny1_c",
DataTypes.BOOLEAN())
+ .physicalColumn("boolean_c",
DataTypes.BOOLEAN())
+ .physicalColumn("file_uuid",
DataTypes.BINARY(16))
+ .physicalColumn("bit_c",
DataTypes.BINARY(8))
+ .physicalColumn("text_c",
DataTypes.STRING())
+ .physicalColumn("tiny_blob_c",
DataTypes.BYTES())
+ .physicalColumn("blob_c",
DataTypes.BYTES())
+ .physicalColumn("medium_blob_c",
DataTypes.BYTES())
+ .physicalColumn("long_blob_c",
DataTypes.BYTES())
+ .physicalColumn("enum_c",
DataTypes.STRING())
+ .physicalColumn("json_c",
DataTypes.STRING())
+ .physicalColumn("point_c",
DataTypes.STRING())
+ .physicalColumn("geometry_c",
DataTypes.STRING())
+ .physicalColumn("linestring_c",
DataTypes.STRING())
+ .physicalColumn("polygon_c",
DataTypes.STRING())
+ .physicalColumn("multipoint_c",
DataTypes.STRING())
+ .physicalColumn("multiline_c",
DataTypes.STRING())
+ .physicalColumn("multipolygon_c",
DataTypes.STRING())
+ .physicalColumn("geometrycollection_c",
DataTypes.STRING())
+ .physicalColumn("year_c", DataTypes.INT())
+ .physicalColumn("date_c", DataTypes.DATE())
+ .physicalColumn("time_c",
DataTypes.TIME(0))
+ .physicalColumn("time_3_c",
DataTypes.TIME(3))
+ .physicalColumn("time_6_c",
DataTypes.TIME(6))
+ .physicalColumn("datetime_c",
DataTypes.TIMESTAMP(0))
+ .physicalColumn("datetime3_c",
DataTypes.TIMESTAMP(3))
+ .physicalColumn("datetime6_c",
DataTypes.TIMESTAMP(6))
+ .physicalColumn("decimal_c0",
DataTypes.DECIMAL(6, 2))
+ .physicalColumn("decimal_c1",
DataTypes.DECIMAL(9, 4))
+ .physicalColumn("decimal_c2",
DataTypes.DECIMAL(20, 4))
+ .physicalColumn("timestamp_c",
DataTypes.TIMESTAMP_LTZ(0))
+ .physicalColumn("timestamp3_c",
DataTypes.TIMESTAMP_LTZ(3))
+ .physicalColumn("timestamp6_c",
DataTypes.TIMESTAMP_LTZ(6))
+ .primaryKey("id")
+ .build()));
+
+ // Test create table DDL with inline primary key
+ statement.execute(
+ String.format(
+ "CREATE TABLE `%s`.`newlyAddedTable2`(id SERIAL
PRIMARY KEY);",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new CreateTableEvent(
+ TableId.tableId(
+ inventoryDatabase.getDatabaseName(),
"newlyAddedTable2"),
+ Schema.newBuilder()
+ .physicalColumn("id",
DataTypes.DECIMAL(20, 0).notNull())
+ .primaryKey("id")
+ .build()));
+
+ // Test create table DDL with multiple primary keys
+ statement.execute(
+ String.format(
+ "CREATE TABLE `%s`.`newlyAddedTable3`("
+ + "id SERIAL,"
+ + "name VARCHAR(17),"
+ + "notes TEXT,"
+ + "PRIMARY KEY (id, name));",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new CreateTableEvent(
+ TableId.tableId(
+ inventoryDatabase.getDatabaseName(),
"newlyAddedTable3"),
+ Schema.newBuilder()
+ .physicalColumn("id",
DataTypes.DECIMAL(20, 0).notNull())
+ .physicalColumn("name",
DataTypes.VARCHAR(17).notNull())
+ .physicalColumn("notes",
DataTypes.STRING())
+ .primaryKey("id", "name")
+ .build()));
+ }
+ List<Event> actual = fetchResults(events, expected.size());
+ assertEqualsInAnyOrder(
+
expected.stream().map(Object::toString).collect(Collectors.toList()),
+
actual.stream().map(Object::toString).collect(Collectors.toList()));
+ }
+
private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
return new CreateTableEvent(
tableId,
@@ -384,6 +660,46 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
.build());
}
+ private List<CreateTableEvent> getInventoryCreateAllTableEvents(String
databaseName) {
+ return Arrays.asList(
+ new CreateTableEvent(
+ TableId.tableId(databaseName, "products"),
+ Schema.newBuilder()
+ .physicalColumn("id",
DataTypes.INT().notNull())
+ .physicalColumn("name",
DataTypes.VARCHAR(255).notNull(), "flink")
+ .physicalColumn("description",
DataTypes.VARCHAR(512))
+ .physicalColumn("weight", DataTypes.FLOAT())
+ .primaryKey(Collections.singletonList("id"))
+ .build()),
+ new CreateTableEvent(
+ TableId.tableId(databaseName, "customers"),
+ Schema.newBuilder()
+ .physicalColumn("id",
DataTypes.INT().notNull())
+ .physicalColumn("first_name",
DataTypes.VARCHAR(255).notNull())
+ .physicalColumn("last_name",
DataTypes.VARCHAR(255).notNull())
+ .physicalColumn("email",
DataTypes.VARCHAR(255).notNull())
+ .primaryKey(Collections.singletonList("id"))
+ .build()),
+ new CreateTableEvent(
+ TableId.tableId(databaseName, "orders"),
+ Schema.newBuilder()
+ .physicalColumn("order_number",
DataTypes.INT().notNull())
+ .physicalColumn("order_date",
DataTypes.DATE().notNull())
+ .physicalColumn("purchaser",
DataTypes.INT().notNull())
+ .physicalColumn("quantity",
DataTypes.INT().notNull())
+ .physicalColumn("product_id",
DataTypes.INT().notNull())
+
.primaryKey(Collections.singletonList("order_number"))
+ .build()),
+ new CreateTableEvent(
+ TableId.tableId(databaseName, "multi_max_table"),
+ Schema.newBuilder()
+ .physicalColumn("order_id",
DataTypes.VARCHAR(128).notNull())
+ .physicalColumn("index",
DataTypes.INT().notNull())
+ .physicalColumn("desc",
DataTypes.VARCHAR(512).notNull())
+ .primaryKey(Arrays.asList("order_id", "index"))
+ .build()));
+ }
+
private List<Event> getSnapshotExpected(TableId tableId) {
RowType rowType =
RowType.of(