This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5d888e965e [Bug][connector-jdbc] Fix integer overflow when merging
table schema with large column length (#10387)
5d888e965e is described below
commit 5d888e965ec39c97e2f960e2e24c7d5116a630aa
Author: yzeng1618 <[email protected]>
AuthorDate: Sun Jan 25 00:48:36 2026 +0800
[Bug][connector-jdbc] Fix integer overflow when merging table schema with
large column length (#10387)
Co-authored-by: zengyi <[email protected]>
---
.../seatunnel/jdbc/utils/JdbcCatalogUtils.java | 11 ++-
.../seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java | 88 ++++++++++++++++++++++
2 files changed, 93 insertions(+), 6 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
index 6ea674b322..7d7908c821 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtils.java
@@ -332,23 +332,22 @@ public class JdbcCatalogUtils {
.get(column.getName())
.getDataType()
.getSqlType())
- ? PhysicalColumn.of(
+ ? new PhysicalColumn(
column.getName(),
column.getDataType(),
- column.getColumnLength()
== null
- ? null
- : Math.toIntExact(
-
column.getColumnLength()),
+ column.getColumnLength(),
+ column.getScale(),
column.isNullable(),
column.getDefaultValue(),
columnsOfPath
.get(column.getName())
.getComment(),
column.getSourceType(),
+ column.getSinkType(),
+ column.getOptions(),
column.isUnsigned(),
column.isZeroFill(),
column.getBitLen(),
- column.getOptions(),
column.getLongColumnLength())
: column;
})
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
index 3067e3c2ce..f87bd998cc 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/utils/JdbcCatalogUtilsTest.java
@@ -335,6 +335,94 @@ public class JdbcCatalogUtilsTest {
mergeTable.getTableSchema().getColumns());
}
+ @Test
+ public void testColumnNotIncludeMergeWithLargeColumnLength() {
+ long largeLength = 4294967295L;
+
+ CatalogTable tableOfPath =
+ CatalogTable.of(
+ TableIdentifier.of("mysql-1", "database-x", null,
"table-x"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "id",
+ BasicType.LONG_TYPE,
+ (Long) null,
+ false,
+ null,
+ "id comment"))
+ .column(
+ PhysicalColumn.of(
+ "config",
+ BasicType.STRING_TYPE,
+ largeLength,
+ false,
+ null,
+ "config comment"))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ null);
+
+ CatalogTable tableOfQuery =
+ CatalogTable.of(
+ TableIdentifier.of("default", null, null, "default"),
+ TableSchema.builder()
+ .column(
+ PhysicalColumn.of(
+ "id",
+ BasicType.LONG_TYPE,
+ (Long) null,
+ true,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "config",
+ BasicType.STRING_TYPE,
+ largeLength,
+ true,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "dummy",
+ BasicType.INT_TYPE,
+ (Long) null,
+ true,
+ null,
+ null))
+ .build(),
+ Collections.emptyMap(),
+ Collections.emptyList(),
+ null);
+
+ CatalogTable mergeTable =
JdbcCatalogUtils.mergeCatalogTable(tableOfPath, tableOfQuery);
+
+ Assertions.assertEquals(
+ tableOfPath.getTableId().toTablePath(),
mergeTable.getTableId().toTablePath());
+ Assertions.assertEquals(
+ tableOfQuery.getTableId().getCatalogName(),
+ mergeTable.getTableId().getCatalogName());
+
+ Map<String, Column> mergedColumns =
+ mergeTable.getTableSchema().getColumns().stream()
+ .collect(Collectors.toMap(e -> e.getName(), e -> e));
+
+ Column mergedId = mergedColumns.get("id");
+ Column mergedConfig = mergedColumns.get("config");
+
+ Assertions.assertNotNull(mergedId);
+ Assertions.assertNotNull(mergedConfig);
+
+ // The merge should use the query column as base, and fill comment from the table_path.
+ Assertions.assertTrue(mergedId.isNullable());
+ Assertions.assertEquals("id comment", mergedId.getComment());
+
+ Assertions.assertEquals(Long.valueOf(largeLength),
mergedConfig.getColumnLength());
+ Assertions.assertEquals("config comment", mergedConfig.getComment());
+ }
+
@Test
public void testDecimalColumnMerge() {
CatalogTable tableOfQuery =