This is an automated email from the ASF dual-hosted git repository.

davidzollo pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 902671729e [Fix][Connector-V2][CDC-MySQL] Honor int_type_narrowing 
option on MyS… (#11004)
902671729e is described below

commit 902671729e75f41335c8cb858ebdddc4d4fdf003
Author: Ricky Makhija <[email protected]>
AuthorDate: Fri Jun 5 12:51:25 2026 +0530

    [Fix][Connector-V2][CDC-MySQL] Honor int_type_narrowing option on MyS… 
(#11004)
    
    Signed-off-by: ricky2129 <[email protected]>
---
 .../cdc/mysql/config/MySqlSourceConfigFactory.java |  6 ++
 .../cdc/mysql/source/MySqlIncrementalSource.java   | 13 +++
 .../seatunnel/cdc/mysql/utils/MySqlTypeUtils.java  | 25 +++++-
 .../MySqlSourceConfigFactorySerializationTest.java | 92 ++++++++++++++++++++++
 .../utils/MySqlTypeUtilsIntTypeNarrowingTest.java  | 84 ++++++++++++++++++++
 5 files changed, 219 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
index 97079da6c2..b9aa542f1e 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactory.java
@@ -87,6 +87,12 @@ public class MySqlSourceConfigFactory extends 
JdbcSourceConfigFactory {
         // but it'll cause loss of precision when the value is larger than 2^63,
         // so use "precise" mode to avoid it.
         props.put("bigint.unsigned.handling.mode", "precise");
+        // default int_type_narrowing (tinyint(1) -> boolean); the Source / 
user debezium block
+        // overrides this via the dbzProperties merge below so MySqlTypeUtils 
can honor a false.
+        // Carried as a property, NOT a field, on purpose: adding a 
field/method to this
+        // Serializable factory drifts its serialVersionUID and breaks rolling 
upgrades (jobs
+        // submitted on the prior version fail to deserialize on the new one).
+        props.setProperty("int_type_narrowing", String.valueOf(true));
 
         if (serverIdRange != null) {
             props.setProperty("database.server.id.range", 
String.valueOf(serverIdRange));
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
index 665d907e3f..4a7cb09c9b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java
@@ -54,6 +54,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Properties;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -85,6 +86,18 @@ public class MySqlIncrementalSource<T> extends 
IncrementalSource<T, JdbcSourceCo
         MySqlSourceConfigFactory configFactory = new 
MySqlSourceConfigFactory();
         configFactory.serverId(config.get(JdbcSourceOptions.SERVER_ID));
         configFactory.fromReadonlyConfig(readonlyConfig);
+        // Carry int_type_narrowing through the debezium properties map rather 
than a factory field.
+        // Adding a field/method to the Serializable MySqlSourceConfigFactory 
drifts its
+        // serialVersionUID and breaks rolling upgrades (jobs submitted on the 
prior version fail to
+        // deserialize). This runs after fromReadonlyConfig (which resets 
dbzProperties from the
+        // user
+        // debezium block), re-merging those user props then appending 
int_type_narrowing.
+        Properties dbzProperties = new Properties();
+        
config.getOptional(JdbcSourceOptions.DEBEZIUM_PROPERTIES).ifPresent(dbzProperties::putAll);
+        dbzProperties.setProperty(
+                "int_type_narrowing",
+                
String.valueOf(config.get(JdbcCommonOptions.INT_TYPE_NARROWING)));
+        configFactory.debeziumProperties(dbzProperties);
         JdbcUrlUtil.UrlInfo urlInfo = 
JdbcUrlUtil.getUrlInfo(config.get(JdbcCommonOptions.URL));
         configFactory.originUrl(urlInfo.getOrigin());
         configFactory.hostname(urlInfo.getHost());
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
index 2ab3524270..2ff84a440b 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtils.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.api.table.converter.BasicTypeDefine;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.connectors.seatunnel.common.source.TypeDefineUtils;
 import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
+import 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlVersion;
 import org.apache.seatunnel.connectors.seatunnel.jdbc.utils.DefaultValueUtils;
 
 import io.debezium.connector.mysql.MySqlConnectorConfig;
@@ -36,6 +37,14 @@ import java.util.Optional;
 @Slf4j
 public class MySqlTypeUtils {
 
+    /**
+     * Converter used when {@code int_type_narrowing=false}: identical to 
{@link
+     * MySqlTypeConverter#DEFAULT_INSTANCE} except narrowing is off, so {@code 
tinyint(1)} stays
+     * TINYINT (byte) instead of becoming BOOLEAN. Pre-built singleton (no 
per-column allocation).
+     */
+    private static final MySqlTypeConverter NO_INT_NARROWING_CONVERTER =
+            new MySqlTypeConverter(MySqlVersion.V_5_7, false);
+
     public static SeaTunnelDataType<?> convertFromColumn(
             Column column, RelationalDatabaseConnectorConfig 
dbzConnectorConfig) {
         return convertToSeaTunnelColumn(column, 
dbzConnectorConfig).getDataType();
@@ -133,9 +142,23 @@ public class MySqlTypeUtils {
                     builder.scale(column.length());
                 }
                 break;
+            case "TINYINT":
+                // Debezium reports the bare type name "TINYINT", but the 
narrowing rule in
+                // MySqlTypeConverter checks 
columnType.equalsIgnoreCase("tinyint(1)"). Re-append
+                // the length so tinyint(1) is detectable on the CDC path; 
otherwise it can never
+                // be narrowed (or kept) according to int_type_narrowing.
+                if (column.length() > 0) {
+                    builder.columnType(String.format("TINYINT(%s)", 
column.length()));
+                }
+                break;
             default:
                 break;
         }
-        return MySqlTypeConverter.DEFAULT_INSTANCE.convert(builder.build());
+        // Honor the int_type_narrowing source option (carried via the 
Debezium properties).
+        // Default true keeps the original DEFAULT_INSTANCE behavior 
byte-for-byte.
+        boolean intTypeNarrowing =
+                
dbzConnectorConfig.getConfig().getBoolean("int_type_narrowing", true);
+        return (intTypeNarrowing ? MySqlTypeConverter.DEFAULT_INSTANCE : 
NO_INT_NARROWING_CONVERTER)
+                .convert(builder.build());
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactorySerializationTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactorySerializationTest.java
new file mode 100644
index 0000000000..e1309a7dd4
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/config/MySqlSourceConfigFactorySerializationTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.config;
+
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.ObjectStreamClass;
+import java.lang.reflect.Field;
+import java.util.Properties;
+
+/**
+ * {@link MySqlSourceConfigFactory} is part of the serialized job graph (it is 
shipped to task
+ * executors and persisted with the pipeline). It does not declare an explicit 
{@code
+ * serialVersionUID}, so the JVM computes it from the class structure — 
including non-transient
+ * fields and non-private methods. Adding either changes the UID, which makes 
jobs submitted on a
+ * previous version fail to deserialize on a new one during a rolling upgrade 
({@code
+ * InvalidClassException}).
+ *
+ * <p>That is why this connector carries {@code int_type_narrowing} through 
the inherited debezium
+ * properties map instead of adding a dedicated field/builder method. These 
tests guard that
+ * decision: the option must survive serialization, and the class structure 
(hence UID) must stay
+ * stable.
+ */
+class MySqlSourceConfigFactorySerializationTest {
+
+    /** Baseline computed UID of the factory's structure. Must not drift (see 
class javadoc). */
+    private static final long BASELINE_SERIAL_VERSION_UID = 
-6578851046816898665L;
+
+    @Test
+    void serialVersionUidMustNotDrift() {
+        long actual =
+                
ObjectStreamClass.lookup(MySqlSourceConfigFactory.class).getSerialVersionUID();
+        Assertions.assertEquals(
+                BASELINE_SERIAL_VERSION_UID,
+                actual,
+                "MySqlSourceConfigFactory serialVersionUID drifted ("
+                        + BASELINE_SERIAL_VERSION_UID
+                        + " -> "
+                        + actual
+                        + "). Adding fields/non-private methods to this 
serialized factory breaks "
+                        + "rolling upgrades. Route new options through the 
dbzProperties map.");
+    }
+
+    @Test
+    void intTypeNarrowingSurvivesSerializationViaDbzProperties() throws 
Exception {
+        MySqlSourceConfigFactory factory = new MySqlSourceConfigFactory();
+        Properties dbz = new Properties();
+        dbz.setProperty("int_type_narrowing", "false");
+        factory.debeziumProperties(dbz);
+
+        MySqlSourceConfigFactory restored = roundTrip(factory);
+
+        Field f = 
JdbcSourceConfigFactory.class.getDeclaredField("dbzProperties");
+        f.setAccessible(true);
+        Properties restoredProps = (Properties) f.get(restored);
+        Assertions.assertEquals("false", 
restoredProps.getProperty("int_type_narrowing"));
+    }
+
+    private static MySqlSourceConfigFactory roundTrip(MySqlSourceConfigFactory 
factory)
+            throws Exception {
+        ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+            oos.writeObject(factory);
+        }
+        try (ObjectInputStream ois =
+                new ObjectInputStream(new 
ByteArrayInputStream(bos.toByteArray()))) {
+            return (MySqlSourceConfigFactory) ois.readObject();
+        }
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtilsIntTypeNarrowingTest.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtilsIntTypeNarrowingTest.java
new file mode 100644
index 0000000000..43723e1fea
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/utils/MySqlTypeUtilsIntTypeNarrowingTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.utils;
+
+import org.apache.seatunnel.api.table.type.BasicType;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.relational.Column;
+
+/**
+ * Verifies that the documented {@code int_type_narrowing} option is honored 
on the MySQL-CDC path.
+ *
+ * <p>Previously {@code MySqlTypeUtils.convertToSeaTunnelColumn} always used 
{@link
+ * 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter#DEFAULT_INSTANCE}
+ * (narrowing=true), so {@code tinyint(1)} was always mapped to BOOLEAN 
regardless of the option
+ * documented for the MySQL-CDC source. The value is now carried through the 
Debezium properties and
+ * applied: {@code false} keeps {@code tinyint(1)} as TINYINT (BYTE), {@code 
true} (and the default)
+ * narrows it to BOOLEAN.
+ */
+public class MySqlTypeUtilsIntTypeNarrowingTest {
+
+    private static MySqlConnectorConfig config(Boolean intTypeNarrowing) {
+        Configuration.Builder builder =
+                Configuration.create()
+                        .with(MySqlConnectorConfig.SERVER_NAME, "test_server")
+                        .with(MySqlConnectorConfig.HOSTNAME, "localhost")
+                        .with(MySqlConnectorConfig.USER, "test")
+                        .with(MySqlConnectorConfig.PASSWORD, "test");
+        if (intTypeNarrowing != null) {
+            builder.with("int_type_narrowing", 
String.valueOf(intTypeNarrowing));
+        }
+        return new MySqlConnectorConfig(builder.build());
+    }
+
+    private static Column tinyint1() {
+        return Column.editor()
+                .name("flag")
+                .type("TINYINT", "TINYINT")
+                .jdbcType(java.sql.Types.TINYINT)
+                .length(1)
+                .optional(true)
+                .create();
+    }
+
+    @Test
+    void tinyint1NarrowsToBooleanWhenEnabled() {
+        Assertions.assertEquals(
+                BasicType.BOOLEAN_TYPE,
+                MySqlTypeUtils.convertToSeaTunnelColumn(tinyint1(), 
config(true)).getDataType());
+    }
+
+    @Test
+    void tinyint1StaysTinyintWhenDisabled() {
+        Assertions.assertEquals(
+                BasicType.BYTE_TYPE,
+                MySqlTypeUtils.convertToSeaTunnelColumn(tinyint1(), 
config(false)).getDataType());
+    }
+
+    @Test
+    void defaultsToNarrowingWhenAbsent() {
+        Assertions.assertEquals(
+                BasicType.BOOLEAN_TYPE,
+                MySqlTypeUtils.convertToSeaTunnelColumn(tinyint1(), 
config(null)).getDataType());
+    }
+}

Reply via email to