This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 01be66dfac [Fix][Connector-V2][Clickhouse] Fix ClickHouse catalog nullable column type and add tests (#10119)
01be66dfac is described below
commit 01be66dfac8a8053437fd92d5147b148e82b15c1
Author: Jast <[email protected]>
AuthorDate: Mon Dec 8 18:34:00 2025 +0800
[Fix][Connector-V2][Clickhouse] Fix ClickHouse catalog nullable column type and add tests (#10119)
---
.../clickhouse/util/ClickhouseCatalogUtil.java | 40 ++++++++++++++++++++++
.../clickhouse/ClickhouseCreateTableTest.java | 10 +++---
.../clickhouse/util/ClickhouseCatalogUtilTest.java | 40 ++++++++++++++++++++++
.../seatunnel/clickhouse/ClickhouseIT.java | 2 +-
4 files changed, 87 insertions(+), 5 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
index 06078a62d1..394e1e81ca 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
@@ -21,15 +21,43 @@ import
org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
import org.apache.seatunnel.api.table.catalog.Column;
import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.TableSchema;
import
org.apache.seatunnel.connectors.seatunnel.clickhouse.catalog.ClickhouseTypeConverter;
import org.apache.seatunnel.connectors.seatunnel.common.util.CatalogUtil;
+import java.util.HashSet;
+import java.util.Set;
+
import static
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
public class ClickhouseCatalogUtil extends CatalogUtil {
+ private static final ThreadLocal<Set<String>> PRIMARY_KEY_COLUMNS =
+ ThreadLocal.withInitial(HashSet::new);
+
public static final ClickhouseCatalogUtil INSTANCE = new
ClickhouseCatalogUtil();
+ @Override
+ public String getCreateTableSql(
+ String template,
+ String database,
+ String table,
+ TableSchema tableSchema,
+ String comment,
+ String optionsKey) {
+ Set<String> pkColumns = PRIMARY_KEY_COLUMNS.get();
+ pkColumns.clear();
+ if (tableSchema.getPrimaryKey() != null) {
+ pkColumns.addAll(tableSchema.getPrimaryKey().getColumnNames());
+ }
+ try {
+ return super.getCreateTableSql(
+ template, database, table, tableSchema, comment,
optionsKey);
+ } finally {
+ pkColumns.clear();
+ }
+ }
+
public String columnToConnectorType(Column column) {
checkNotNull(column, "The column is required.");
String columnType;
@@ -38,6 +66,14 @@ public class ClickhouseCatalogUtil extends CatalogUtil {
} else {
columnType =
ClickhouseTypeConverter.INSTANCE.reconvert(column).getColumnType();
}
+
+ Set<String> pkColumns = PRIMARY_KEY_COLUMNS.get();
+ boolean isPrimaryKeyColumn = pkColumns != null &&
pkColumns.contains(column.getName());
+
+ if (column.isNullable() && !isUnsupportedNullableType(columnType) &&
!isPrimaryKeyColumn) {
+ columnType = "Nullable(" + columnType + ")";
+ }
+
return String.format(
"`%s` %s %s",
column.getName(),
@@ -49,6 +85,10 @@ public class ClickhouseCatalogUtil extends CatalogUtil {
+ "'");
}
+ private static boolean isUnsupportedNullableType(String columnType) {
+ return columnType.startsWith("Map(") ||
columnType.startsWith("Array(");
+ }
+
public String getDropTableSql(TablePath tablePath, boolean
ignoreIfNotExists) {
if (ignoreIfNotExists) {
return "DROP TABLE IF EXISTS "
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
index 90122ebd64..b2a44e25a1 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
@@ -99,14 +99,16 @@ public class ClickhouseCreateTableTest {
.build(),
"clickhouse test table",
ClickhouseSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
+ // Primary key columns (id, age) should NOT be wrapped in Nullable
+ // because ClickHouse does not allow nullable columns in ORDER BY / PRIMARY KEY
Assertions.assertEquals(
createTableSql,
"CREATE TABLE IF NOT EXISTS `test1`.`test2` (\n"
+ " `id` Int64 ,`age` Int32 COMMENT 'test
comment',\n"
- + " `name` String ,\n"
- + "`score` Int32 COMMENT '''N''-N',\n"
- + "`gender` Int8 ,\n"
- + "`create_time` Int64 \n"
+ + " `name` Nullable(String) ,\n"
+ + "`score` Nullable(Int32) COMMENT '\''N''-N',\n"
+ + "`gender` Nullable(Int8) ,\n"
+ + "`create_time` Nullable(Int64) \n"
+ ") ENGINE = MergeTree()\n"
+ "ORDER BY (`id`,`age`)\n"
+ "PRIMARY KEY (`id`,`age`)\n"
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
index dd019b3d48..c36b1d652d 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtilTest.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -33,6 +34,8 @@ public class ClickhouseCatalogUtilTest {
Column column = mock(Column.class);
when(column.getName()).thenReturn("col1");
when(column.getSinkType()).thenReturn("String");
+ when(column.isNullable()).thenReturn(false);
+ when(column.getComment()).thenReturn("");
String result =
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
@@ -44,6 +47,8 @@ public class ClickhouseCatalogUtilTest {
Column column = mock(Column.class);
when(column.getName()).thenReturn("col1");
when(column.getDataType()).thenReturn((SeaTunnelDataType)
BasicType.INT_TYPE);
+ when(column.isNullable()).thenReturn(false);
+ when(column.getComment()).thenReturn("");
String result =
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
@@ -56,9 +61,44 @@ public class ClickhouseCatalogUtilTest {
when(column.getName()).thenReturn("col1");
when(column.getDataType()).thenReturn((SeaTunnelDataType)
BasicType.INT_TYPE);
when(column.getSinkType()).thenReturn("String");
+ when(column.isNullable()).thenReturn(false);
+ when(column.getComment()).thenReturn("");
String result =
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
assertEquals("`col1` String ", result);
}
+
+ @Test
+ void wrapsTypeWithNullableWhenColumnIsNullable() {
+ Column column = mock(Column.class);
+ when(column.getName()).thenReturn("col1");
+ when(column.getSinkType()).thenReturn("String");
+ when(column.isNullable()).thenReturn(true);
+ when(column.getComment()).thenReturn("");
+
+ String result =
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
+
+ assertEquals("`col1` Nullable(String) ", result);
+ }
+
+ @Test
+ void escapesSingleQuoteAndBackslashInComment() {
+ Column column = mock(Column.class);
+ when(column.getName()).thenReturn("col1");
+ when(column.getSinkType()).thenReturn("String");
+ when(column.isNullable()).thenReturn(false);
+ when(column.getComment()).thenReturn("O'Reilly \\ path");
+
+ String result =
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(column);
+
+ assertEquals("`col1` String COMMENT 'O''Reilly \\\\ path'", result);
+ }
+
+ @Test
+ void throwsExceptionWhenColumnIsNull() {
+ assertThrows(
+ NullPointerException.class,
+ () ->
ClickhouseCatalogUtil.INSTANCE.columnToConnectorType(null));
+ }
}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index 1e8b40ace0..092c7cd9ad 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -206,7 +206,7 @@ public class ClickhouseIT extends TestSuiteBase implements
TestResource {
String tableName = "default.sink_table_for_schema";
Container.ExecResult execResult =
container.executeJob("/clickhouse_with_recreate_schema_and_custom.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStdout());
Assertions.assertEquals(101, countData(tableName));
dropTable(tableName);
}