ruanhang1993 commented on code in PR #3608:
URL: https://github.com/apache/flink-cdc/pull/3608#discussion_r1833670840
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java:
##########
@@ -574,9 +576,12 @@ private List<String> getTestAssignSnapshotSplitsFromCheckpoint(AssignerStatus as
List<TableId> alreadyProcessedTables = new ArrayList<>();
alreadyProcessedTables.add(processedTable);
+ Properties jdbcProperties = new Properties();
+ jdbcProperties.put(PropertyKey.tinyInt1isBit.getKeyName(), "true");
RowType splitKeyType =
ChunkUtils.getChunkKeyColumnType(
- Column.editor().name("id").type("INT").jdbcType(4).create());
+ Column.editor().name("id").type("INT").jdbcType(4).create(),
+ jdbcProperties);
Review Comment:
Please add some more tests for this part. A table with boolean,
tinyint(1), and tinyint(2) columns will behave differently depending on
whether tinyInt1isBit is false or true.
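To make the requested cases concrete, here is a minimal sketch of the expected mapping matrix. The class `TinyIntMappingSketch` and the helper `mapTinyInt` are hypothetical and exist only for illustration; they are not Flink CDC or Connector/J APIs. The sketch assumes that a width-1 tinyint maps to BOOLEAN only when tinyInt1isBit is enabled, and that the property defaults to true when it is absent.

```java
import java.util.Properties;

public class TinyIntMappingSketch {

    // Hypothetical helper for illustration only; not a Flink CDC or Connector/J API.
    // Assumption: tinyInt1isBit defaults to "true" when the property is not set.
    static String mapTinyInt(int displayWidth, Properties jdbcProperties) {
        boolean tinyInt1isBit =
                Boolean.parseBoolean(jdbcProperties.getProperty("tinyInt1isBit", "true"));
        // Only width-1 columns are affected; in MySQL, BOOLEAN is an alias for TINYINT(1).
        return (displayWidth == 1 && tinyInt1isBit) ? "BOOLEAN" : "TINYINT";
    }

    public static void main(String[] args) {
        for (String flag : new String[] {"true", "false"}) {
            Properties props = new Properties();
            props.put("tinyInt1isBit", flag);
            // boolean and tinyint(1) share display width 1; tinyint(2) has width 2.
            System.out.println(
                    "tinyInt1isBit=" + flag
                            + ": boolean -> " + mapTinyInt(1, props)
                            + ", tinyint(1) -> " + mapTinyInt(1, props)
                            + ", tinyint(2) -> " + mapTinyInt(2, props));
        }
    }
}
```

A real test would run the same matrix through `ChunkUtils.getChunkKeyColumnType` and the deserializer rather than through this stand-in.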
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java:
##########
@@ -45,7 +45,9 @@ public MySqlDataSource(MySqlSourceConfigFactory configFactory) {
public EventSourceProvider getEventSourceProvider() {
MySqlEventDeserializer deserializer =
new MySqlEventDeserializer(
- DebeziumChangelogMode.ALL, sourceConfig.isIncludeSchemaChanges());
+ DebeziumChangelogMode.ALL,
+ sourceConfig.isIncludeSchemaChanges(),
+ sourceConfig.getJdbcProperties());
Review Comment:
+1
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java:
##########
@@ -315,7 +319,7 @@ public void exitAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
String newColumnName = parser.parseName(ctx.newColumn);
Map<String, DataType> typeMapping = new HashMap<>();
- typeMapping.put(column.name(), fromDbzColumn(column));
+ typeMapping.put(column.name(), fromDbzColumn(column, jdbcProperties));
Review Comment:
I think we cannot convert tinyint(1) to boolean when the column definition
comes from the binlog.
An added tinyint(1) column is already parsed correctly from the binlog. If it
is converted here, it becomes impossible for the sink to add a tinyint(1) column.
An added boolean column is already parsed correctly as boolean from the binlog.
Please test this and add some tests to cover this ALTER statement.
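A rough sketch of the expectation such a test would encode, following the behaviour described in this comment: the type declared in the ALTER statement is kept as written, whatever tinyInt1isBit is set to. The class `BinlogDdlTypeSketch` and the helper `expectedTypeFromDdl` are hypothetical stand-ins, not the connector's parser.

```java
import java.util.Locale;

public class BinlogDdlTypeSketch {

    // Hypothetical helper for illustration only; not part of Flink CDC.
    // It returns the type this comment expects for a column declared in binlog DDL:
    // the declared type is preserved and tinyInt1isBit plays no role.
    static String expectedTypeFromDdl(String declaredType) {
        String normalized = declaredType.trim().toUpperCase(Locale.ROOT);
        if (normalized.equals("BOOLEAN")) {
            return "BOOLEAN";
        }
        if (normalized.startsWith("TINYINT")) {
            // tinyint(1) stays TINYINT so the sink can still add a tinyint(1) column.
            return "TINYINT";
        }
        throw new IllegalArgumentException("Not covered by this sketch: " + declaredType);
    }

    public static void main(String[] args) {
        for (String declared : new String[] {"tinyint(1)", "tinyint(2)", "boolean"}) {
            System.out.println(declared + " -> " + expectedTypeFromDdl(declared));
        }
    }
}
```

An actual test would issue the corresponding ALTER TABLE statements and check the resulting schema change events with tinyInt1isBit set to both true and false.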