This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b9e5054fa1 [Fix][Connector-V2][ClickHouse] Fix ThreadLocal memory leak
in ClickhouseCatalogUtil (#10264)
b9e5054fa1 is described below
commit b9e5054fa126f224ad47fd21cbdcf9a118039f59
Author: Jast <[email protected]>
AuthorDate: Wed Jan 21 22:05:12 2026 +0800
[Fix][Connector-V2][ClickHouse] Fix ThreadLocal memory leak in
ClickhouseCatalogUtil (#10264)
---
.../clickhouse/util/ClickhouseCatalogUtil.java | 2 +-
.../clickhouse/util/ClickhouseCatalogUtilTest.java | 79 ++++++++++++++++++++++
2 files changed, 80 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
index 394e1e81ca..d1fb472642 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
@@ -54,7 +54,7 @@ public class ClickhouseCatalogUtil extends CatalogUtil {
return super.getCreateTableSql(
template, database, table, tableSchema, comment,
optionsKey);
} finally {
- pkColumns.clear();
+ PRIMARY_KEY_COLUMNS.remove();
}
}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
index c36b1d652d..12d335566e 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
@@ -18,11 +18,20 @@
package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSinkOptions;
import org.junit.jupiter.api.Test;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
@@ -101,4 +110,74 @@ public class ClickhouseCatalogUtilTest {
NullPointerException.class,
() ->
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(null));
}
+
+ @Test
+ void testPrimaryKeyColumnShouldNotBeNullable() {
+ // Test that ThreadLocal is properly cleared after getCreateTableSql
call
+ Column column = mock(Column.class);
+ when(column.getName()).thenReturn("pk_column");
+ when(column.getSinkType()).thenReturn("String");
+ when(column.isNullable()).thenReturn(true);
+ when(column.getComment()).thenReturn("");
+
+ List<Column> columns = new ArrayList<>();
+ columns.add(column);
+
+ TableSchema tableSchema =
+ TableSchema.builder()
+ .primaryKey(PrimaryKey.of("",
Collections.singletonList("pk_column")))
+ .columns(columns)
+ .build();
+
+ ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+ "CREATE TABLE `${database}`.`${table}` (${rowtype_fields})",
+ "test_db",
+ "test_table",
+ tableSchema,
+ null,
+ ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+
+ // After getCreateTableSql call, ThreadLocal should be cleared
+ // so columnToConnectorType should treat it as NOT a primary key
+ String result =
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
+ assertEquals("`pk_column` Nullable(String) ", result);
+ }
+
+ @Test
+ void testPrimaryKeyColumnWithNullableShouldNotWrapInNullable() {
+ // Test the actual scenario: primary key columns should NOT be wrapped
in Nullable
+ // because ClickHouse doesn't allow nullable columns in ORDER BY /
PRIMARY KEY
+ String template =
+ "CREATE TABLE `${database}`.`${table}` (\n"
+ + " ${rowtype_primary_key},\n"
+ + " ${rowtype_fields}\n"
+ + ") ENGINE = MergeTree()\n"
+ + "ORDER BY (${rowtype_primary_key})";
+
+ List<Column> columns = new ArrayList<>();
+ columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, (Long) null,
true, null, ""));
+ columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, (Long)
null, true, null, ""));
+ columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, (Long) null,
true, null, ""));
+
+ TableSchema tableSchema =
+ TableSchema.builder()
+ .primaryKey(PrimaryKey.of("", Arrays.asList("id",
"age")))
+ .columns(columns)
+ .build();
+
+ String sql =
+ ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+ template,
+ "test_db",
+ "test_table",
+ tableSchema,
+ null,
+ ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+
+ // Primary key columns (id, age) should NOT be wrapped in Nullable
+ assertEquals(true, sql.contains("`id` Int64 "));
+ assertEquals(true, sql.contains("`age` Int32 "));
+ // Non-primary key column (name) should be wrapped in Nullable
+ assertEquals(true, sql.contains("`name` Nullable(String) "));
+ }
}