This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new c4cb1fc4a3 [Fix][Connector-V2] fix starRocks automatically creates
tables with comment (#8568)
c4cb1fc4a3 is described below
commit c4cb1fc4a3b5e0277f1f6c9db9497846124df04d
Author: corgy-w <[email protected]>
AuthorDate: Thu Jan 23 10:00:41 2025 +0800
[Fix][Connector-V2] fix starRocks automatically creates tables with comment
(#8568)
---
.../clickhouse/util/ClickhouseCatalogUtil.java | 4 +-
.../clickhouse/ClickhouseCreateTableTest.java | 5 +-
.../seatunnel/common/util/CatalogUtil.java | 4 +-
.../connectors/doris/util/DorisCatalogUtil.java | 11 +++-
.../doris/catalog/DorisCreateTableTest.java | 5 +-
.../starrocks/sink/StarRocksSaveModeUtil.java | 4 +-
.../catalog/StarRocksCreateTableTest.java | 4 +-
.../seatunnel/clickhouse/ClickhouseIT.java | 8 +++
...clickhouse_with_create_schema_when_comment.conf | 62 ++++++++++++++++++++++
.../src/test/resources/init/clickhouse_init.conf | 14 ++---
.../e2e/connector/doris/DorisCatalogIT.java | 4 +-
.../e2e/connector/starrocks/StarRocksIT.java | 12 ++++-
12 files changed, 116 insertions(+), 21 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
index bf4e02c3fb..73dfb766dd 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java
@@ -38,7 +38,9 @@ public class ClickhouseCatalogUtil extends CatalogUtil {
ClickhouseTypeConverter.INSTANCE.reconvert(column).getColumnType(),
StringUtils.isEmpty(column.getComment())
? ""
- : "COMMENT '" + column.getComment() + "'");
+ : "COMMENT '"
+ + column.getComment().replace("'",
"''").replace("\\", "\\\\")
+ + "'");
}
public String getDropTableSql(TablePath tablePath, boolean
ignoreIfNotExists) {
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
index d270be52a8..d83f490dd8 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java
@@ -53,7 +53,8 @@ public class ClickhouseCreateTableTest {
columns.add(
PhysicalColumn.of(
"age", BasicType.INT_TYPE, (Long) null, true, null,
"test comment"));
- columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long)
null, true, null, ""));
+ columns.add(
+ PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null,
true, null, "'N'-N"));
columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, (Long)
null, true, null, ""));
columns.add(
PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long)
null, true, null, ""));
@@ -103,7 +104,7 @@ public class ClickhouseCreateTableTest {
"CREATE TABLE IF NOT EXISTS `test1`.`test2` (\n"
+ " `id` Int64 ,`age` Int32 COMMENT 'test
comment',\n"
+ " `name` String ,\n"
- + "`score` Int32 ,\n"
+ + "`score` Int32 COMMENT '''N''-N',\n"
+ "`gender` Int8 ,\n"
+ "`create_time` Int64 \n"
+ ") ENGINE = MergeTree()\n"
diff --git
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java
index 1fdc59579a..8976499704 100644
---
a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java
+++
b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java
@@ -106,7 +106,9 @@ public abstract class CatalogUtil {
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields)
.replaceAll(
SaveModePlaceHolder.COMMENT.getReplacePlaceHolder(),
- Objects.isNull(comment) ? "" : comment);
+ Objects.isNull(comment)
+ ? ""
+ : comment.replace("'", "''").replace("\\",
"\\\\"));
}
private String mergeColumnInTemplate(
diff --git
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
index 11018d5c5c..24fd728011 100644
---
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
+++
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java
@@ -209,7 +209,12 @@ public class DorisCatalogUtil {
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields)
.replaceAll(
SaveModePlaceHolder.COMMENT.getReplacePlaceHolder(),
- Objects.isNull(catalogTable.getComment()) ? "" :
catalogTable.getComment());
+ Objects.isNull(catalogTable.getComment())
+ ? ""
+ : catalogTable
+ .getComment()
+ .replace("'", "''")
+ .replace("\\", "\\\\"));
}
private static String mergeColumnInTemplate(
@@ -263,6 +268,8 @@ public class DorisCatalogUtil {
column.isNullable() ? "NULL" : "NOT NULL",
StringUtils.isEmpty(column.getComment())
? ""
- : "COMMENT '" + column.getComment() + "'");
+ : "COMMENT '"
+ + column.getComment().replace("'",
"''").replace("\\", "\\\\")
+ + "'");
}
}
diff --git
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
index 2142c0e2e2..8dacabfa19 100644
---
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
+++
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java
@@ -60,7 +60,8 @@ public class DorisCreateTableTest {
columns.add(
PhysicalColumn.of(
"age", BasicType.INT_TYPE, (Long) null, true, null,
"test comment"));
- columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long)
null, true, null, ""));
+ columns.add(
+ PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null,
true, null, "'N'-N"));
columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, (Long)
null, true, null, ""));
columns.add(
PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long)
null, true, null, ""));
@@ -125,7 +126,7 @@ public class DorisCreateTableTest {
result,
"CREATE TABLE IF NOT EXISTS `test1`.`test2` (
\n"
+ "`id` BIGINT NULL ,`age` INT NULL COMMENT 'test
comment' , \n"
- + "`name` STRING NULL ,`score` INT NULL , \n"
+ + "`name` STRING NULL ,`score` INT NULL COMMENT
'''N''-N' , \n"
+ "`create_time` DATETIME NOT NULL , \n"
+ "`gender` TINYINT NULL \n"
+ ") ENGINE=OLAP \n"
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
index 02d3118e07..223695978f 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java
@@ -45,7 +45,9 @@ public class StarRocksSaveModeUtil extends CatalogUtil {
column.isNullable() ? "NULL" : "NOT NULL",
StringUtils.isEmpty(column.getComment())
? ""
- : "COMMENT '" + column.getComment() + "'");
+ : "COMMENT '"
+ + column.getComment().replace("'",
"''").replace("\\", "\\\\")
+ + "'");
}
private static String dataTypeToStarrocksType(SeaTunnelDataType<?>
dataType, long length) {
diff --git
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
index 25b06f2806..ca466745ed 100644
---
a/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
+++
b/seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java
@@ -58,7 +58,7 @@ public class StarRocksCreateTableTest {
columns.add(
PhysicalColumn.of(
"name", BasicType.STRING_TYPE, (Long) null, true,
null, "test comment"));
- columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, (Long) null,
true, null, ""));
+ columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, (Long) null,
true, null, "'N'-N"));
columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long)
null, true, null, ""));
columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, (Long)
null, true, null, ""));
columns.add(
@@ -115,7 +115,7 @@ public class StarRocksCreateTableTest {
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
Assertions.assertEquals(
"CREATE TABLE IF NOT EXISTS `test1`.`test2` (
\n"
- + "`id` BIGINT NULL ,`age` INT NULL , \n"
+ + "`id` BIGINT NULL ,`age` INT NULL COMMENT '''N''-N'
, \n"
+ "`name` STRING NULL COMMENT 'test comment',`score`
INT NULL , \n"
+ "`create_time` DATETIME NOT NULL , \n"
+ "`gender` TINYINT NULL \n"
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
index b830a11389..8e8a7c825d 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java
@@ -107,6 +107,14 @@ public class ClickhouseIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, execResult.getExitCode());
}
+ @TestTemplate
+ public void testClickhouseWithCreateSchemaWhenComment(TestContainer
container)
+ throws Exception {
+ Container.ExecResult execResult =
+
container.executeJob("/clickhouse_with_create_schema_when_comment.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
@TestTemplate
public void clickhouseWithCreateSchemaWhenNotExist(TestContainer
container) throws Exception {
String tableName = "default.sink_table_for_schema";
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_comment.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_comment.conf
new file mode 100644
index 0000000000..5d21b41a10
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_with_create_schema_when_comment.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in
seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 10
+}
+
+source {
+ # This is an example source plugin **only for testing and demonstrating the
feature source plugin**
+ Clickhouse {
+ host = "clickhouse:8123"
+ database = "default"
+ sql = "select * from source_table"
+ username = "default"
+ password = ""
+ plugin_output = "source_table"
+ }
+ # If you would like to get more information about how to configure seatunnel
and see the full list of source plugins,
+ # please go to
https://seatunnel.apache.org/docs/connector-v2/source/ClickhouseSource
+}
+
+sink {
+ Clickhouse {
+ host = "clickhouse:8123"
+ database = "default"
+ table = "clickhouse_with_create_schema_when_comment"
+ username = "default"
+ password = ""
+ "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
+ "data_save_mode"="APPEND_DATA"
+ "save_mode_create_template" = """
+ CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
+ ${rowtype_fields}
+ ) ENGINE =Memory
+ COMMENT '${comment}';
+ """
+ support_upsert = true
+ allow_experimental_lightweight_delete = true
+ }
+
+ # If you would like to get more information about how to configure seatunnel
and see the full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf
index 78f2daa1d7..0511401612 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf
@@ -19,8 +19,8 @@ source_table = """
set allow_experimental_geo_types = 1;
create table if not exists `default`.source_table(
`id` Int64,
- `c_map` Map(String, Int32),
- `c_array_string` Array(String),
+ `c_map` Map(String, Int32) COMMENT '''N''-N',
+ `c_array_string` Array(String) COMMENT '\\N\\-N',
`c_array_short` Array(Int16),
`c_array_int` Array(Int32),
`c_array_long` Array(Int64),
@@ -51,14 +51,15 @@ create table if not exists `default`.source_table(
`c_uint256` UInt256,
`c_point` Point,
`c_ring` Ring
-)engine=Memory;
+)engine=Memory
+comment '''N''-N';
"""
sink_table = """
create table if not exists `default`.sink_table(
`id` Int64,
- `c_map` Map(String, Int32),
- `c_array_string` Array(String),
+ `c_map` Map(String, Int32) COMMENT '''N''-N',
+ `c_array_string` Array(String) COMMENT '\\N\\-N',
`c_array_short` Array(Int16),
`c_array_int` Array(Int32),
`c_array_long` Array(Int64),
@@ -89,7 +90,8 @@ create table if not exists `default`.sink_table(
`c_uint256` UInt256,
`c_point` Point,
`c_ring` Ring
-)engine=Memory;
+)engine=Memory
+comment '''N''-N';
"""
insert_sql = """
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
index 3fcca216a0..2a55cc350c 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-doris-e2e/src/test/java/org/apache/seatunnel/e2e/connector/doris/DorisCatalogIT.java
@@ -66,7 +66,7 @@ public class DorisCatalogIT extends AbstractDorisIT {
TableSchema.Builder builder = TableSchema.builder();
builder.column(PhysicalColumn.of("k1", BasicType.INT_TYPE, 10, false,
0, "k1"));
builder.column(PhysicalColumn.of("k2", BasicType.STRING_TYPE, 64,
false, "", "k2"));
- builder.column(PhysicalColumn.of("v1", BasicType.DOUBLE_TYPE, 10,
true, null, "v1"));
+ builder.column(PhysicalColumn.of("v1", BasicType.DOUBLE_TYPE, 10,
true, null, "v1-'v1'"));
builder.column(PhysicalColumn.of("v2", new DecimalType(10, 2), 0,
false, 0.1, "v2"));
builder.primaryKey(PrimaryKey.of("pk", Arrays.asList("k1", "k2")));
catalogTable =
@@ -75,7 +75,7 @@ public class DorisCatalogIT extends AbstractDorisIT {
builder.build(),
Collections.emptyMap(),
Collections.emptyList(),
- "test");
+ "test - \\ 'test'");
}
private DorisCatalogFactory factory;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
index 1e984205ae..c49b1bfa41 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksIT.java
@@ -89,8 +89,9 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
+ SOURCE_TABLE
+ " (\n"
+ " BIGINT_COL BIGINT,\n"
- + " LARGEINT_COL LARGEINT,\n"
- + " SMALLINT_COL SMALLINT,\n"
+ // add comment for test
+ + " LARGEINT_COL LARGEINT COMMENT '''N''-N',\n"
+ + " SMALLINT_COL SMALLINT COMMENT '\\N\\-N',\n"
+ " TINYINT_COL TINYINT,\n"
+ " BOOLEAN_COL BOOLEAN,\n"
+ " DECIMAL_COL Decimal(12, 1),\n"
@@ -365,6 +366,13 @@ public class StarRocksIT extends TestSuiteBase implements
TestResource {
"CREATE TABLE IF NOT EXISTS `${database}`.`${table}`
(\n ${rowtype_fields}\n ) ENGINE=OLAP \n DUPLICATE KEY(`BIGINT_COL`) \n
COMMENT '${comment}' \n DISTRIBUTED BY HASH (BIGINT_COL) BUCKETS 1 \n
PROPERTIES (\n \"replication_num\" = \"1\", \n \"in_memory\" = \"false\" ,
\n \"storage_format\" = \"DEFAULT\" \n )");
starRocksCatalog.open();
CatalogTable catalogTable =
starRocksCatalog.getTable(tablePathStarRocksSource);
+ catalogTable =
+ CatalogTable.of(
+ catalogTable.getTableId(),
+ catalogTable.getTableSchema(),
+ catalogTable.getOptions(),
+ catalogTable.getPartitionKeys(),
+ "test'1'");
// sink tableExists ?
starRocksCatalog.dropTable(tablePathStarRocksSink, true);
boolean tableExistsBefore =
starRocksCatalog.tableExists(tablePathStarRocksSink);