This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9d1cffa5e1 [Feature][Connector-doris] Adds case insensitivity feature (#9306)
9d1cffa5e1 is described below
commit 9d1cffa5e1e8cc848d80c7f01e51d9c42f4bf23e
Author: yzeng1618 <[email protected]>
AuthorDate: Sat May 17 09:43:57 2025 +0800
[Feature][Connector-doris] Adds case insensitivity feature (#9306)
---
docs/en/connector-v2/changelog/connector-doris.md | 1 +
docs/en/connector-v2/sink/Doris.md | 21 ++++++++++
docs/zh/connector-v2/changelog/connector-doris.md | 1 +
docs/zh/connector-v2/sink/Doris.md | 24 +++++++++++-
.../connectors/doris/config/DorisSinkConfig.java | 4 +-
.../connectors/doris/config/DorisSinkOptions.java | 7 ++++
.../connectors/doris/config/DorisTableConfig.java | 15 ++++++++
.../doris/datatype/AbstractDorisTypeConverter.java | 6 ++-
.../doris/datatype/DorisTypeConverterV1.java | 7 +++-
.../doris/datatype/DorisTypeConverterV2.java | 7 +++-
.../doris/serialize/SeaTunnelRowSerializer.java | 24 ++++++++++--
.../serialize/SeaTunnelRowSerializerFactory.java | 45 ++++++++++++++++++++++
.../doris/sink/writer/DorisSinkWriter.java | 11 +-----
.../doris/sink/writer/DorisStreamLoad.java | 5 ++-
.../doris/datatype/DorisTypeConvertorV1Test.java | 44 +++++++++++++++++++++
.../doris/datatype/DorisTypeConvertorV2Test.java | 44 +++++++++++++++++++++
16 files changed, 247 insertions(+), 19 deletions(-)
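
The rule this commit threads through the sink (keep a name as-is when `case_sensitive` is true, lowercase it otherwise) can be tried in isolation with a small standalone sketch. It is illustrative only: `IdentifierCase` and `normalize` are hypothetical names that do not exist in the connector. The sketch also passes `Locale.ROOT` to `toLowerCase`, which is an assumption on my part; the patch below calls the no-argument `toLowerCase()`, which follows the JVM default locale.

```java
import java.util.Locale;

// Hypothetical helper for illustration; not part of the SeaTunnel code base.
final class IdentifierCase {
    private IdentifierCase() {}

    // Returns the name unchanged when caseSensitive is true,
    // otherwise a locale-independent lowercase copy.
    static String normalize(String name, boolean caseSensitive) {
        return caseSensitive ? name : name.toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        // Same mixed-case table name as the documentation example in this commit.
        System.out.println(normalize("Test_Table", true));
        System.out.println(normalize("Test_Table", false));
    }
}
```

Keeping the decision in one place like this is only one possible design; the patch itself applies the same ternary separately in the type converter, the serializer, and `DorisStreamLoad`.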
diff --git a/docs/en/connector-v2/changelog/connector-doris.md b/docs/en/connector-v2/changelog/connector-doris.md
index 02b7976f85..aee41efac5 100644
--- a/docs/en/connector-v2/changelog/connector-doris.md
+++ b/docs/en/connector-v2/changelog/connector-doris.md
@@ -2,6 +2,7 @@
| Change | Commit | Version |
| --- | --- | --- |
+|[Feature][connector-doris] Added case-sensitive feature for table and column names|https://github.com/apache/seatunnel/commit/|2.3.11|
|[Improve] doris options (#8745)|https://github.com/apache/seatunnel/commit/268d76cbf3|2.3.10|
|[Improve] restruct connector common options (#8634)|https://github.com/apache/seatunnel/commit/f3499a6eeb|2.3.10|
|[Fix][Connector-V2] fix starRocks automatically creates tables with comment (#8568)|https://github.com/apache/seatunnel/commit/c4cb1fc4a3|2.3.10|
diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md
index 21bad549ce..bf97d2b469 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -57,6 +57,7 @@ The internal implementation of Doris sink connector is cached and imported by st
| sink.buffer-count | int | No | 3 | the buffer count to cache data for stream load. |
| doris.batch.size | int | No | 1024 | the batch size of the write to doris each http request, when the row reaches the size or checkpoint is executed, the data of cached will write to server. |
| needs_unsupported_type_casting | boolean | No | false | Whether to enable the unsupported type casting, such as Decimal64 to Double |
+| case_sensitive | boolean | No | true | Whether to preserve the original case of table and column names. When set to false, table and column names will be converted to lowercase. |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below |
| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below |
| save_mode_create_template | string | no | see below | see below |
@@ -347,6 +348,26 @@ sink {
}
}
}
+
+### Case-Sensitive Configuration
+
+```hocon
+sink {
+ Doris {
+ fenodes = "e2e_dorisdb:8030"
+ username = root
+ password = ""
+ database = "Test_DB" # Original case will be preserved
+ table = "Test_Table" # Original case will be preserved
+ case_sensitive = true # Default value, preserves original case
+ sink.enable-2pc = "true"
+ sink.label-prefix = "test_case_sensitive"
+ doris.config = {
+ format = "json"
+ read_json_by_line = "true"
+ }
+ }
+}
```
### Multiple table
diff --git a/docs/zh/connector-v2/changelog/connector-doris.md b/docs/zh/connector-v2/changelog/connector-doris.md
index 02b7976f85..5a8d91d513 100644
--- a/docs/zh/connector-v2/changelog/connector-doris.md
+++ b/docs/zh/connector-v2/changelog/connector-doris.md
@@ -2,6 +2,7 @@
| Change | Commit | Version |
| --- | --- | --- |
+|[Feature][connector-doris] Add case-sensitivity feature for table and column names|https://github.com/apache/seatunnel/commit/|2.3.11|
|[Improve] doris options (#8745)|https://github.com/apache/seatunnel/commit/268d76cbf3|2.3.10|
|[Improve] restruct connector common options (#8634)|https://github.com/apache/seatunnel/commit/f3499a6eeb|2.3.10|
|[Fix][Connector-V2] fix starRocks automatically creates tables with comment (#8568)|https://github.com/apache/seatunnel/commit/c4cb1fc4a3|2.3.10|
diff --git a/docs/zh/connector-v2/sink/Doris.md b/docs/zh/connector-v2/sink/Doris.md
index d95d321a05..97f79ed4a2 100644
--- a/docs/zh/connector-v2/sink/Doris.md
+++ b/docs/zh/connector-v2/sink/Doris.md
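@@ -56,6 +56,7 @@ The internal implementation of the Doris sink connector caches and imports data in batches via stream load
| sink.buffer-count | int | No | 3 | The number of buffers used to cache stream load data. |
| doris.batch.size | int | No | 1024 | The batch size written to Doris per HTTP request; when the row count reaches this size or a checkpoint is executed, the cached data is written to the server. |
| needs_unsupported_type_casting | boolean | No | false | Whether to enable casting of unsupported types, such as Decimal64 to Double. |
+| case_sensitive | boolean | No | true | Whether to preserve the original case of table and column names. When set to false, table and column names are converted to lowercase. |
| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | The schema save mode; see `schema_save_mode` below. |
| data_save_mode | Enum | no | APPEND_DATA | The data save mode; see `data_save_mode` below. |
| save_mode_create_template | string | no | see below | See below. |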
@@ -121,7 +122,7 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
```
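The connector automatically obtains the corresponding types from upstream to fill in the template,
-and removes the id field from “rowtype_fields”. This method can be used to customize field types and attributes.
+and removes the id field from "rowtype_fields". This method can be used to customize field types and attributes.
The following placeholders can be used: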
@@ -346,6 +347,27 @@ sink {
}
```
+### Case-Sensitive Configuration
+
+```hocon
+sink {
+ Doris {
+ fenodes = "e2e_dorisdb:8030"
+ username = root
+ password = ""
+ database = "Test_DB" # Original case is preserved
+ table = "Test_Table" # Original case is preserved
+ case_sensitive = true # Default value; preserves original case
+ sink.enable-2pc = "true"
+ sink.label-prefix = "test_case_sensitive"
+ doris.config = {
+ format = "json"
+ read_json_by_line = "true"
+ }
+ }
+}
+```
+
## Changelog
<ChangeLog />
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
index ee750e15c3..7c2215bf1f 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
@@ -36,6 +36,7 @@ import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.PASS
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT;
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME;
+import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.CASE_SENSITIVE;
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX;
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE;
@@ -71,6 +72,7 @@ public class DorisSinkConfig implements Serializable {
private Integer bufferCount;
private Properties streamLoadProps;
private boolean needsUnsupportedTypeCasting;
+ private boolean caseSensitive;
// create table option
private String createTableTemplate;
@@ -102,7 +104,7 @@ public class DorisSinkConfig implements Serializable {
dorisSinkConfig.setBufferCount(config.get(SINK_BUFFER_COUNT));
dorisSinkConfig.setEnableDelete(config.get(SINK_ENABLE_DELETE));
dorisSinkConfig.setNeedsUnsupportedTypeCasting(config.get(NEEDS_UNSUPPORTED_TYPE_CASTING));
-
+ dorisSinkConfig.setCaseSensitive(config.get(CASE_SENSITIVE));
// create table option
dorisSinkConfig.setCreateTableTemplate(config.get(SAVE_MODE_CREATE_TEMPLATE));
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
index 28494eb40e..730c9aa2d6 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
@@ -107,6 +107,13 @@ public class DorisSinkOptions extends DorisBaseOptions {
.withDescription(
"Whether to enable the unsupported type casting, such as Decimal64 to Double");
+ public static final Option<Boolean> CASE_SENSITIVE =
+ Options.key("case_sensitive")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "Whether to preserve the original case of table and column names. Default is true (case sensitive)");
+
// create table
public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
Options.key("save_mode_create_template")
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
index bfd2fc7873..2dae0c32e3 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
@@ -38,6 +38,7 @@ import java.util.stream.Collectors;
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE;
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DORIS_BATCH_SIZE;
import static org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
+import static org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.CASE_SENSITIVE;
import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_EXEC_MEM_LIMIT;
import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_FILTER_QUERY;
import static org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_READ_FIELD;
@@ -78,6 +79,20 @@ public class DorisTableConfig implements Serializable {
if (connectorConfig.getOptional(TABLE_LIST).isPresent()) {
tableList = connectorConfig.get(TABLE_LIST);
} else {
+ DorisTableConfig dorisTableConfig = new DorisTableConfig();
+ dorisTableConfig.setDatabase(connectorConfig.get(DATABASE));
+ dorisTableConfig.setTable(connectorConfig.get(TABLE));
+
+ boolean caseSensitive = true;
+ if (connectorConfig.getOptional(CASE_SENSITIVE).isPresent()) {
+ caseSensitive = connectorConfig.get(CASE_SENSITIVE);
+ }
+
+ if (!caseSensitive) {
+ dorisTableConfig.setDatabase(dorisTableConfig.getDatabase().toLowerCase());
+ dorisTableConfig.setTable(dorisTableConfig.getTable().toLowerCase());
+ }
+
DorisTableConfig tableProperty =
DorisTableConfig.builder()
.table(connectorConfig.get(TABLE))
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
index a4888f3fdd..df057b38c4 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
@@ -94,10 +94,12 @@ public abstract class AbstractDorisTypeConverter implements TypeConverter<BasicT
public static final long MAX_STRING_LENGTH = 2147483643;
protected PhysicalColumn.PhysicalColumnBuilder getPhysicalColumnBuilder(
- BasicTypeDefine typeDefine) {
+ BasicTypeDefine typeDefine, boolean caseSensitive) {
+ String columnName =
+ caseSensitive ? typeDefine.getName() : typeDefine.getName().toLowerCase();
PhysicalColumn.PhysicalColumnBuilder builder =
PhysicalColumn.builder()
- .name(typeDefine.getName())
+ .name(columnName)
.sourceType(typeDefine.getColumnType())
.nullable(typeDefine.isNullable())
.defaultValue(typeDefine.getDefaultValue())
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
index c508b9a14c..a3c4684f7d 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
@@ -48,7 +48,12 @@ public class DorisTypeConverterV1 extends AbstractDorisTypeConverter {
@Override
public Column convert(BasicTypeDefine typeDefine) {
- PhysicalColumn.PhysicalColumnBuilder builder = getPhysicalColumnBuilder(typeDefine);
+ return convert(typeDefine, true);
+ }
+
+ public Column convert(BasicTypeDefine typeDefine, boolean caseSensitive) {
+ PhysicalColumn.PhysicalColumnBuilder builder =
+ getPhysicalColumnBuilder(typeDefine, caseSensitive);
String dorisColumnType = getDorisColumnName(typeDefine);
switch (dorisColumnType) {
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
index 4a24e1d5da..9d9be808f1 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
@@ -68,7 +68,12 @@ public class DorisTypeConverterV2 extends AbstractDorisTypeConverter {
@Override
public Column convert(BasicTypeDefine typeDefine) {
- PhysicalColumn.PhysicalColumnBuilder builder = getPhysicalColumnBuilder(typeDefine);
+ return convert(typeDefine, true);
+ }
+
+ public Column convert(BasicTypeDefine typeDefine, boolean caseSensitive) {
+ PhysicalColumn.PhysicalColumnBuilder builder =
+ getPhysicalColumnBuilder(typeDefine, caseSensitive);
String dorisColumnType = getDorisColumnName(typeDefine);
switch (dorisColumnType) {
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
index c984580f8e..023724ada0 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
@@ -45,27 +45,45 @@ public class SeaTunnelRowSerializer implements DorisSerializer {
private final String fieldDelimiter;
private final boolean enableDelete;
private final SerializationSchema serialize;
+ private final boolean caseSensitive;
public SeaTunnelRowSerializer(
String type,
SeaTunnelRowType seaTunnelRowType,
String fieldDelimiter,
boolean enableDelete) {
+ this(type, seaTunnelRowType, fieldDelimiter, enableDelete, true);
+ }
+
+ public SeaTunnelRowSerializer(
+ String type,
+ SeaTunnelRowType seaTunnelRowType,
+ String fieldDelimiter,
+ boolean enableDelete,
+ boolean caseSensitive) {
this.type = type;
this.fieldDelimiter = fieldDelimiter;
this.enableDelete = enableDelete;
- List<Object> fieldNames = new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldNames()));
+ this.caseSensitive = caseSensitive;
+
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ String[] processedFieldNames = new String[fieldNames.length];
+ for (int i = 0; i < fieldNames.length; i++) {
+ processedFieldNames[i] = caseSensitive ? fieldNames[i] : fieldNames[i].toLowerCase();
+ }
+
+ List<Object> fieldNamesList = new ArrayList<>(Arrays.asList(processedFieldNames));
List<SeaTunnelDataType<?>> fieldTypes =
new ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldTypes()));
if (enableDelete) {
- fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
+ fieldNamesList.add(LoadConstants.DORIS_DELETE_SIGN);
fieldTypes.add(STRING_TYPE);
}
this.seaTunnelRowType =
new SeaTunnelRowType(
- fieldNames.toArray(new String[0]),
+ fieldNamesList.toArray(new String[0]),
fieldTypes.toArray(new SeaTunnelDataType<?>[0]));
if (JSON.equals(type)) {
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializerFactory.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializerFactory.java
new file mode 100644
index 0000000000..faadae097c
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializerFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig;
+import org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants;
+
+public class SeaTunnelRowSerializerFactory {
+
+ /**
+ * Create a DorisSerializer instance
+ *
+ * @param dorisSinkConfig
+ * @param seaTunnelRowType
+ * @return DorisSerializer
+ */
+ public static DorisSerializer createSerializer(
+ DorisSinkConfig dorisSinkConfig, SeaTunnelRowType seaTunnelRowType) {
+ return new SeaTunnelRowSerializer(
+ dorisSinkConfig
+ .getStreamLoadProps()
+ .getProperty(LoadConstants.FORMAT_KEY)
+ .toLowerCase(),
+ seaTunnelRowType,
+ dorisSinkConfig.getStreamLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY),
+ dorisSinkConfig.getEnableDelete(),
+ dorisSinkConfig.isCaseSensitive());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 131012648f..a2b4f3da99 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -36,7 +36,7 @@ import org.apache.seatunnel.connectors.doris.exception.DorisSchemaChangeExceptio
import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
import org.apache.seatunnel.connectors.doris.schema.SchemaChangeManager;
import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer;
-import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
+import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializerFactory;
import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
import org.apache.seatunnel.connectors.doris.util.HttpUtil;
@@ -266,13 +266,6 @@ public class DorisSinkWriter
private DorisSerializer createSerializer(
DorisSinkConfig dorisSinkConfig, SeaTunnelRowType seaTunnelRowType) {
- return new SeaTunnelRowSerializer(
- dorisSinkConfig
- .getStreamLoadProps()
- .getProperty(LoadConstants.FORMAT_KEY)
- .toLowerCase(),
- seaTunnelRowType,
- dorisSinkConfig.getStreamLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY),
- dorisSinkConfig.getEnableDelete());
+ return SeaTunnelRowSerializerFactory.createSerializer(dorisSinkConfig, seaTunnelRowType);
}
}
diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index 6bc6ab9146..db916728f4 100644
--- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -94,7 +94,10 @@ public class DorisStreamLoad implements Serializable {
CloseableHttpClient httpClient) {
this.hostPort = hostPort;
this.db = tablePath.getDatabaseName();
- this.table = tablePath.getTableName();
+ this.table =
+ dorisSinkConfig.isCaseSensitive()
+ ? tablePath.getTableName()
+ : tablePath.getTableName().toLowerCase();
this.user = dorisSinkConfig.getUsername();
this.passwd = dorisSinkConfig.getPassword();
this.labelGenerator = labelGenerator;
diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV1Test.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV1Test.java
index b387dc3138..fe72a41d77 100644
--- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV1Test.java
+++ b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV1Test.java
@@ -873,4 +873,48 @@ public class DorisTypeConvertorV1Test {
Assertions.assertEquals("ARRAY<DECIMALV3(20, 0)>", typeDefine.getColumnType());
Assertions.assertEquals("ARRAY<DECIMALV3>", typeDefine.getDataType());
}
+
+ @Test
+ public void testCaseSensitiveDefault() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("Test_Column")
+ .columnType("varchar(255)")
+ .dataType("varchar")
+ .build();
+
+ Column column = DorisTypeConverterV1.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals("Test_Column", column.getName());
+ }
+
+ @Test
+ public void testCaseSensitiveFalse() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("Test_Column")
+ .columnType("varchar(255)")
+ .dataType("varchar")
+ .build();
+
+ Column column = DorisTypeConverterV1.INSTANCE.convert(typeDefine, false);
+ Assertions.assertEquals("test_column", column.getName());
+ }
+
+ @Test
+ public void testCaseSensitiveWithMixedCaseTypes() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("mixed_case_column")
+ .columnType("VarChar(255)")
+ .dataType("VARCHAR")
+ .build();
+
+ Column columnSensitive = DorisTypeConverterV1.INSTANCE.convert(typeDefine, true);
+ Assertions.assertEquals("mixed_case_column", columnSensitive.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, columnSensitive.getDataType());
+
+ Column columnInsensitive = DorisTypeConverterV1.INSTANCE.convert(typeDefine, false);
+ Assertions.assertEquals("mixed_case_column", columnInsensitive.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, columnInsensitive.getDataType());
+ }
}
diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV2Test.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV2Test.java
index ae1df0756f..841f555bd1 100644
--- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV2Test.java
+++ b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV2Test.java
@@ -1234,4 +1234,48 @@ public class DorisTypeConvertorV2Test {
Assertions.assertEquals("MAP<DATETIME(6), STRING>", typeDefine.getColumnType());
Assertions.assertEquals("MAP<DATETIME(6), STRING>", typeDefine.getDataType());
}
+
+ @Test
+ public void testCaseSensitiveDefault() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("Test_Column")
+ .columnType("varchar(255)")
+ .dataType("varchar")
+ .build();
+
+ Column column = DorisTypeConverterV2.INSTANCE.convert(typeDefine);
+ Assertions.assertEquals("Test_Column", column.getName());
+ }
+
+ @Test
+ public void testCaseSensitiveFalse() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("Test_Column")
+ .columnType("varchar(255)")
+ .dataType("varchar")
+ .build();
+
+ Column column = DorisTypeConverterV2.INSTANCE.convert(typeDefine, false);
+ Assertions.assertEquals("test_column", column.getName());
+ }
+
+ @Test
+ public void testCaseSensitiveWithMixedCaseTypes() {
+ BasicTypeDefine<Object> typeDefine =
+ BasicTypeDefine.builder()
+ .name("mixed_case_column")
+ .columnType("VarChar(255)")
+ .dataType("VARCHAR")
+ .build();
+
+ Column columnSensitive = DorisTypeConverterV2.INSTANCE.convert(typeDefine, true);
+ Assertions.assertEquals("mixed_case_column", columnSensitive.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, columnSensitive.getDataType());
+
+ Column columnInsensitive = DorisTypeConverterV2.INSTANCE.convert(typeDefine, false);
+ Assertions.assertEquals("mixed_case_column", columnInsensitive.getName());
+ Assertions.assertEquals(BasicType.STRING_TYPE, columnInsensitive.getDataType());
+ }
}
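
The field-name handling added to the `SeaTunnelRowSerializer` constructor can be exercised on its own with a sketch like the one below. It is a simplified model rather than the connector's code: `FieldNameSketch`, `processFieldNames`, and the `DELETE_SIGN` value are placeholders (the patch uses `LoadConstants.DORIS_DELETE_SIGN`, whose value is not shown in this diff), and `Locale.ROOT` is an added assumption. The input deliberately contains uppercase letters, so the two modes give different results, unlike the already-lowercase `mixed_case_column` used in `testCaseSensitiveWithMixedCaseTypes`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Simplified stand-in for the constructor logic; not the real serializer.
final class FieldNameSketch {
    // Placeholder for LoadConstants.DORIS_DELETE_SIGN; this value is made up.
    static final String DELETE_SIGN = "__delete_sign_placeholder__";

    private FieldNameSketch() {}

    // Lowercases each field name unless caseSensitive is true, then
    // appends the delete-sign column when enableDelete is set.
    static String[] processFieldNames(
            String[] fieldNames, boolean enableDelete, boolean caseSensitive) {
        List<String> result = new ArrayList<>(fieldNames.length + 1);
        for (String name : fieldNames) {
            result.add(caseSensitive ? name : name.toLowerCase(Locale.ROOT));
        }
        if (enableDelete) {
            result.add(DELETE_SIGN);
        }
        return result.toArray(new String[0]);
    }

    public static void main(String[] args) {
        String[] input = {"Id", "User_Name"};
        System.out.println(Arrays.toString(processFieldNames(input, false, true)));
        System.out.println(Arrays.toString(processFieldNames(input, true, false)));
    }
}
```

As in the patch, only the caller-supplied names are lowercased; the delete-sign column is appended afterwards and left untouched.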