This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b9e5054fa1 [Fix][Connector-V2][ClickHouse] Fix ThreadLocal memory leak 
in ClickhouseCatalogUtil (#10264)
b9e5054fa1 is described below

commit b9e5054fa126f224ad47fd21cbdcf9a118039f59
Author: Jast <[email protected]>
AuthorDate: Wed Jan 21 22:05:12 2026 +0800

    [Fix][Connector-V2][ClickHouse] Fix ThreadLocal memory leak in 
ClickhouseCatalogUtil (#10264)
---
 .../clickhouse/util/ClickhouseCatalogUtil.java     |  2 +-
 .../clickhouse/util/ClickhouseCatalogUtilTest.java | 79 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)

diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
index 394e1e81ca..d1fb472642 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
@@ -54,7 +54,7 @@ public class ClickhouseCatalogUtil extends CatalogUtil {
             return super.getCreateTableSql(
                     template, database, table, tableSchema, comment, 
optionsKey);
         } finally {
-            pkColumns.clear();
+            PRIMARY_KEY_COLUMNS.remove();
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
index c36b1d652d..12d335566e 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
@@ -18,11 +18,20 @@
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.util;
 
 import org.apache.seatunnel.api.table.catalog.Column;
+import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
+import org.apache.seatunnel.api.table.catalog.PrimaryKey;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseSinkOptions;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
@@ -101,4 +110,74 @@ public class ClickhouseCatalogUtilTest {
                 NullPointerException.class,
                 () -> 
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(null));
     }
+
+    @Test
+    void testPrimaryKeyColumnShouldNotBeNullable() {
+        // Test that ThreadLocal is properly cleared after getCreateTableSql 
call
+        Column column = mock(Column.class);
+        when(column.getName()).thenReturn("pk_column");
+        when(column.getSinkType()).thenReturn("String");
+        when(column.isNullable()).thenReturn(true);
+        when(column.getComment()).thenReturn("");
+
+        List<Column> columns = new ArrayList<>();
+        columns.add(column);
+
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .primaryKey(PrimaryKey.of("", 
Collections.singletonList("pk_column")))
+                        .columns(columns)
+                        .build();
+
+        ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+                "CREATE TABLE `${database}`.`${table}` (${rowtype_fields})",
+                "test_db",
+                "test_table",
+                tableSchema,
+                null,
+                ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+
+        // After getCreateTableSql call, ThreadLocal should be cleared
+        // so columnToConnectorType should treat it as NOT a primary key
+        String result = 
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
+        assertEquals("`pk_column` Nullable(String) ", result);
+    }
+
+    @Test
+    void testPrimaryKeyColumnWithNullableShouldNotWrapInNullable() {
+        // Test the actual scenario: primary key columns should NOT be wrapped 
in Nullable
+        // because ClickHouse doesn't allow nullable columns in ORDER BY / 
PRIMARY KEY
+        String template =
+                "CREATE TABLE `${database}`.`${table}` (\n"
+                        + "    ${rowtype_primary_key},\n"
+                        + "    ${rowtype_fields}\n"
+                        + ") ENGINE = MergeTree()\n"
+                        + "ORDER BY (${rowtype_primary_key})";
+
+        List<Column> columns = new ArrayList<>();
+        columns.add(PhysicalColumn.of("id", BasicType.LONG_TYPE, (Long) null, 
true, null, ""));
+        columns.add(PhysicalColumn.of("name", BasicType.STRING_TYPE, (Long) 
null, true, null, ""));
+        columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, (Long) null, 
true, null, ""));
+
+        TableSchema tableSchema =
+                TableSchema.builder()
+                        .primaryKey(PrimaryKey.of("", Arrays.asList("id", 
"age")))
+                        .columns(columns)
+                        .build();
+
+        String sql =
+                ClickhouseCatalogUtil.INSTANCE.getCreateTableSql(
+                        template,
+                        "test_db",
+                        "test_table",
+                        tableSchema,
+                        null,
+                        ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+
+        // Primary key columns (id, age) should NOT be wrapped in Nullable
+        assertEquals(true, sql.contains("`id` Int64 "));
+        assertEquals(true, sql.contains("`age` Int32 "));
+        // Non-primary key column (name) should be wrapped in Nullable
+        assertEquals(true, sql.contains("`name` Nullable(String) "));
+    }
 }

Reply via email to