lvyanquan commented on code in PR #4117:
URL: https://github.com/apache/flink-cdc/pull/4117#discussion_r2366654886


##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java:
##########
@@ -1447,6 +1448,98 @@ private boolean hasNextData(final CloseableIterator<?> iterator)
         }
     }
 
+    @Test
+    void testUnsignedBigintPrimaryKeyChunking() throws Exception {
+        customDatabase.createAndInitialize();
+
+        String db = customDatabase.getDatabaseName();
+        String table = "unsigned_bigint_pk";
+        try (MySqlConnection connection = getConnection()) {
+            connection.setAutoCommit(false);
+            String createSql =
+                    String.format(
+                            "CREATE TABLE %s.%s (\n"
+                                    + "  `order_id` BIGINT UNSIGNED NOT NULL,\n"
+                                    + "  `desc` VARCHAR(512) NOT NULL,\n"
+                                    + "  PRIMARY KEY (`order_id`)\n"
+                                    + ") ENGINE=InnoDB DEFAULT CHARSET=utf8;",
+                            StatementUtils.quote(db), StatementUtils.quote(table));
+            // Insert sample data including values near UNSIGNED BIGINT max
+            String insertSql =
+                    String.format(
+                            "INSERT INTO %s.%s (`order_id`, `desc`) VALUES "
+                                    + "(1, 'flink'),(2, 'flink'),(3, 'flink'),(4, 'flink'),(5, 'flink'),"
+                                    + "(6, 'flink'),(7, 'flink'),(8, 'flink'),(9, 'flink'),(10, 'flink'),"
+                                    + "(11, 'flink'),(12, 'flink'),"
+                                    + "(18446744073709551604, 'flink'),(18446744073709551605, 'flink'),"
+                                    + "(18446744073709551606, 'flink'),(18446744073709551607, 'flink'),"
+                                    + "(18446744073709551608, 'flink'),(18446744073709551609, 'flink'),"
+                                    + "(18446744073709551610, 'flink'),(18446744073709551611, 'flink'),"
+                                    + "(18446744073709551612, 'flink'),(18446744073709551613, 'flink'),"
+                                    + "(18446744073709551614, 'flink'),(18446744073709551615, 'flink');",
+                            StatementUtils.quote(db), StatementUtils.quote(table));
+            // Drop if exists to be idempotent across runs, then create and insert
+            connection.execute(
+                    String.format(
+                            "DROP TABLE IF EXISTS %s.%s;",
+                            StatementUtils.quote(db), StatementUtils.quote(table)),
+                    createSql,
+                    insertSql);
+            connection.commit();
+        }
+
+        // Build a source reading only the unsigned_bigint_pk table
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("order_id", DataTypes.DECIMAL(20, 0)),
+                        DataTypes.FIELD("desc", DataTypes.STRING()));
+        LogicalType logicalType = TypeConversions.fromDataToLogicalType(dataType);
+        InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
+        RowDataDebeziumDeserializeSchema deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType((RowType) dataType.getLogicalType())
+                        .setResultTypeInfo(typeInfo)
+                        .build();
+
+        MySqlSource<RowData> source =
+                MySqlSource.<RowData>builder()
+                        .hostname(MYSQL_CONTAINER.getHost())
+                        .port(MYSQL_CONTAINER.getDatabasePort())
+                        .username(customDatabase.getUsername())
+                        .password(customDatabase.getPassword())
+                        .serverTimeZone("UTC")
+                        .databaseList(db)
+                        .tableList(db + "." + table)
+                        .deserializer(deserializer)
+                        .startupOptions(StartupOptions.initial())
+                        .chunkKeyColumn(new ObjectPath(db, table), "order_id")

Review Comment:
   You can add `.splitSize(2)` here and set `rootLogger.level` to `INFO` in `log4j2-test.properties` to see the actual split information; a sketch of both changes follows after the log excerpt below. For example:
   ```
   
   gners.MySqlChunkSplitter - ChunkSplitter has split 2820 chunks for table customer_kgqlle.unsigned_bigint_pk
   72864 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 2
   72869 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 3
   72871 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 4
   72875 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 5
   72878 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 6
   72879 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 7
   72881 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 8
   72882 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 9
   72884 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 10
   72885 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 11
   72990 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - ChunkSplitter has split 2830 chunks for table customer_kgqlle.unsigned_bigint_pk
   72990 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 12
   72992 [snapshot-splitting] INFO  org.apache.flink.cdc.connectors.mysql.source.assigners.MySqlChunkSplitter - Use unevenly-sized chunks for table customer_kgqlle.unsigned_bigint_pk, the chunk size is 2 from 18446744073709551604
   
   ```
   This means that the problem has not been resolved.
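   For reference, a minimal sketch of the two suggested changes, reusing only names already defined in this test (`db`, `table`, `deserializer`, `customDatabase`, `MYSQL_CONTAINER`): add `.splitSize(2)` to the source builder and set `rootLogger.level = INFO` in `log4j2-test.properties`. This is an illustration of the suggestion, not the final implementation.
   ```java
   // Sketch of the suggested test change: mirror the builder already present in
   // testUnsignedBigintPrimaryKeyChunking() and force 2-row snapshot chunks so
   // the MySqlChunkSplitter INFO logs shown above are easy to reproduce.
   MySqlSource<RowData> source =
           MySqlSource.<RowData>builder()
                   .hostname(MYSQL_CONTAINER.getHost())
                   .port(MYSQL_CONTAINER.getDatabasePort())
                   .username(customDatabase.getUsername())
                   .password(customDatabase.getPassword())
                   .serverTimeZone("UTC")
                   .databaseList(db)
                   .tableList(db + "." + table)
                   .deserializer(deserializer)
                   .startupOptions(StartupOptions.initial())
                   .chunkKeyColumn(new ObjectPath(db, table), "order_id")
                   .splitSize(2) // suggested addition: at most 2 rows per chunk
                   .build();
   ```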


