This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 0e9a176b0 [hotfix][pipeline-connector/mysql] Fix primary key
constraints missing when using inline `PRIMARY KEY` declaration syntax
0e9a176b0 is described below
commit 0e9a176b086629a87e5bb72505382d076dc8b754
Author: yuxiqian <[email protected]>
AuthorDate: Wed Aug 28 18:24:48 2024 +0800
[hotfix][pipeline-connector/mysql] Fix primary key constraints missing when
using inline `PRIMARY KEY` declaration syntax
This closes #3579.
---
.../parser/CustomAlterTableParserListener.java | 34 +++++++++++++---------
.../CustomColumnDefinitionParserListener.java | 6 ++++
.../mysql/source/MySqlPipelineITCase.java | 34 ++++++++++++++++++++++
3 files changed, 61 insertions(+), 13 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
index eb30fe80d..3b30b3c49 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java
@@ -81,8 +81,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
TableId tableId =
parser.parseQualifiedTableId(ctx.tableName().fullId());
if (parser.databaseTables().forTable(tableId) == null) {
tableEditor = parser.databaseTables().editOrCreateTable(tableId);
- super.enterColumnCreateTable(ctx);
}
+ super.enterColumnCreateTable(ctx);
}
@Override
@@ -113,14 +113,17 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
.collect(Collectors.toList()));
parser.databaseTables().overwriteTable(tableEditor.create());
parser.signalCreateTable(tableEditor.tableId(), ctx);
+
+ Schema.Builder builder = Schema.newBuilder();
+ tableEditor.columns().forEach(column ->
builder.column(toCdcColumn(column)));
+ if (tableEditor.hasPrimaryKey()) {
+
builder.primaryKey(tableEditor.primaryKeyColumnNames());
+ }
+ changes.add(
+ new CreateTableEvent(
+ toCdcTableId(tableEditor.tableId()),
builder.build()));
},
tableEditor);
- Schema.Builder builder = Schema.newBuilder();
- tableEditor.columns().forEach(column ->
builder.column(toCdcColumn(column)));
- if (tableEditor.hasPrimaryKey()) {
- builder.primaryKey(tableEditor.primaryKeyColumnNames());
- }
- changes.add(new CreateTableEvent(toCdcTableId(tableEditor.tableId()),
builder.build()));
super.exitColumnCreateTable(ctx);
}
@@ -133,7 +136,7 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
if (columnDefinitionListener == null) {
columnDefinitionListener =
new CustomColumnDefinitionParserListener(
- columnEditor, parser, listeners);
+ tableEditor, columnEditor, parser,
listeners);
listeners.add(columnDefinitionListener);
} else {
columnDefinitionListener.setColumnEditor(columnEditor);
@@ -194,7 +197,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
String columnName = parser.parseName(ctx.uid(0));
ColumnEditor columnEditor = Column.editor().name(columnName);
columnDefinitionListener =
- new CustomColumnDefinitionParserListener(columnEditor, parser,
listeners);
+ new CustomColumnDefinitionParserListener(
+ tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.exitAlterByAddColumn(ctx);
}
@@ -246,7 +250,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
columnEditors.add(Column.editor().name(columnName));
}
columnDefinitionListener =
- new CustomColumnDefinitionParserListener(columnEditors.get(0),
parser, listeners);
+ new CustomColumnDefinitionParserListener(
+ tableEditor, columnEditors.get(0), parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByAddColumns(ctx);
}
@@ -296,7 +301,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
columnEditor.unsetDefaultValueExpression();
columnDefinitionListener =
- new CustomColumnDefinitionParserListener(columnEditor, parser,
listeners);
+ new CustomColumnDefinitionParserListener(
+ tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByChangeColumn(ctx);
}
@@ -335,7 +341,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
String oldColumnName = parser.parseName(ctx.oldColumn);
ColumnEditor columnEditor = Column.editor().name(oldColumnName);
columnDefinitionListener =
- new CustomColumnDefinitionParserListener(columnEditor, parser,
listeners);
+ new CustomColumnDefinitionParserListener(
+ tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByRenameColumn(ctx);
}
@@ -347,7 +354,8 @@ public class CustomAlterTableParserListener extends
MySqlParserBaseListener {
columnEditor.unsetDefaultValueExpression();
columnDefinitionListener =
- new CustomColumnDefinitionParserListener(columnEditor, parser,
listeners);
+ new CustomColumnDefinitionParserListener(
+ tableEditor, columnEditor, parser, listeners);
listeners.add(columnDefinitionListener);
super.enterAlterByModifyColumn(ctx);
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
index 591836418..e886580b3 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java
@@ -25,6 +25,7 @@ import io.debezium.ddl.parser.mysql.generated.MySqlParser;
import io.debezium.ddl.parser.mysql.generated.MySqlParserBaseListener;
import io.debezium.relational.Column;
import io.debezium.relational.ColumnEditor;
+import io.debezium.relational.TableEditor;
import io.debezium.relational.ddl.DataType;
import io.debezium.util.Strings;
import org.antlr.v4.runtime.tree.ParseTreeListener;
@@ -50,13 +51,16 @@ public class CustomColumnDefinitionParserListener extends
MySqlParserBaseListene
private boolean uniqueColumn;
private AtomicReference<Boolean> optionalColumn = new AtomicReference<>();
private DefaultValueParserListener defaultValueListener;
+ private final TableEditor tableEditor;
private final List<ParseTreeListener> listeners;
public CustomColumnDefinitionParserListener(
+ TableEditor tableEditor,
ColumnEditor columnEditor,
MySqlAntlrDdlParser parser,
List<ParseTreeListener> listeners) {
+ this.tableEditor = tableEditor;
this.columnEditor = columnEditor;
this.parser = parser;
this.dataTypeResolver = parser.dataTypeResolver();
@@ -106,6 +110,8 @@ public class CustomColumnDefinitionParserListener extends
MySqlParserBaseListene
// this rule will be parsed only if no primary key is set in a table
// otherwise the statement can't be executed due to multiple primary key error
optionalColumn.set(Boolean.FALSE);
+ tableEditor.addColumn(columnEditor.create());
+ tableEditor.setPrimaryKeyNames(columnEditor.name());
super.enterPrimaryKeyColumnConstraint(ctx);
}
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
index 09b2a308a..82f7b5e7b 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java
@@ -648,6 +648,40 @@ public class MySqlPipelineITCase extends
MySqlSourceTestBase {
.physicalColumn("timestamp6_c",
DataTypes.TIMESTAMP_LTZ(6))
.primaryKey("id")
.build()));
+
+ // Test create table DDL with inline primary key
+ statement.execute(
+ String.format(
+ "CREATE TABLE `%s`.`newlyAddedTable2`(id SERIAL
PRIMARY KEY);",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new CreateTableEvent(
+ TableId.tableId(
+ inventoryDatabase.getDatabaseName(),
"newlyAddedTable2"),
+ Schema.newBuilder()
+ .physicalColumn("id",
DataTypes.DECIMAL(20, 0).notNull())
+ .primaryKey("id")
+ .build()));
+
+ // Test create table DDL with a composite (multi-column) primary key
+ statement.execute(
+ String.format(
+ "CREATE TABLE `%s`.`newlyAddedTable3`("
+ + "id SERIAL,"
+ + "name VARCHAR(17),"
+ + "notes TEXT,"
+ + "PRIMARY KEY (id, name));",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new CreateTableEvent(
+ TableId.tableId(
+ inventoryDatabase.getDatabaseName(),
"newlyAddedTable3"),
+ Schema.newBuilder()
+ .physicalColumn("id",
DataTypes.DECIMAL(20, 0).notNull())
+ .physicalColumn("name",
DataTypes.VARCHAR(17).notNull())
+ .physicalColumn("notes",
DataTypes.STRING())
+ .primaryKey("id", "name")
+ .build()));
}
List<Event> actual = fetchResults(events, expected.size());
assertEqualsInAnyOrder(