lvyanquan commented on code in PR #4117: URL: https://github.com/apache/flink-cdc/pull/4117#discussion_r2366654886
########## flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java: ########## @@ -1447,6 +1448,98 @@ private boolean hasNextData(final CloseableIterator<?> iterator) } } + @Test + void testUnsignedBigintPrimaryKeyChunking() throws Exception { + customDatabase.createAndInitialize(); + + String db = customDatabase.getDatabaseName(); + String table = "unsigned_bigint_pk"; + try (MySqlConnection connection = getConnection()) { + connection.setAutoCommit(false); + String createSql = + String.format( + "CREATE TABLE %s.%s (\n" + + " `order_id` BIGINT UNSIGNED NOT NULL,\n" + + " `desc` VARCHAR(512) NOT NULL,\n" + + " PRIMARY KEY (`order_id`)\n" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8;", + StatementUtils.quote(db), StatementUtils.quote(table)); + // Insert sample data including values near UNSIGNED BIGINT max + String insertSql = + String.format( + "INSERT INTO %s.%s (`order_id`, `desc`) VALUES " + + "(1, 'flink'),(2, 'flink'),(3, 'flink'),(4, 'flink'),(5, 'flink')," + + "(6, 'flink'),(7, 'flink'),(8, 'flink'),(9, 'flink'),(10, 'flink')," + + "(11, 'flink'),(12, 'flink')," + + "(18446744073709551604, 'flink'),(18446744073709551605, 'flink')," + + "(18446744073709551606, 'flink'),(18446744073709551607, 'flink')," + + "(18446744073709551608, 'flink'),(18446744073709551609, 'flink')," + + "(18446744073709551610, 'flink'),(18446744073709551611, 'flink')," + + "(18446744073709551612, 'flink'),(18446744073709551613, 'flink')," + + "(18446744073709551614, 'flink'),(18446744073709551615, 'flink');", + StatementUtils.quote(db), StatementUtils.quote(table)); + // Drop if exists to be idempotent across runs, then create and insert + connection.execute( + String.format( + "DROP TABLE IF EXISTS %s.%s;", + StatementUtils.quote(db), StatementUtils.quote(table)), + createSql, + insertSql); + connection.commit(); + } + + // Build a source reading only the unsigned_bigint_pk table + DataType 
dataType = + DataTypes.ROW( + DataTypes.FIELD("order_id", DataTypes.DECIMAL(20, 0)), + DataTypes.FIELD("desc", DataTypes.STRING())); + LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType); + InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType); + RowDataDebeziumDeserializeSchema deserializer = + RowDataDebeziumDeserializeSchema.newBuilder() + .setPhysicalRowType((RowType) dataType.getLogicalType()) + .setResultTypeInfo(typeInfo) + .build(); + + MySqlSource<RowData> source = + MySqlSource.<RowData>builder() + .hostname(MYSQL_CONTAINER.getHost()) + .port(MYSQL_CONTAINER.getDatabasePort()) + .username(customDatabase.getUsername()) + .password(customDatabase.getPassword()) + .serverTimeZone("UTC") + .databaseList(db) + .tableList(db + "." + table) + .deserializer(deserializer) + .startupOptions(StartupOptions.initial()) + .chunkKeyColumn(new ObjectPath(db, table), "order_id") Review Comment: You can add `.splitSize(2)` here and set `rootLogger.level` in `log4j2-test.properties` to `INFO` to see the actual split information.
For example: ``` gners.MySqlChunkSplitter - ChunkSplitter has split 2820 chunks for table customer_kgqlle.unsigned_bigint_pk 72864 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 2 72869 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 3 72871 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 4 72875 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 5 72878 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 6 72879 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 7 72881 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 8 72882 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 9 72884 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 10 72885
[snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 11 72990 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - ChunkSplitter has split 2830 chunks for table customer_kgqlle.unsigned_bigint_pk 72990 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 12 72992 [snapshot-splitting] INFO org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 18446744073709551604 ``` This shows that the problem has not been resolved: the chunk start advances by only one key at a time (from 2, 3, 4, …), and the splitter reports more than 2,800 chunks for a table that holds only 24 rows. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org