This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9d1cffa5e1 [Feature][Connector-doris] Adds case insensitivity feature 
(#9306)
9d1cffa5e1 is described below

commit 9d1cffa5e1e8cc848d80c7f01e51d9c42f4bf23e
Author: yzeng1618 <[email protected]>
AuthorDate: Sat May 17 09:43:57 2025 +0800

    [Feature][Connector-doris] Adds case insensitivity feature (#9306)
---
 docs/en/connector-v2/changelog/connector-doris.md  |  1 +
 docs/en/connector-v2/sink/Doris.md                 | 21 ++++++++++
 docs/zh/connector-v2/changelog/connector-doris.md  |  1 +
 docs/zh/connector-v2/sink/Doris.md                 | 24 +++++++++++-
 .../connectors/doris/config/DorisSinkConfig.java   |  4 +-
 .../connectors/doris/config/DorisSinkOptions.java  |  7 ++++
 .../connectors/doris/config/DorisTableConfig.java  | 15 ++++++++
 .../doris/datatype/AbstractDorisTypeConverter.java |  6 ++-
 .../doris/datatype/DorisTypeConverterV1.java       |  7 +++-
 .../doris/datatype/DorisTypeConverterV2.java       |  7 +++-
 .../doris/serialize/SeaTunnelRowSerializer.java    | 24 ++++++++++--
 .../serialize/SeaTunnelRowSerializerFactory.java   | 45 ++++++++++++++++++++++
 .../doris/sink/writer/DorisSinkWriter.java         | 11 +-----
 .../doris/sink/writer/DorisStreamLoad.java         |  5 ++-
 .../doris/datatype/DorisTypeConvertorV1Test.java   | 44 +++++++++++++++++++++
 .../doris/datatype/DorisTypeConvertorV2Test.java   | 44 +++++++++++++++++++++
 16 files changed, 247 insertions(+), 19 deletions(-)

diff --git a/docs/en/connector-v2/changelog/connector-doris.md 
b/docs/en/connector-v2/changelog/connector-doris.md
index 02b7976f85..aee41efac5 100644
--- a/docs/en/connector-v2/changelog/connector-doris.md
+++ b/docs/en/connector-v2/changelog/connector-doris.md
@@ -2,6 +2,7 @@
 
 | Change | Commit | Version |
 | --- | --- | --- |
+|[Feature][connector-doris] Added case-sensitivity option for table and column 
names (#9306)|https://github.com/apache/seatunnel/commit/9d1cffa5e1|2.3.11|
 |[Improve] doris options 
(#8745)|https://github.com/apache/seatunnel/commit/268d76cbf3|2.3.10|
 |[Improve] restruct connector common options 
(#8634)|https://github.com/apache/seatunnel/commit/f3499a6eeb|2.3.10|
 |[Fix][Connector-V2] fix starRocks automatically creates tables with comment 
(#8568)|https://github.com/apache/seatunnel/commit/c4cb1fc4a3|2.3.10|
diff --git a/docs/en/connector-v2/sink/Doris.md 
b/docs/en/connector-v2/sink/Doris.md
index 21bad549ce..bf97d2b469 100644
--- a/docs/en/connector-v2/sink/Doris.md
+++ b/docs/en/connector-v2/sink/Doris.md
@@ -57,6 +57,7 @@ The internal implementation of Doris sink connector is cached 
and imported by st
 | sink.buffer-count              | int     | No       | 3                      
      | the buffer count to cache data for stream load.                         
                                                                                
                                                                                
                             |
 | doris.batch.size               | int     | No       | 1024                   
      | the batch size of the write to doris each http request, when the row 
reaches the size or checkpoint is executed, the data of cached will write to 
server.                                                                         
                                   |
 | needs_unsupported_type_casting | boolean | No       | false                  
      | Whether to enable the unsupported type casting, such as Decimal64 to 
Double                                                                          
                                                                                
                                |
+| case_sensitive                 | boolean | No       | true                   
      | Whether to preserve the original case of table and column names. When 
set to false, table and column names will be converted to lowercase.            
                                                                                
                                |
 | schema_save_mode               | Enum    | no       | 
CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to 
`schema_save_mode` below                                                        
                                                                                
                                                               |
 | data_save_mode                 | Enum    | no       | APPEND_DATA            
      | the data save mode, please refer to `data_save_mode` below              
                                                                                
                                                                                
                             |
 | save_mode_create_template      | string  | no       | see below              
      | see below                                                               
                                                                                
                                                                                
                             |
@@ -347,6 +348,26 @@ sink {
         }
     }
 }
+```
+### Case-Sensitive Configuration
+
+```hocon
+sink {
+    Doris {
+        fenodes = "e2e_dorisdb:8030"
+        username = root
+        password = ""
+        database = "Test_DB"  # Original case will be preserved
+        table = "Test_Table"  # Original case will be preserved
+        case_sensitive = true # Default value, preserves original case
+        sink.enable-2pc = "true"
+        sink.label-prefix = "test_case_sensitive"
+        doris.config = {
+          format = "json"
+          read_json_by_line = "true"
+        }
+    }
+}
 ```
 
 ### Multiple table
diff --git a/docs/zh/connector-v2/changelog/connector-doris.md 
b/docs/zh/connector-v2/changelog/connector-doris.md
index 02b7976f85..5a8d91d513 100644
--- a/docs/zh/connector-v2/changelog/connector-doris.md
+++ b/docs/zh/connector-v2/changelog/connector-doris.md
@@ -2,6 +2,7 @@
 
 | Change | Commit | Version |
 | --- | --- | --- |
+|[Feature][connector-doris] 
添加表名和字段名大小写敏感功能 (#9306)|https://github.com/apache/seatunnel/commit/9d1cffa5e1|2.3.11|
 |[Improve] doris options 
(#8745)|https://github.com/apache/seatunnel/commit/268d76cbf3|2.3.10|
 |[Improve] restruct connector common options 
(#8634)|https://github.com/apache/seatunnel/commit/f3499a6eeb|2.3.10|
 |[Fix][Connector-V2] fix starRocks automatically creates tables with comment 
(#8568)|https://github.com/apache/seatunnel/commit/c4cb1fc4a3|2.3.10|
diff --git a/docs/zh/connector-v2/sink/Doris.md 
b/docs/zh/connector-v2/sink/Doris.md
index d95d321a05..97f79ed4a2 100644
--- a/docs/zh/connector-v2/sink/Doris.md
+++ b/docs/zh/connector-v2/sink/Doris.md
@@ -56,6 +56,7 @@ Doris Sink连接器的内部实现是通过stream load批量缓存和导入的
 | sink.buffer-count              | int     | No       | 3                      
      | 用于缓存stream load数据的缓冲区计数。                                                
                                                                              |
 | doris.batch.size               | int     | No       | 1024                   
      | 每次http请求写入doris的批量大小,当row达到该大小或者执行checkpoint时,缓存的数据就会写入服务器。             
                                                                              |
 | needs_unsupported_type_casting | boolean | No       | false                  
      | 是否启用不支持的类型转换,例如 Decimal64 到 Double。                                     
                                                                              |
+| case_sensitive                 | boolean | No       | true                   
      | 是否保留表名和字段名的原始大小写。当设置为 false 时,表名和字段名将被转换为小写。                            
                                                            |
 | schema_save_mode               | Enum    | no       | 
CREATE_SCHEMA_WHEN_NOT_EXIST | schema保存模式,请参考下面的`schema_save_mode`              
                                                                                
                     |
 | data_save_mode                 | Enum    | no       | APPEND_DATA            
      | 数据保存模式,请参考下面的`data_save_mode`。                                          
                                                                              |
 | save_mode_create_template      | string  | no       | see below              
      | 见下文。                                                                    
                                                                              |
@@ -121,7 +122,7 @@ CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}`
 ```
 
 连接器会自动从上游获取对应类型完成填充,
-并从“rowtype_fields”中删除 id 字段。 该方法可用于自定义字段类型和属性的修改。
+并从"rowtype_fields"中删除 id 字段。 该方法可用于自定义字段类型和属性的修改。
 
 可以使用以下占位符:
 
@@ -346,6 +347,27 @@ sink {
 }
 ```
 
+### 大小写敏感配置
+
+```hocon
+sink {
+    Doris {
+        fenodes = "e2e_dorisdb:8030"
+        username = root
+        password = ""
+        database = "Test_DB"  # 保留原始大小写
+        table = "Test_Table"  # 保留原始大小写
+        case_sensitive = true # 默认值,保留原始大小写
+        sink.enable-2pc = "true"
+        sink.label-prefix = "test_case_sensitive"
+        doris.config = {
+          format = "json"
+          read_json_by_line = "true"
+        }
+    }
+}
+```
+
 ## 变更日志
 
 <ChangeLog />
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
index ee750e15c3..7c2215bf1f 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkConfig.java
@@ -36,6 +36,7 @@ import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.PASS
 import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.QUERY_PORT;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.USERNAME;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.CASE_SENSITIVE;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.DORIS_SINK_CONFIG_PREFIX;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.NEEDS_UNSUPPORTED_TYPE_CASTING;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.SAVE_MODE_CREATE_TEMPLATE;
@@ -71,6 +72,7 @@ public class DorisSinkConfig implements Serializable {
     private Integer bufferCount;
     private Properties streamLoadProps;
     private boolean needsUnsupportedTypeCasting;
+    private boolean caseSensitive;
 
     // create table option
     private String createTableTemplate;
@@ -102,7 +104,7 @@ public class DorisSinkConfig implements Serializable {
         dorisSinkConfig.setBufferCount(config.get(SINK_BUFFER_COUNT));
         dorisSinkConfig.setEnableDelete(config.get(SINK_ENABLE_DELETE));
         
dorisSinkConfig.setNeedsUnsupportedTypeCasting(config.get(NEEDS_UNSUPPORTED_TYPE_CASTING));
-
+        dorisSinkConfig.setCaseSensitive(config.get(CASE_SENSITIVE));
         // create table option
         
dorisSinkConfig.setCreateTableTemplate(config.get(SAVE_MODE_CREATE_TEMPLATE));
 
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
index 28494eb40e..730c9aa2d6 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisSinkOptions.java
@@ -107,6 +107,13 @@ public class DorisSinkOptions extends DorisBaseOptions {
                     .withDescription(
                             "Whether to enable the unsupported type casting, 
such as Decimal64 to Double");
 
+    public static final Option<Boolean> CASE_SENSITIVE =
+            Options.key("case_sensitive")
+                    .booleanType()
+                    .defaultValue(true)
+                    .withDescription(
+                            "Whether to preserve the original case of table 
and column names. Default is true (case sensitive)");
+
     // create table
     public static final Option<String> SAVE_MODE_CREATE_TEMPLATE =
             Options.key("save_mode_create_template")
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
index bfd2fc7873..2dae0c32e3 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/config/DorisTableConfig.java
@@ -38,6 +38,7 @@ import java.util.stream.Collectors;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DATABASE;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.DORIS_BATCH_SIZE;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisBaseOptions.TABLE;
+import static 
org.apache.seatunnel.connectors.doris.config.DorisSinkOptions.CASE_SENSITIVE;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_EXEC_MEM_LIMIT;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_FILTER_QUERY;
 import static 
org.apache.seatunnel.connectors.doris.config.DorisSourceOptions.DORIS_READ_FIELD;
@@ -78,6 +79,20 @@ public class DorisTableConfig implements Serializable {
         if (connectorConfig.getOptional(TABLE_LIST).isPresent()) {
             tableList = connectorConfig.get(TABLE_LIST);
         } else {
+            DorisTableConfig dorisTableConfig = new DorisTableConfig();
+            dorisTableConfig.setDatabase(connectorConfig.get(DATABASE));
+            dorisTableConfig.setTable(connectorConfig.get(TABLE));
+
+            boolean caseSensitive = true;
+            if (connectorConfig.getOptional(CASE_SENSITIVE).isPresent()) {
+                caseSensitive = connectorConfig.get(CASE_SENSITIVE);
+            }
+
+            if (!caseSensitive) {
+                
dorisTableConfig.setDatabase(dorisTableConfig.getDatabase().toLowerCase());
+                
dorisTableConfig.setTable(dorisTableConfig.getTable().toLowerCase());
+            }
+
             DorisTableConfig tableProperty =
                     DorisTableConfig.builder()
                             .table(connectorConfig.get(TABLE))
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
index a4888f3fdd..df057b38c4 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/AbstractDorisTypeConverter.java
@@ -94,10 +94,12 @@ public abstract class AbstractDorisTypeConverter implements 
TypeConverter<BasicT
     public static final long MAX_STRING_LENGTH = 2147483643;
 
     protected PhysicalColumn.PhysicalColumnBuilder getPhysicalColumnBuilder(
-            BasicTypeDefine typeDefine) {
+            BasicTypeDefine typeDefine, boolean caseSensitive) {
+        String columnName =
+                caseSensitive ? typeDefine.getName() : 
typeDefine.getName().toLowerCase();
         PhysicalColumn.PhysicalColumnBuilder builder =
                 PhysicalColumn.builder()
-                        .name(typeDefine.getName())
+                        .name(columnName)
                         .sourceType(typeDefine.getColumnType())
                         .nullable(typeDefine.isNullable())
                         .defaultValue(typeDefine.getDefaultValue())
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
index c508b9a14c..a3c4684f7d 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV1.java
@@ -48,7 +48,12 @@ public class DorisTypeConverterV1 extends 
AbstractDorisTypeConverter {
 
     @Override
     public Column convert(BasicTypeDefine typeDefine) {
-        PhysicalColumn.PhysicalColumnBuilder builder = 
getPhysicalColumnBuilder(typeDefine);
+        return convert(typeDefine, true);
+    }
+
+    public Column convert(BasicTypeDefine typeDefine, boolean caseSensitive) {
+        PhysicalColumn.PhysicalColumnBuilder builder =
+                getPhysicalColumnBuilder(typeDefine, caseSensitive);
         String dorisColumnType = getDorisColumnName(typeDefine);
 
         switch (dorisColumnType) {
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
index 4a24e1d5da..9d9be808f1 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConverterV2.java
@@ -68,7 +68,12 @@ public class DorisTypeConverterV2 extends 
AbstractDorisTypeConverter {
 
     @Override
     public Column convert(BasicTypeDefine typeDefine) {
-        PhysicalColumn.PhysicalColumnBuilder builder = 
getPhysicalColumnBuilder(typeDefine);
+        return convert(typeDefine, true);
+    }
+
+    public Column convert(BasicTypeDefine typeDefine, boolean caseSensitive) {
+        PhysicalColumn.PhysicalColumnBuilder builder =
+                getPhysicalColumnBuilder(typeDefine, caseSensitive);
         String dorisColumnType = getDorisColumnName(typeDefine);
 
         switch (dorisColumnType) {
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
index c984580f8e..023724ada0 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializer.java
@@ -45,27 +45,45 @@ public class SeaTunnelRowSerializer implements 
DorisSerializer {
     private final String fieldDelimiter;
     private final boolean enableDelete;
     private final SerializationSchema serialize;
+    private final boolean caseSensitive;
 
     public SeaTunnelRowSerializer(
             String type,
             SeaTunnelRowType seaTunnelRowType,
             String fieldDelimiter,
             boolean enableDelete) {
+        this(type, seaTunnelRowType, fieldDelimiter, enableDelete, true);
+    }
+
+    public SeaTunnelRowSerializer(
+            String type,
+            SeaTunnelRowType seaTunnelRowType,
+            String fieldDelimiter,
+            boolean enableDelete,
+            boolean caseSensitive) {
         this.type = type;
         this.fieldDelimiter = fieldDelimiter;
         this.enableDelete = enableDelete;
-        List<Object> fieldNames = new 
ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldNames()));
+        this.caseSensitive = caseSensitive;
+
+        String[] fieldNames = seaTunnelRowType.getFieldNames();
+        String[] processedFieldNames = new String[fieldNames.length];
+        for (int i = 0; i < fieldNames.length; i++) {
+            processedFieldNames[i] = caseSensitive ? fieldNames[i] : 
fieldNames[i].toLowerCase();
+        }
+
+        List<Object> fieldNamesList = new 
ArrayList<>(Arrays.asList(processedFieldNames));
         List<SeaTunnelDataType<?>> fieldTypes =
                 new 
ArrayList<>(Arrays.asList(seaTunnelRowType.getFieldTypes()));
 
         if (enableDelete) {
-            fieldNames.add(LoadConstants.DORIS_DELETE_SIGN);
+            fieldNamesList.add(LoadConstants.DORIS_DELETE_SIGN);
             fieldTypes.add(STRING_TYPE);
         }
 
         this.seaTunnelRowType =
                 new SeaTunnelRowType(
-                        fieldNames.toArray(new String[0]),
+                        fieldNamesList.toArray(new String[0]),
                         fieldTypes.toArray(new SeaTunnelDataType<?>[0]));
 
         if (JSON.equals(type)) {
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializerFactory.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializerFactory.java
new file mode 100644
index 0000000000..faadae097c
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/serialize/SeaTunnelRowSerializerFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.doris.serialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig;
+import org.apache.seatunnel.connectors.doris.sink.writer.LoadConstants;
+
+public class SeaTunnelRowSerializerFactory {
+
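+    /**
+     * Creates a {@link DorisSerializer} instance from the sink configuration.
+     *
+     * @param dorisSinkConfig sink settings: format, delimiter, enable-delete, case sensitivity
+     * @param seaTunnelRowType row type whose field names and types are serialized
+     * @return a {@link SeaTunnelRowSerializer} built from the given configuration
+     */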
+    public static DorisSerializer createSerializer(
+            DorisSinkConfig dorisSinkConfig, SeaTunnelRowType 
seaTunnelRowType) {
+        return new SeaTunnelRowSerializer(
+                dorisSinkConfig
+                        .getStreamLoadProps()
+                        .getProperty(LoadConstants.FORMAT_KEY)
+                        .toLowerCase(),
+                seaTunnelRowType,
+                
dorisSinkConfig.getStreamLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY),
+                dorisSinkConfig.getEnableDelete(),
+                dorisSinkConfig.isCaseSensitive());
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
index 131012648f..a2b4f3da99 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java
@@ -36,7 +36,7 @@ import 
org.apache.seatunnel.connectors.doris.exception.DorisSchemaChangeExceptio
 import org.apache.seatunnel.connectors.doris.rest.models.RespContent;
 import org.apache.seatunnel.connectors.doris.schema.SchemaChangeManager;
 import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer;
-import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer;
+import 
org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializerFactory;
 import org.apache.seatunnel.connectors.doris.sink.LoadStatus;
 import org.apache.seatunnel.connectors.doris.sink.committer.DorisCommitInfo;
 import org.apache.seatunnel.connectors.doris.util.HttpUtil;
@@ -266,13 +266,6 @@ public class DorisSinkWriter
 
     private DorisSerializer createSerializer(
             DorisSinkConfig dorisSinkConfig, SeaTunnelRowType 
seaTunnelRowType) {
-        return new SeaTunnelRowSerializer(
-                dorisSinkConfig
-                        .getStreamLoadProps()
-                        .getProperty(LoadConstants.FORMAT_KEY)
-                        .toLowerCase(),
-                seaTunnelRowType,
-                
dorisSinkConfig.getStreamLoadProps().getProperty(LoadConstants.FIELD_DELIMITER_KEY),
-                dorisSinkConfig.getEnableDelete());
+        return SeaTunnelRowSerializerFactory.createSerializer(dorisSinkConfig, 
seaTunnelRowType);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
index 6bc6ab9146..db916728f4 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisStreamLoad.java
@@ -94,7 +94,10 @@ public class DorisStreamLoad implements Serializable {
             CloseableHttpClient httpClient) {
         this.hostPort = hostPort;
         this.db = tablePath.getDatabaseName();
-        this.table = tablePath.getTableName();
+        this.table =
+                dorisSinkConfig.isCaseSensitive()
+                        ? tablePath.getTableName()
+                        : tablePath.getTableName().toLowerCase();
         this.user = dorisSinkConfig.getUsername();
         this.passwd = dorisSinkConfig.getPassword();
         this.labelGenerator = labelGenerator;
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV1Test.java
 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV1Test.java
index b387dc3138..fe72a41d77 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV1Test.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV1Test.java
@@ -873,4 +873,48 @@ public class DorisTypeConvertorV1Test {
         Assertions.assertEquals("ARRAY<DECIMALV3(20, 0)>", 
typeDefine.getColumnType());
         Assertions.assertEquals("ARRAY<DECIMALV3>", typeDefine.getDataType());
     }
+
+    @Test
+    public void testCaseSensitiveDefault() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("Test_Column")
+                        .columnType("varchar(255)")
+                        .dataType("varchar")
+                        .build();
+
+        Column column = DorisTypeConverterV1.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals("Test_Column", column.getName());
+    }
+
+    @Test
+    public void testCaseSensitiveFalse() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("Test_Column")
+                        .columnType("varchar(255)")
+                        .dataType("varchar")
+                        .build();
+
+        Column column = DorisTypeConverterV1.INSTANCE.convert(typeDefine, 
false);
+        Assertions.assertEquals("test_column", column.getName());
+    }
+
+    @Test
+    public void testCaseSensitiveWithMixedCaseTypes() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("mixed_case_column")
+                        .columnType("VarChar(255)")
+                        .dataType("VARCHAR")
+                        .build();
+
+        Column columnSensitive = 
DorisTypeConverterV1.INSTANCE.convert(typeDefine, true);
+        Assertions.assertEquals("mixed_case_column", 
columnSensitive.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
columnSensitive.getDataType());
+
+        Column columnInsensitive = 
DorisTypeConverterV1.INSTANCE.convert(typeDefine, false);
+        Assertions.assertEquals("mixed_case_column", 
columnInsensitive.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
columnInsensitive.getDataType());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV2Test.java
 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV2Test.java
index ae1df0756f..841f555bd1 100644
--- 
a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV2Test.java
+++ 
b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/datatype/DorisTypeConvertorV2Test.java
@@ -1234,4 +1234,48 @@ public class DorisTypeConvertorV2Test {
         Assertions.assertEquals("MAP<DATETIME(6), STRING>", 
typeDefine.getColumnType());
         Assertions.assertEquals("MAP<DATETIME(6), STRING>", 
typeDefine.getDataType());
     }
+
+    @Test
+    public void testCaseSensitiveDefault() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("Test_Column")
+                        .columnType("varchar(255)")
+                        .dataType("varchar")
+                        .build();
+
+        Column column = DorisTypeConverterV2.INSTANCE.convert(typeDefine);
+        Assertions.assertEquals("Test_Column", column.getName());
+    }
+
+    @Test
+    public void testCaseSensitiveFalse() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("Test_Column")
+                        .columnType("varchar(255)")
+                        .dataType("varchar")
+                        .build();
+
+        Column column = DorisTypeConverterV2.INSTANCE.convert(typeDefine, 
false);
+        Assertions.assertEquals("test_column", column.getName());
+    }
+
+    @Test
+    public void testCaseSensitiveWithMixedCaseTypes() {
+        BasicTypeDefine<Object> typeDefine =
+                BasicTypeDefine.builder()
+                        .name("mixed_case_column")
+                        .columnType("VarChar(255)")
+                        .dataType("VARCHAR")
+                        .build();
+
+        Column columnSensitive = 
DorisTypeConverterV2.INSTANCE.convert(typeDefine, true);
+        Assertions.assertEquals("mixed_case_column", 
columnSensitive.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
columnSensitive.getDataType());
+
+        Column columnInsensitive = 
DorisTypeConverterV2.INSTANCE.convert(typeDefine, false);
+        Assertions.assertEquals("mixed_case_column", 
columnInsensitive.getName());
+        Assertions.assertEquals(BasicType.STRING_TYPE, 
columnInsensitive.getDataType());
+    }
 }

Reply via email to