This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 570faa7a0 [FLINK-38247][MySQL] Handle BIGINT UNSIGNED overflow
correctly in PreparedStatement. (#4117)
570faa7a0 is described below
commit 570faa7a01339ce01dc8b9efc243a3d9066727e4
Author: suhwan <[email protected]>
AuthorDate: Mon Mar 9 15:52:38 2026 +0900
[FLINK-38247][MySQL] Handle BIGINT UNSIGNED overflow correctly in
PreparedStatement. (#4117)
---
.../mysql/source/utils/StatementUtils.java | 36 +++++--
.../connectors/mysql/source/MySqlSourceITCase.java | 94 +++++++++++++++++
.../mysql/source/utils/StatementUtilsTest.java | 113 +++++++++++++++++++++
3 files changed, 235 insertions(+), 8 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
index e538c325d..c910cbfdc 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtils.java
@@ -22,6 +22,8 @@ import org.apache.flink.table.types.logical.RowType;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
+import java.math.BigDecimal;
+import java.math.BigInteger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -76,6 +78,24 @@ public class StatementUtils {
});
}
+ // PreparedStatement#setObject method will be converted to long type when
handling bigint
+ // unsigned, which poses a data overflow issue for values exceeding
Long.MAX_VALUE.
+ // Therefore, we need to convert to BigDecimal when the value is outside
the long range
+ public static void setSafeObject(PreparedStatement ps, int parameterIndex,
Object value)
+ throws SQLException {
+ if (value instanceof BigInteger) {
+ BigInteger bigIntValue = (BigInteger) value;
+ if (bigIntValue.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0
+ ||
bigIntValue.compareTo(BigInteger.valueOf(Long.MIN_VALUE)) < 0) {
+ ps.setBigDecimal(parameterIndex, new BigDecimal(bigIntValue));
+ } else {
+ ps.setObject(parameterIndex, bigIntValue.longValueExact());
+ }
+ } else {
+ ps.setObject(parameterIndex, value);
+ }
+ }
+
public static Object queryMin(
JdbcConnection jdbc, TableId tableId, String columnName, Object
excludedLowerBound)
throws SQLException {
@@ -85,7 +105,7 @@ public class StatementUtils {
quote(columnName), quote(tableId), quote(columnName));
return jdbc.prepareQueryAndMap(
minQuery,
- ps -> ps.setObject(1, excludedLowerBound),
+ ps -> setSafeObject(ps, 1, excludedLowerBound),
rs -> {
if (!rs.next()) {
// this should never happen
@@ -118,7 +138,7 @@ public class StatementUtils {
chunkSize);
return jdbc.prepareQueryAndMap(
query,
- ps -> ps.setObject(1, includedLowerBound),
+ ps -> setSafeObject(ps, 1, includedLowerBound),
rs -> {
if (!rs.next()) {
// this should never happen
@@ -204,18 +224,18 @@ public class StatementUtils {
}
if (isFirstSplit) {
for (int i = 0; i < primaryKeyNum; i++) {
- statement.setObject(i + 1, splitEnd[i]);
- statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
+ setSafeObject(statement, i + 1, splitEnd[i]);
+ setSafeObject(statement, i + 1 + primaryKeyNum,
splitEnd[i]);
}
} else if (isLastSplit) {
for (int i = 0; i < primaryKeyNum; i++) {
- statement.setObject(i + 1, splitStart[i]);
+ setSafeObject(statement, i + 1, splitStart[i]);
}
} else {
for (int i = 0; i < primaryKeyNum; i++) {
- statement.setObject(i + 1, splitStart[i]);
- statement.setObject(i + 1 + primaryKeyNum, splitEnd[i]);
- statement.setObject(i + 1 + 2 * primaryKeyNum,
splitEnd[i]);
+ setSafeObject(statement, i + 1, splitStart[i]);
+ setSafeObject(statement, i + 1 + primaryKeyNum,
splitEnd[i]);
+ setSafeObject(statement, i + 1 + 2 * primaryKeyNum,
splitEnd[i]);
}
}
return statement;
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
index 3c7f9d141..8f48ed35b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.java
@@ -56,6 +56,7 @@ import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
@@ -1447,6 +1448,99 @@ class MySqlSourceITCase extends MySqlSourceTestBase {
}
}
+ @Test
+ void testUnsignedBigintPrimaryKeyChunking() throws Exception {
+ customDatabase.createAndInitialize();
+
+ String db = customDatabase.getDatabaseName();
+ String table = "unsigned_bigint_pk";
+ try (MySqlConnection connection = getConnection()) {
+ connection.setAutoCommit(false);
+ String createSql =
+ String.format(
+ "CREATE TABLE %s.%s (\n"
+ + " `order_id` BIGINT UNSIGNED NOT
NULL,\n"
+ + " `desc` VARCHAR(512) NOT NULL,\n"
+ + " PRIMARY KEY (`order_id`)\n"
+ + ") ENGINE=InnoDB DEFAULT CHARSET=utf8;",
+ StatementUtils.quote(db),
StatementUtils.quote(table));
+ // Insert sample data including values near UNSIGNED BIGINT max
+ String insertSql =
+ String.format(
+ "INSERT INTO %s.%s (`order_id`, `desc`) VALUES "
+ + "(1, 'flink'),(2, 'flink'),(3,
'flink'),(4, 'flink'),(5, 'flink'),"
+ + "(6, 'flink'),(7, 'flink'),(8,
'flink'),(9, 'flink'),(10, 'flink'),"
+ + "(11, 'flink'),(12, 'flink'),"
+ + "(18446744073709551604,
'flink'),(18446744073709551605, 'flink'),"
+ + "(18446744073709551606,
'flink'),(18446744073709551607, 'flink'),"
+ + "(18446744073709551608,
'flink'),(18446744073709551609, 'flink'),"
+ + "(18446744073709551610,
'flink'),(18446744073709551611, 'flink'),"
+ + "(18446744073709551612,
'flink'),(18446744073709551613, 'flink'),"
+ + "(18446744073709551614,
'flink'),(18446744073709551615, 'flink');",
+ StatementUtils.quote(db),
StatementUtils.quote(table));
+ // Drop if exists to be idempotent across runs, then create and
insert
+ connection.execute(
+ String.format(
+ "DROP TABLE IF EXISTS %s.%s;",
+ StatementUtils.quote(db),
StatementUtils.quote(table)),
+ createSql,
+ insertSql);
+ connection.commit();
+ }
+
+ // Build a source reading only the unsigned_bigint_pk table
+ DataType dataType =
+ DataTypes.ROW(
+ DataTypes.FIELD("order_id", DataTypes.DECIMAL(20, 0)),
+ DataTypes.FIELD("desc", DataTypes.STRING()));
+ LogicalType logicalType =
TypeConversions.fromDataToLogicalType(dataType);
+ InternalTypeInfo<RowData> typeInfo = InternalTypeInfo.of(logicalType);
+ RowDataDebeziumDeserializeSchema deserializer =
+ RowDataDebeziumDeserializeSchema.newBuilder()
+ .setPhysicalRowType((RowType)
dataType.getLogicalType())
+ .setResultTypeInfo(typeInfo)
+ .build();
+
+ MySqlSource<RowData> source =
+ MySqlSource.<RowData>builder()
+ .hostname(MYSQL_CONTAINER.getHost())
+ .port(MYSQL_CONTAINER.getDatabasePort())
+ .username(customDatabase.getUsername())
+ .password(customDatabase.getPassword())
+ .serverTimeZone("UTC")
+ .databaseList(db)
+ .tableList(db + "." + table)
+ .deserializer(deserializer)
+ .startupOptions(StartupOptions.initial())
+ .chunkKeyColumn(new ObjectPath(db, table), "order_id")
+ .splitSize(2)
+ .build();
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ try (CloseableIterator<RowData> it =
+ env.fromSource(source, WatermarkStrategy.noWatermarks(),
"MySQL CDC Source")
+ .executeAndCollect()) {
+ // Expect 24 records as inserted above
+ List<String> result = fetchRowData(it, 24,
this::stringifyUnsignedPkRow);
+ // Validate a couple of boundary values exist to ensure chunking
across unsigned range
+ // works
+ assertThat(result)
+ .contains(
+ "+I[1, flink]",
+ "+I[12, flink]",
+ "+I[18446744073709551604, flink]",
+ "+I[18446744073709551615, flink]");
+ }
+ }
+
+ private String stringifyUnsignedPkRow(RowData row) {
+ DecimalData decimal = row.getDecimal(0, 20, 0);
+ String orderId = decimal.toBigDecimal().toPlainString();
+ String desc = row.getString(1).toString();
+ return "+I[" + orderId + ", " + desc + "]";
+ }
+
/**
* A {@link DebeziumDeserializationSchema} implementation which sleep
given milliseconds after
* deserialize per record, this class is designed for test.
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtilsTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtilsTest.java
new file mode 100644
index 000000000..5272b1475
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/utils/StatementUtilsTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source.utils;
+
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Proxy;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link
org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils}. */
+class StatementUtilsTest {
+
+ @Test
+ void testSetSafeObjectConvertsBigIntegerToBigDecimal() throws SQLException
{
+ Map<String, Object> invocationDetails = new HashMap<>();
+ PreparedStatement psProxy =
createPreparedStatementProxy(invocationDetails);
+
+ // Create a BigInteger value that exceeds Long.MAX_VALUE
+ BigInteger bigIntValue = new BigInteger("9223372036854775808"); //
Long.MAX_VALUE + 1
+ BigDecimal expectedBigDecimal = new BigDecimal(bigIntValue);
+
+ // Use the safe method
+ StatementUtils.setSafeObject(psProxy, 1, bigIntValue);
+
+ // Assert that it correctly used setBigDecimal with the converted
BigDecimal value
+
assertThat(invocationDetails.get("methodName")).isEqualTo("setBigDecimal");
+
assertThat(invocationDetails.get("value")).isInstanceOf(BigDecimal.class);
+
assertThat(invocationDetails.get("value")).isEqualTo(expectedBigDecimal);
+ }
+
+ @Test
+ void testSetSafeObjectHandlesLargeBigIntegerValues() throws SQLException {
+ Map<String, Object> invocationDetails = new HashMap<>();
+ PreparedStatement psProxy =
createPreparedStatementProxy(invocationDetails);
+
+ // Test with BIGINT UNSIGNED max value
+ BigInteger maxUnsignedBigInt = new BigInteger("18446744073709551615");
// 2^64 - 1
+ BigDecimal expectedBigDecimal = new BigDecimal(maxUnsignedBigInt);
+
+ StatementUtils.setSafeObject(psProxy, 1, maxUnsignedBigInt);
+
+
assertThat(invocationDetails.get("methodName")).isEqualTo("setBigDecimal");
+
assertThat(invocationDetails.get("value")).isEqualTo(expectedBigDecimal);
+ }
+
+ @Test
+ void testSetSafeObjectHandlesRegularValues() throws SQLException {
+ Map<String, Object> invocationDetails = new HashMap<>();
+ PreparedStatement psProxy =
createPreparedStatementProxy(invocationDetails);
+
+ // Test with a common Long
+ StatementUtils.setSafeObject(psProxy, 1, 123L);
+ assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+ assertThat(invocationDetails.get("value")).isEqualTo(123L);
+ invocationDetails.clear();
+
+ // Test with a String
+ StatementUtils.setSafeObject(psProxy, 2, "test");
+ assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+ assertThat(invocationDetails.get("value")).isEqualTo("test");
+ invocationDetails.clear();
+
+ // Test with an Integer
+ StatementUtils.setSafeObject(psProxy, 3, 456);
+ assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+ assertThat(invocationDetails.get("value")).isEqualTo(456);
+ invocationDetails.clear();
+
+ // Test with null
+ StatementUtils.setSafeObject(psProxy, 4, null);
+ assertThat(invocationDetails.get("methodName")).isEqualTo("setObject");
+ assertThat(invocationDetails.get("value")).isNull();
+ }
+
+ private PreparedStatement createPreparedStatementProxy(Map<String, Object>
invocationDetails) {
+ return (PreparedStatement)
+ Proxy.newProxyInstance(
+ StatementUtilsTest.class.getClassLoader(),
+ new Class<?>[] {PreparedStatement.class},
+ (proxy, method, args) -> {
+ String methodName = method.getName();
+ if (methodName.equals("setObject")
+ || methodName.equals("setBigDecimal")) {
+ invocationDetails.put("methodName",
methodName);
+ invocationDetails.put("parameterIndex",
args[0]);
+ invocationDetails.put("value", args[1]);
+ }
+ return null;
+ });
+ }
+}