This is an automated email from the ASF dual-hosted git repository.
davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 902671729e [Fix][Connector-V2][CDC-MySQL] Honor int_type_narrowing
option on MyS… (#11004)
902671729e is described below
commit 902671729e75f41335c8cb858ebdddc4d4fdf003
Author: Ricky Makhija <[email protected]>
AuthorDate: Fri Jun 5 12:51:25 2026 +0530
[Fix][Connector-V2][CDC-MySQL] Honor int_type_narrowing option on MyS…
(#11004)
Signed-off-by: ricky2129 <[email protected]>
---
.../cdc/mysql/config/MySqlSourceConfigFactory.java | 6 ++
.../cdc/mysql/source/MySqlIncrementalSource.java | 13 +++
.../seatunnel/cdc/mysql/utils/MySqlTypeUtils.java | 25 +++++-
.../MySqlSourceConfigFactorySerializationTest.java | 92 ++++++++++++++++++++++
.../utils/MySqlTypeUtilsIntTypeNarrowingTest.java | 84 ++++++++++++++++++++
5 files changed, 219 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
index 97079da6c2..b9aa542f1e 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -87,6 +87,12 @@ public class MySqlSourceConfigFactory extends
JdbcSourceConfigFactory {
// but it'll cause lose of precise when the value is larger than 2^63,
// so use "precise" mode to avoid it.
props.put("bigint.unsigned.handling.mode", "precise");
+ // default int_type_narrowing (tinyint(1) -> boolean); the Source /
user debezium block
+ // overrides this via the dbzProperties merge below so MySqlTypeUtils
can honor a false.
+ // Carried as a property, NOT a field, on purpose: adding a
field/method to this
+ // Serializable factory drifts its serialVersionUID and breaks rolling
upgrades (jobs
+ // submitted on the prior version fail to deserialize on the new one).
+ props.setProperty("int_type_narrowing", String.valueOf(true));
if (serverIdRange != null) {
props.setProperty("database.server.id.range",
String.valueOf(serverIdRange));
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 665d907e3f..4a7cb09c9b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -54,6 +54,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Properties;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -85,6 +86,18 @@ public class MySqlIncrementalSource<T> extends
IncrementalSource<T, JdbcSourceCo
MySqlSourceConfigFactory configFactory = new
MySqlSourceConfigFactory();
configFactory.serverId(config.get(JdbcSourceOptions.SERVER_ID));
configFactory.fromReadonlyConfig(readonlyConfig);
+ // Carry int_type_narrowing through the debezium properties map rather
than a factory field.
+ // Adding a field/method to the Serializable MySqlSourceConfigFactory
drifts its
+ // serialVersionUID and breaks rolling upgrades (jobs submitted on the
prior version fail to
+ // deserialize). This runs after fromReadonlyConfig (which resets
dbzProperties from the
+ // user
+ // debezium block), re-merging those user props then appending
int_type_narrowing.
+ Properties dbzProperties = new Properties();
+
config.getOptional(JdbcSourceOptions.DEBEZIUM_PROPERTIES).ifPresent(dbzProperties::putAll);
+ dbzProperties.setProperty(
+ "int_type_narrowing",
+
String.valueOf(config.get(JdbcCommonOptions.INT_TYPE_NARROWING)));
+ configFactory.debeziumProperties(dbzProperties);
JdbcUrlUtil.UrlInfo urlInfo =
JdbcUrlUtil.getUrlInfo(config.get(JdbcCommonOptions.URL));
configFactory.originUrl(urlInfo.getOrigin());
configFactory.hostname(urlInfo.getHost());
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
index 2ab3524270..2ff84a440b 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
@@ -21,6 +21,7 @@ import
org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
+import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlVersion;
import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DefaultValueUtils;
import io.debezium.connector.mysql.MySqlConnectorConfig;
@@ -36,6 +37,14 @@ import java.util.Optional;
@Slf4j
public class MySqlTypeUtils {
+ /**
+ * Converter used when {@code int_type_narrowing=false}: identical to
{@link
+ * MySqlTypeConverter#DEFAULT_INSTANCE} except narrowing is off, so {@code
tinyint(1)} stays
+ * TINYINT (byte) instead of becoming BOOLEAN. Pre-built singleton (no
per-column allocation).
+ */
+ private static final MySqlTypeConverter NO_INT_NARROWING_CONVERTER =
+ new MySqlTypeConverter(MySqlVersion.V_5_7, false);
+
public static SeaTunnelDataType<?> convertFromColumn(
Column column, RelationalDatabaseConnectorConfig
dbzConnectorConfig) {
return convertToSeaTunnelColumn(column,
dbzConnectorConfig).getDataType();
@@ -133,9 +142,23 @@ public class MySqlTypeUtils {
builder.scale(column.length());
}
break;
+ case "TINYINT":
+ // Debezium reports the bare type name "TINYINT", but the
narrowing rule in
+ // MySqlTypeConverter checks
columnType.equalsIgnoreCase("tinyint(1)"). Re-append
+ // the length so tinyint(1) is detectable on the CDC path;
otherwise it can never
+ // be narrowed (or kept) according to int_type_narrowing.
+ if (column.length() > 0) {
+ builder.columnType(String.format("TINYINT(%s)",
column.length()));
+ }
+ break;
default:
break;
}
- return MySqlTypeConverter.DEFAULT_INSTANCE.convert(builder.build());
+ // Honor the int_type_narrowing source option (carried via the
Debezium properties).
+ // Default true keeps the original DEFAULT_INSTANCE behavior
byte-for-byte.
+ boolean intTypeNarrowing =
+
dbzConnectorConfig.getConfig().getBoolean("int_type_narrowing", true);
+ return (intTypeNarrowing ? MySqlTypeConverter.DEFAULT_INSTANCE :
NO_INT_NARROWING_CONVERTER)
+ .convert(builder.build());
}
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactorySerializationTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactorySerializationTest.java
new file mode 100644
index 0000000000..e1309a7dd4
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactorySerializationTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
+
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Field;
+import java.util.Properties;
+
+/**
+ * {@link MySqlSourceConfigFactory} is part of the serialized job graph (it is
shipped to task
+ * executors and persisted with the pipeline). It does not declare an explicit
{@code
+ * serialVersionUID}, so the JVM computes it from the class structure —
including non-transient
+ * fields and non-private methods. Adding either changes the UID, which makes
jobs submitted on a
+ * previous version fail to deserialize on a new one during a rolling upgrade
({@code
+ * InvalidClassException}).
+ *
+ * <p>That is why this connector carries {@code int_type_narrowing} through
the inherited debezium
+ * properties map instead of adding a dedicated field/builder method. These
tests guard that
+ * decision: the option must survive serialization, and the class structure
(hence UID) must stay
+ * stable.
+ */
+class MySqlSourceConfigFactorySerializationTest {
+
+ /** Baseline computed UID of the factory's structure. Must not drift (see
class javadoc). */
+ private static final long BASELINE_SERIAL_VERSION_UID =
-6578851046816898665L;
+
+ @Test
+ void serialVersionUidMustNotDrift() {
+ long actual =
+
ObjectStreamClass.lookup(MySqlSourceConfigFactory.class).getSerialVersionUID();
+ Assertions.assertEquals(
+ BASELINE_SERIAL_VERSION_UID,
+ actual,
+ "MySqlSourceConfigFactory serialVersionUID drifted ("
+ + BASELINE_SERIAL_VERSION_UID
+ + " -> "
+ + actual
+ + "). Adding fields/non-private methods to this
serialized factory breaks "
+ + "rolling upgrades. Route new options through the
dbzProperties map.");
+ }
+
+ @Test
+ void intTypeNarrowingSurvivesSerializationViaDbzProperties() throws
Exception {
+ MySqlSourceConfigFactory factory = new MySqlSourceConfigFactory();
+ Properties dbz = new Properties();
+ dbz.setProperty("int_type_narrowing", "false");
+ factory.debeziumProperties(dbz);
+
+ MySqlSourceConfigFactory restored = roundTrip(factory);
+
+ Field f =
JdbcSourceConfigFactory.class.getDeclaredField("dbzProperties");
+ f.setAccessible(true);
+ Properties restoredProps = (Properties) f.get(restored);
+ Assertions.assertEquals("false",
restoredProps.getProperty("int_type_narrowing"));
+ }
+
+ private static MySqlSourceConfigFactory roundTrip(MySqlSourceConfigFactory
factory)
+ throws Exception {
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+ oos.writeObject(factory);
+ }
+ try (ObjectInputStream ois =
+ new ObjectInputStream(new
ByteArrayInputStream(bos.toByteArray()))) {
+ return (MySqlSourceConfigFactory) ois.readObject();
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtilsIntTypeNarrowingTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtilsIntTypeNarrowingTest.java
new file mode 100644
index 0000000000..43723e1fea
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtilsIntTypeNarrowingTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.relational.Column;
+
+/**
+ * Verifies that the documented {@code int_type_narrowing} option is honored
on the MySQL-CDC path.
+ *
+ * <p>Previously {@code MySqlTypeUtils.convertToSeaTunnelColumn} always used
{@link
+ *
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter#DEFAULT_INSTANCE}
+ * (narrowing=true), so {@code tinyint(1)} was always mapped to BOOLEAN
regardless of the option
+ * documented for the MySQL-CDC source. The value is now carried through the
Debezium properties and
+ * applied: {@code false} keeps {@code tinyint(1)} as TINYINT (BYTE), {@code
true} (and the default)
+ * narrows it to BOOLEAN.
+ */
+public class MySqlTypeUtilsIntTypeNarrowingTest {
+
+ private static MySqlConnectorConfig config(Boolean intTypeNarrowing) {
+ Configuration.Builder builder =
+ Configuration.create()
+ .with(MySqlConnectorConfig.SERVER_NAME, "test_server")
+ .with(MySqlConnectorConfig.HOSTNAME, "localhost")
+ .with(MySqlConnectorConfig.USER, "test")
+ .with(MySqlConnectorConfig.PASSWORD, "test");
+ if (intTypeNarrowing != null) {
+ builder.with("int_type_narrowing",
String.valueOf(intTypeNarrowing));
+ }
+ return new MySqlConnectorConfig(builder.build());
+ }
+
+ private static Column tinyint1() {
+ return Column.editor()
+ .name("flag")
+ .type("TINYINT", "TINYINT")
+ .jdbcType(java.sql.Types.TINYINT)
+ .length(1)
+ .optional(true)
+ .create();
+ }
+
+ @Test
+ void tinyint1NarrowsToBooleanWhenEnabled() {
+ Assertions.assertEquals(
+ BasicType.BOOLEAN_TYPE,
+ MySqlTypeUtils.convertToSeaTunnelColumn(tinyint1(),
config(true)).getDataType());
+ }
+
+ @Test
+ void tinyint1StaysTinyintWhenDisabled() {
+ Assertions.assertEquals(
+ BasicType.BYTE_TYPE,
+ MySqlTypeUtils.convertToSeaTunnelColumn(tinyint1(),
config(false)).getDataType());
+ }
+
+ @Test
+ void defaultsToNarrowingWhenAbsent() {
+ Assertions.assertEquals(
+ BasicType.BOOLEAN_TYPE,
+ MySqlTypeUtils.convertToSeaTunnelColumn(tinyint1(),
config(null)).getDataType());
+ }
+}