This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 570faa7a0 [FLINK-38247][MySQL] Handle BIGINT UNSIGNED overflow 
correctly in PreparedStatement. (#4117)
570faa7a0 is described below

commit 570faa7a01339ce01dc8b9efc243a3d9066727e4
Author: suhwan <[email protected]>
AuthorDate: Mon Mar 9 15:52:38 2026 +0900

    [FLINK-38247][MySQL] Handle BIGINT UNSIGNED overflow correctly in 
PreparedStatement. (#4117)
---
 .../mysql/source/utils/StatementUtils.java         |  36 +++++--
 .../connectors/mysql/source/MySqlSourceITCase.java |  94 +++++++++++++++++
 .../mysql/source/utils/StatementUtilsTest.java     | 113 +++++++++++++++++++++
 3 files changed, 235 insertions(+), 8 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
index e538c325d..c910cbfdc 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
@@ -22,6 +22,8 @@ import org.apache.flink.table.types.logical.RowType;
 import io.debezium.jdbc.JdbcConnection;
 import io.debezium.relational.TableId;
 
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
@@ -76,6 +78,24 @@ public class StatementUtils {
                 });
     }
 
+    // PreparedStatement#setObject converts a BIGINT UNSIGNED value to a long, which overflows
+    // for values exceeding Long.MAX_VALUE. Therefore, a BigInteger outside the long range is
+    // bound as a BigDecimal instead.
+    public static void setSafeObject(PreparedStatement ps, int parameterIndex, 
Object value)
+            throws SQLException {
+        if (value instanceof BigInteger) {
+            BigInteger bigIntValue = (BigInteger) value;
+            if (bigIntValue.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0
+                    || 
bigIntValue.compareTo(BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
+                ps.setBigDecimal(parameterIndex, new BigDecimal(bigIntValue));
+            } else {
+                ps.setObject(parameterIndex, bigIntValue.longValueExact());
+            }
+        } else {
+            ps.setObject(parameterIndex, value);
+        }
+    }
+
     public static Object queryMin(
             JdbcConnection jdbc, TableId tableId, String columnName, Object 
excludedLowerBound)
             throws SQLException {
@@ -85,7 +105,7 @@ public class StatementUtils {
                         quote(columnName), quote(tableId), quote(columnName));
         return jdbc.prepareQueryAndMap(
                 minQuery,
-                ps -> ps.setObject(1, excludedLowerBound),
+                ps -> setSafeObject(ps, 1, excludedLowerBound),
                 rs -> {
                     if (!rs.next()) {
                         // this should never happen
@@ -118,7 +138,7 @@ public class StatementUtils {
                         chunkSize);
         return jdbc.prepareQueryAndMap(
                 query,
-                ps -> ps.setObject(1, includedLowerBound),
+                ps -> setSafeObject(ps, 1, includedLowerBound),
                 rs -> {
                     if (!rs.next()) {
                         // this should never happen
@@ -204,18 +224,18 @@ public class StatementUtils {
             }
             if (isFirstSplit) {
                 for (int i = 0; i < primaryKeyNum; i++) {
-                    statement.setObject(i + 1, splitEnd[i]);
-                    statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
+                    setSafeObject(statement, i + 1, splitEnd[i]);
+                    setSafeObject(statement, i + 1 + primaryKeyNum, 
splitEnd[i]);
                 }
             } else if (isLastSplit) {
                 for (int i = 0; i < primaryKeyNum; i++) {
-                    statement.setObject(i + 1, splitStart[i]);
+                    setSafeObject(statement, i + 1, splitStart[i]);
                 }
             } else {
                 for (int i = 0; i < primaryKeyNum; i++) {
-                    statement.setObject(i + 1, splitStart[i]);
-                    statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
-                    statement.setObject(i + 1 + 2 * primaryKeyNum, 
splitEnd[i]);
+                    setSafeObject(statement, i + 1, splitStart[i]);
+                    setSafeObject(statement, i + 1 + primaryKeyNum, 
splitEnd[i]);
+                    setSafeObject(statement, i + 1 + 2 * primaryKeyNum, 
splitEnd[i]);
                 }
             }
             return statement;
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
index 3c7f9d141..8f48ed35b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
@@ -56,6 +56,7 @@ import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.DataType;
@@ -1447,6 +1448,99 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
         }
     }
 
+    @Test
+    void testUnsignedBigintPrimaryKeyChunking() throws Exception {
+        customDatabase.createAndInitialize();
+
+        String db = customDatabase.getDatabaseName();
+        String table = "unsigned_bigint_pk";
+        try (MySqlConnection connection = getConnection()) {
+            connection.setAutoCommit(false);
+            String createSql =
+                    String.format(
+                            "CREATE TABLE %s.%s (\n"
+                                    + "  `order_id` BIGINT UNSIGNED NOT 
NULL,\n"
+                                    + "  `desc` VARCHAR(512) NOT NULL,\n"
+                                    + "  PRIMARY KEY (`order_id`)\n"
+                                    + ") ENGINE=InnoDB DEFAULT CHARSET=utf8;",
+                            StatementUtils.quote(db), 
StatementUtils.quote(table));
+            // Insert sample data including values near UNSIGNED BIGINT max
+            String insertSql =
+                    String.format(
+                            "INSERT INTO %s.%s (`order_id`, `desc`) VALUES "
+                                    + "(1, 'flink'),(2, 'flink'),(3, 
'flink'),(4, 'flink'),(5, 'flink'),"
+                                    + "(6, 'flink'),(7, 'flink'),(8, 
'flink'),(9, 'flink'),(10, 'flink'),"
+                                    + "(11, 'flink'),(12, 'flink'),"
+                                    + "(18446744073709551604, 
'flink'),(18446744073709551605, 'flink'),"
+                                    + "(18446744073709551606, 
'flink'),(18446744073709551607, 'flink'),"
+                                    + "(18446744073709551608, 
'flink'),(18446744073709551609, 'flink'),"
+                                    + "(18446744073709551610, 
'flink'),(18446744073709551611, 'flink'),"
+                                    + "(18446744073709551612, 
'flink'),(18446744073709551613, 'flink'),"
+                                    + "(18446744073709551614, 
'flink'),(18446744073709551615, 'flink');",
+                            StatementUtils.quote(db), 
StatementUtils.quote(table));
+            // Drop if exists to be idempotent across runs, then create and 
insert
+            connection.execute(
+                    String.format(
+                            "DROP TABLE IF EXISTS %s.%s;",
+                            StatementUtils.quote(db), 
StatementUtils.quote(table)),
+                    createSql,
+                    insertSql);
+            connection.commit();
+        }
+
+        // Build a source reading only the unsigned_bigint_pk table
+        DataType dataType =
+                DataTypes.ROW(
+                        DataTypes.FIELD("order_id", DataTypes.DECIMAL(20, 0)),
+                        DataTypes.FIELD("desc", DataTypes.STRING()));
+        LogicalType logicalType = 
TypeConversions.fromDataToLogicalType(dataType);
+        InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
+        RowDataDebeziumDeserializeSchema deserializer =
+                RowDataDebeziumDeserializeSchema.newBuilder()
+                        .setPhysicalRowType((RowType) 
dataType.getLogicalType())
+                        .setResultTypeInfo(typeInfo)
+                        .build();
+
+        MySqlSource<RowData> source =
+                MySqlSource.<RowData>builder()
+                        .hostname(MYSQL_CONTAINER.getHost())
+                        .port(MYSQL_CONTAINER.getDatabasePort())
+                        .username(customDatabase.getUsername())
+                        .password(customDatabase.getPassword())
+                        .serverTimeZone("UTC")
+                        .databaseList(db)
+                        .tableList(db + "." + table)
+                        .deserializer(deserializer)
+                        .startupOptions(StartupOptions.initial())
+                        .chunkKeyColumn(new ObjectPath(db, table), "order_id")
+                        .splitSize(2)
+                        .build();
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        try (CloseableIterator<RowData> it =
+                env.fromSource(source, WatermarkStrategy.noWatermarks(), 
"MySQL CDC Source")
+                        .executeAndCollect()) {
+            // Expect 24 records as inserted above
+            List<String> result = fetchRowData(it, 24, 
this::stringifyUnsignedPkRow);
+            // Validate that several boundary values are present to ensure chunking across the
+            // unsigned range works
+            assertThat(result)
+                    .contains(
+                            "+I[1, flink]",
+                            "+I[12, flink]",
+                            "+I[18446744073709551604, flink]",
+                            "+I[18446744073709551615, flink]");
+        }
+    }
+
+    private String stringifyUnsignedPkRow(RowData row) {
+        DecimalData decimal = row.getDecimal(0, 20, 0);
+        String orderId = decimal.toBigDecimal().toPlainString();
+        String desc = row.getString(1).toString();
+        return "+I[" + orderId + ", " + desc + "]";
+    }
+
     /**
      * A {@link DebeziumDeserializationSchema} implementation which sleep 
given milliseconds after
      * deserialize per record, this class is designed for test.
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtilsTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtilsTest.java
new file mode 100644
index 000000000..5272b1475
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtilsTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Proxy;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link 
org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils}. */
+class StatementUtilsTest {
+
+    @Test
+    void testSetSafeObjectConvertsBigIntegerToBigDecimal() throws SQLException 
{
+        Map<String, Object> invocationDetails = new HashMap<>();
+        PreparedStatement psProxy = 
createPreparedStatementProxy(invocationDetails);
+
+        // Create a BigInteger value that exceeds Long.MAX_VALUE
+        BigInteger bigIntValue = new BigInteger("9223372036854775808"); // 
Long.MAX_VALUE + 1
+        BigDecimal expectedBigDecimal = new BigDecimal(bigIntValue);
+
+        // Use the safe method
+        StatementUtils.setSafeObject(psProxy, 1, bigIntValue);
+
+        // Assert that it correctly used setBigDecimal with the converted 
BigDecimal value
+        
assertThat(invocationDetails.get("methodName")).isEqualTo("setBigDecimal");
+        
assertThat(invocationDetails.get("value")).isInstanceOf(BigDecimal.class);
+        
assertThat(invocationDetails.get("value")).isEqualTo(expectedBigDecimal);
+    }
+
+    @Test
+    void testSetSafeObjectHandlesLargeBigIntegerValues() throws SQLException {
+        Map<String, Object> invocationDetails = new HashMap<>();
+        PreparedStatement psProxy = 
createPreparedStatementProxy(invocationDetails);
+
+        // Test with BIGINT UNSIGNED max value
+        BigInteger maxUnsignedBigInt = new BigInteger("18446744073709551615"); 
// 2^64 - 1
+        BigDecimal expectedBigDecimal = new BigDecimal(maxUnsignedBigInt);
+
+        StatementUtils.setSafeObject(psProxy, 1, maxUnsignedBigInt);
+
+        
assertThat(invocationDetails.get("methodName")).isEqualTo("setBigDecimal");
+        
assertThat(invocationDetails.get("value")).isEqualTo(expectedBigDecimal);
+    }
+
+    @Test
+    void testSetSafeObjectHandlesRegularValues() throws SQLException {
+        Map<String, Object> invocationDetails = new HashMap<>();
+        PreparedStatement psProxy = 
createPreparedStatementProxy(invocationDetails);
+
+        // Test with a common Long
+        StatementUtils.setSafeObject(psProxy, 1, 123L);
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isEqualTo(123L);
+        invocationDetails.clear();
+
+        // Test with a String
+        StatementUtils.setSafeObject(psProxy, 2, "test");
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isEqualTo("test");
+        invocationDetails.clear();
+
+        // Test with an Integer
+        StatementUtils.setSafeObject(psProxy, 3, 456);
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isEqualTo(456);
+        invocationDetails.clear();
+
+        // Test with null
+        StatementUtils.setSafeObject(psProxy, 4, null);
+        assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+        assertThat(invocationDetails.get("value")).isNull();
+    }
+
+    private PreparedStatement createPreparedStatementProxy(Map<String, Object> 
invocationDetails) {
+        return (PreparedStatement)
+                Proxy.newProxyInstance(
+                        StatementUtilsTest.class.getClassLoader(),
+                        new Class<?>[] {PreparedStatement.class},
+                        (proxy, method, args) -> {
+                            String methodName = method.getName();
+                            if (methodName.equals("setObject")
+                                    || methodName.equals("setBigDecimal")) {
+                                invocationDetails.put("methodName", 
methodName);
+                                invocationDetails.put("parameterIndex", 
args[0]);
+                                invocationDetails.put("value", args[1]);
+                            }
+                            return null;
+                        });
+    }
+}

Reply via email to