This is an automated email from the ASF dual-hosted git repository.

morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new eca9af8ceba branch-3.1: [fix](csv reader) fix data loss when 
concurrency read using multi char line delimiter #53374 (#53424)
eca9af8ceba is described below

commit eca9af8ceba1d0df6f54590ac958ff335df945e5
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jul 17 18:04:20 2025 +0800

    branch-3.1: [fix](csv reader) fix data loss when concurrency read using 
multi char line delimiter #53374 (#53424)
    
    Cherry-picked from #53374
    
    Co-authored-by: hui lai <[email protected]>
---
 be/src/vec/exec/format/csv/csv_reader.cpp          |  8 ++-
 .../ddl/test_multi_char_line_delimiter.sql         | 76 +++++++++++++++++++++
 .../test_multi_char_line_delimiter.groovy          | 77 ++++++++++++++++++++++
 3 files changed, 159 insertions(+), 2 deletions(-)

diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp 
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 15fa0d35770..a853137eeb0 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -244,8 +244,12 @@ Status CsvReader::init_reader(bool is_load) {
              _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
             return Status::InternalError<false>("For now we do not support 
split compressed file");
         }
-        _start_offset -= 1;
-        _size += 1;
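+        // Back up by one full line delimiter (clamped at offset 0) so the delimiter ending the previous range's last line is read in full before the first line is skipped.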
+        int64_t pre_read_len = std::min(
+                
static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
+                _start_offset);
+        _start_offset -= pre_read_len;
+        _size += pre_read_len;
         // not first range will always skip one line
         _skip_lines = 1;
     }
diff --git 
a/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql
 
b/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql
new file mode 100644
index 00000000000..c0be1514f90
--- /dev/null
+++ 
b/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql
@@ -0,0 +1,76 @@
+CREATE TABLE `test_multi_char_line_delimiter` (
+  `col1` bigint NULL,
+  `col2` bigint NULL,
+  `col3` varchar(765) NULL,
+  `col4` varchar(765) NULL,
+  `col5` bigint NULL,
+  `col6` bigint NULL,
+  `col7` int NULL,
+  `col8` int NULL,
+  `col9` tinyint NULL,
+  `col10` bigint NULL,
+  `col11` datetime NULL,
+  `col12` bigint NULL,
+  `col13` bigint NULL,
+  `col14` int NULL,
+  `col15` bigint MIN NULL,
+  `col16` decimal(19,4) SUM NULL,
+  `col17` decimal(19,4) SUM NULL,
+  `col18` decimal(19,4) SUM NULL,
+  `col19` decimal(19,4) SUM NULL,
+  `col20` decimal(19,4) SUM NULL,
+  `col21` decimal(19,4) SUM NULL,
+  `col22` decimal(19,4) SUM NULL,
+  `col23` decimal(19,4) SUM NULL,
+  `col24` decimal(19,4) SUM NULL,
+  `col25` decimal(19,4) SUM NULL,
+  `col26` decimal(19,4) SUM NULL,
+  `col27` decimal(19,4) SUM NULL,
+  `col28` decimal(19,4) SUM NULL,
+  `col29` decimal(19,4) SUM NULL,
+  `col30` decimal(19,4) SUM NULL,
+  `col31` decimal(19,4) SUM NULL,
+  `col32` decimal(19,4) SUM NULL,
+  `col33` decimal(19,4) SUM NULL DEFAULT "0",
+  `col34` decimal(19,4) SUM NULL DEFAULT "0",
+  `col35` decimal(19,4) SUM NULL DEFAULT "0",
+  `col36` decimal(19,4) SUM NULL DEFAULT "0",
+  `col37` decimal(19,4) SUM NULL DEFAULT "0",
+  `col38` decimal(19,4) SUM NULL DEFAULT "0",
+  `col39` decimal(19,4) SUM NULL DEFAULT "0",
+  `col40` decimal(19,4) SUM NULL DEFAULT "0",
+  `col41` decimal(19,4) SUM NULL DEFAULT "0",
+  `col42` decimal(19,4) SUM NULL DEFAULT "0",
+  `col43` decimal(19,4) SUM NULL DEFAULT "0",
+  `col44` decimal(19,4) SUM NULL DEFAULT "0",
+  `col45` decimal(19,4) SUM NULL DEFAULT "0",
+  `col46` decimal(19,4) SUM NULL DEFAULT "0",
+  `col47` decimal(19,4) SUM NULL DEFAULT "0",
+  `col48` decimal(19,4) SUM NULL DEFAULT "0",
+  `col49` decimal(19,4) SUM NULL DEFAULT "0",
+  `col50` decimal(19,4) SUM NULL DEFAULT "0",
+  `col51` decimal(19,4) SUM NULL,
+  `col52` datetime MIN NULL,
+  `col53` bigint MIN NULL,
+  `col54` datetime MAX NULL,
+  `col55` bigint MAX NULL,
+  `col56` tinyint MIN NULL,
+  `col57` bitmap BITMAP_UNION NOT NULL DEFAULT BITMAP_EMPTY
+) ENGINE=OLAP
+AGGREGATE KEY(`col1`, `col2`, `col3`, `col4`, `col5`, `col6`, `col7`, `col8`, 
`col9`, `col10`, `col11`, `col12`, `col13`, `col14`)
+PARTITION BY RANGE(`col12`, `col11`)
+(PARTITION p_default VALUES [("0", '1900-01-01 00:00:00'), ("99999", 
'2030-01-01 00:00:00')))
+DISTRIBUTED BY HASH(`col8`) BUCKETS 1
+PROPERTIES (
+"file_cache_ttl_seconds" = "0",
+"is_being_synced" = "false",
+"storage_medium" = "hdd",
+"storage_format" = "V2",
+"inverted_index_storage_format" = "V2",
+"light_schema_change" = "true",
+"disable_auto_compaction" = "false",
+"enable_single_replica_compaction" = "false",
+"group_commit_interval_ms" = "10000",
+"group_commit_data_bytes" = "134217728",
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
diff --git 
a/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy
 
b/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy
new file mode 100644
index 00000000000..184b3d5cf07
--- /dev/null
+++ 
b/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_multi_char_line_delimiter", "p2") {
+    def s3BucketName = getS3BucketName()
+    def s3Endpoint = getS3Endpoint()
+    def s3Region = getS3Region()
+    def ak = getS3AK()
+    def sk = getS3SK()
+    def tableName = "test_multi_char_line_delimiter"
+    def label = "test_multi_char_line_delimiter"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
+    sql """
+    LOAD LABEL ${label}
+        (
+            DATA 
INFILE("s3://${s3BucketName}/regression/load/data/test_multi_char_line_delimiter*.csv")
+            INTO TABLE ${tableName}
+            COLUMNS TERMINATED BY "\tcolumn_separator"
+            LINES TERMINATED BY "\nline_delimiter"        
+            FORMAT AS CSV  
+            
(`col1`,`col2`,`col3`,`col4`,`col5`,`col6`,`col7`,`col8`,`col9`,`col10`,`col11`,`col12`,`col13`,`col14`,`col15`,`col16`,`col17`,`col18`,`col19`,`col20`,`col21`,`col22`,`col23`,`col24`,`col25`,`col26`,`col27`,`col28`,`col29`,`col30`,`col31`,`col32`,`col33`,`col34`,`col35`,`col36`,`col37`,`col38`,`col39`,`col40`,`col41`,`col42`,`col43`,`col44`,`col45`,`col46`,`col47`,`col48`,`col49`,`col50`,`col51`,`col52`,`col53`,`col54`,`col55`,`col56`,`col57`)
+            
SET(`col1`=`col1`,`col2`=`col2`,`col3`=`col3`,`col4`=`col4`,`col5`=`col5`,`col6`=`col6`,`col7`=`col7`,`col8`=`col8`,`col9`=`col9`,`col10`=`col10`,`col11`=`col11`,`col12`=`col12`,`col13`=`col13`,`col14`=`col14`,`col15`=`col15`,`col16`=`col16`,`col17`=`col17`,`col18`=`col18`,`col19`=`col19`,`col20`=`col20`,`col21`=`col21`,`col22`=`col22`,`col23`=`col23`,`col24`=`col24`,`col25`=`col25`,`col26`=`col26`,`col27`=`col27`,`col28`=`col28`,`col29`=`col29`,`col30`=`col30`,`col31`=`col31
 [...]
+        )
+        WITH S3
+        (
+            "s3.region"  =  "${s3Region}",
+            "s3.endpoint"  =  "${s3Endpoint}",
+            "s3.access_key"   =  "${ak}",
+            "s3.secret_key"  =  "${sk}"
+        )
+        PROPERTIES
+        (
+            "timeout" = "3600",
+            "load_parallelism" = "4"
+        );
+    """
+
+    def max_try_milli_secs = 600000
+    while (max_try_milli_secs > 0) {
+        def String[][] result = sql """ show load where label="$label"; """
+        logger.info("Load status: " + result[0][2] + ", label: $label")
+        if (result[0][2].equals("FINISHED")) {
+            logger.info("Load FINISHED " + label)
+            break;
+        }
+        if (result[0][2].equals("CANCELLED")) {
+            assertTrue(false, "load failed: $result")
+            break;
+        }
+        Thread.sleep(1000)
+        max_try_milli_secs -= 1000
+        if(max_try_milli_secs <= 0) {
+            assertTrue(1 == 2, "load Timeout: $label")
+        }
+    }
+
+    def result = sql """ select count(*) from ${tableName}; """
+    logger.info("result: ${result[0][0]}")
+    assertTrue(result[0][0] == 2060625, "load result is not correct")
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to