ruanhang1993 commented on code in PR #3608:
URL: https://github.com/apache/flink-cdc/pull/3608#discussion_r1914211831
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -366,7 +369,7 @@ public void exitAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
() -> {
Column column = columnDefinitionListener.getColumn();
Map<String, DataType> typeMapping = new HashMap<>();
- typeMapping.put(column.name(), fromDbzColumn(column));
+ typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));
Review Comment:
We should use `fromDbzColumn(column, false)` here.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -315,7 +318,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
String newColumnName = parser.parseName(ctx.newColumn);
Map<String, DataType> typeMapping = new HashMap<>();
- typeMapping.put(column.name(), fromDbzColumn(column));
+ typeMapping.put(column.name(), fromDbzColumn(column, tinyInt1isBit));
Review Comment:
We should use `fromDbzColumn(column, false)` here.
A statement such as `ALTER TABLE xxx ADD COLUMN newcol BOOL;` is not parsed as
a TINYINT type, so we should add tests covering BOOL and TINYINT(1) columns in
ALTER statements.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -413,7 +416,7 @@ public void exitDropTable(MySqlParser.DropTableContext ctx) {
private org.apache.flink.cdc.common.schema.Column toCdcColumn(Column dbzColumn) {
return org.apache.flink.cdc.common.schema.Column.physicalColumn(
dbzColumn.name(),
- fromDbzColumn(dbzColumn),
+ fromDbzColumn(dbzColumn, tinyInt1isBit),
Review Comment:
We should use `fromDbzColumn(dbzColumn, false)` here.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java:
##########
@@ -607,6 +652,22 @@ public void testSchemaChangeEvents() throws Exception {
new AddColumnEvent.ColumnWithPosition(
Column.physicalColumn("newcol1", DataTypes.INT())))));
+ // Add a TINYINT(1) column
+ statement.execute(
+ String.format(
+ "ALTER TABLE `%s`.`customers` ADD COLUMN `new_tinyint1_col1` TINYINT(1) NULL;",
+ inventoryDatabase.getDatabaseName()));
+ expected.add(
+ new AddColumnEvent(
+ TableId.tableId(inventoryDatabase.getDatabaseName(), "customers"),
+ Collections.singletonList(
+ new AddColumnEvent.ColumnWithPosition(
+ Column.physicalColumn(
+ "new_tinyint1_col1",
+ tinyInt1isBit
+ ? DataTypes.BOOLEAN()
+ : DataTypes.TINYINT())))));
+
Review Comment:
Please also add a BOOL column to the related tests.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java:
##########
@@ -129,14 +130,19 @@ public static Schema getTableSchema(
new MySqlSchema(sourceConfig, jdbc.isTableIdCaseSensitive())) {
TableChanges.TableChange tableSchema =
mySqlSchema.getTableSchema(partition, jdbc, toDbzTableId(tableId));
- return toSchema(tableSchema.getTable());
+ boolean tinyInt1isBit =
+ Boolean.parseBoolean(
+ sourceConfig
Review Comment:
Add a `getTinyInt1isBit` method to `MySqlSourceConfig` that returns the
`tinyInt1isBit` setting, so callers do not have to parse it themselves.
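
A rough sketch of what such an accessor could look like. `SourceConfigSketch` is a hypothetical stand-in for `MySqlSourceConfig`, and reading the value from the JDBC properties is an assumption, since the diff above is cut off before the lookup. The fallback of `true` mirrors the MySQL Connector/J default for `tinyInt1isBit`.

```java
import java.util.Properties;

/** Hypothetical stand-in for MySqlSourceConfig; only the relevant part is modeled. */
public class SourceConfigSketch {
    // Connector/J option name. The driver's default for this option is true.
    static final String TINY_INT1_IS_BIT = "tinyInt1isBit";

    // Assumption: the setting is carried in the user-supplied JDBC properties.
    private final Properties jdbcProperties;

    public SourceConfigSketch(Properties jdbcProperties) {
        this.jdbcProperties = jdbcProperties == null ? new Properties() : jdbcProperties;
    }

    /** Returns the tinyInt1isBit setting, falling back to true when it is unset. */
    public boolean getTinyInt1isBit() {
        return Boolean.parseBoolean(jdbcProperties.getProperty(TINY_INT1_IS_BIT, "true"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(TINY_INT1_IS_BIT, "false");
        System.out.println(new SourceConfigSketch(props).getTinyInt1isBit());
        System.out.println(new SourceConfigSketch(new Properties()).getTinyInt1isBit());
    }
}
```

With something like this in place, `MySqlSchemaUtils.getTableSchema` could call the accessor instead of repeating the `Boolean.parseBoolean(...)` lookup.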