This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new eca9af8ceba branch-3.1: [fix](csv reader) fix data loss when
concurrency read using multi char line delimiter #53374 (#53424)
eca9af8ceba is described below
commit eca9af8ceba1d0df6f54590ac958ff335df945e5
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Jul 17 18:04:20 2025 +0800
branch-3.1: [fix](csv reader) fix data loss when concurrency read using
multi char line delimiter #53374 (#53424)
Cherry-picked from #53374
Co-authored-by: hui lai <[email protected]>
---
be/src/vec/exec/format/csv/csv_reader.cpp | 8 ++-
.../ddl/test_multi_char_line_delimiter.sql | 76 +++++++++++++++++++++
.../test_multi_char_line_delimiter.groovy | 77 ++++++++++++++++++++++
3 files changed, 159 insertions(+), 2 deletions(-)
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp b/be/src/vec/exec/format/csv/csv_reader.cpp
index 15fa0d35770..a853137eeb0 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -244,8 +244,12 @@ Status CsvReader::init_reader(bool is_load) {
_file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
return Status::InternalError<false>("For now we do not support split compressed file");
}
- _start_offset -= 1;
- _size += 1;
+ // pre-read to promise first line skipped always read
+ int64_t pre_read_len = std::min(
+ static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
+ _start_offset);
+ _start_offset -= pre_read_len;
+ _size += pre_read_len;
// not first range will always skip one line
_skip_lines = 1;
}
diff --git a/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql b/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql
new file mode 100644
index 00000000000..c0be1514f90
--- /dev/null
+++ b/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql
@@ -0,0 +1,76 @@
+CREATE TABLE `test_multi_char_line_delimiter` (
+ `col1` bigint NULL,
+ `col2` bigint NULL,
+ `col3` varchar(765) NULL,
+ `col4` varchar(765) NULL,
+ `col5` bigint NULL,
+ `col6` bigint NULL,
+ `col7` int NULL,
+ `col8` int NULL,
+ `col9` tinyint NULL,
+ `col10` bigint NULL,
+ `col11` datetime NULL,
+ `col12` bigint NULL,
+ `col13` bigint NULL,
+ `col14` int NULL,
+ `col15` bigint MIN NULL,
+ `col16` decimal(19,4) SUM NULL,
+ `col17` decimal(19,4) SUM NULL,
+ `col18` decimal(19,4) SUM NULL,
+ `col19` decimal(19,4) SUM NULL,
+ `col20` decimal(19,4) SUM NULL,
+ `col21` decimal(19,4) SUM NULL,
+ `col22` decimal(19,4) SUM NULL,
+ `col23` decimal(19,4) SUM NULL,
+ `col24` decimal(19,4) SUM NULL,
+ `col25` decimal(19,4) SUM NULL,
+ `col26` decimal(19,4) SUM NULL,
+ `col27` decimal(19,4) SUM NULL,
+ `col28` decimal(19,4) SUM NULL,
+ `col29` decimal(19,4) SUM NULL,
+ `col30` decimal(19,4) SUM NULL,
+ `col31` decimal(19,4) SUM NULL,
+ `col32` decimal(19,4) SUM NULL,
+ `col33` decimal(19,4) SUM NULL DEFAULT "0",
+ `col34` decimal(19,4) SUM NULL DEFAULT "0",
+ `col35` decimal(19,4) SUM NULL DEFAULT "0",
+ `col36` decimal(19,4) SUM NULL DEFAULT "0",
+ `col37` decimal(19,4) SUM NULL DEFAULT "0",
+ `col38` decimal(19,4) SUM NULL DEFAULT "0",
+ `col39` decimal(19,4) SUM NULL DEFAULT "0",
+ `col40` decimal(19,4) SUM NULL DEFAULT "0",
+ `col41` decimal(19,4) SUM NULL DEFAULT "0",
+ `col42` decimal(19,4) SUM NULL DEFAULT "0",
+ `col43` decimal(19,4) SUM NULL DEFAULT "0",
+ `col44` decimal(19,4) SUM NULL DEFAULT "0",
+ `col45` decimal(19,4) SUM NULL DEFAULT "0",
+ `col46` decimal(19,4) SUM NULL DEFAULT "0",
+ `col47` decimal(19,4) SUM NULL DEFAULT "0",
+ `col48` decimal(19,4) SUM NULL DEFAULT "0",
+ `col49` decimal(19,4) SUM NULL DEFAULT "0",
+ `col50` decimal(19,4) SUM NULL DEFAULT "0",
+ `col51` decimal(19,4) SUM NULL,
+ `col52` datetime MIN NULL,
+ `col53` bigint MIN NULL,
+ `col54` datetime MAX NULL,
+ `col55` bigint MAX NULL,
+ `col56` tinyint MIN NULL,
+ `col57` bitmap BITMAP_UNION NOT NULL DEFAULT BITMAP_EMPTY
+) ENGINE=OLAP
+AGGREGATE KEY(`col1`, `col2`, `col3`, `col4`, `col5`, `col6`, `col7`, `col8`, `col9`, `col10`, `col11`, `col12`, `col13`, `col14`)
+PARTITION BY RANGE(`col12`, `col11`)
+(PARTITION p_default VALUES [("0", '1900-01-01 00:00:00'), ("99999", '2030-01-01 00:00:00')))
+DISTRIBUTED BY HASH(`col8`) BUCKETS 1
+PROPERTIES (
+"file_cache_ttl_seconds" = "0",
+"is_being_synced" = "false",
+"storage_medium" = "hdd",
+"storage_format" = "V2",
+"inverted_index_storage_format" = "V2",
+"light_schema_change" = "true",
+"disable_auto_compaction" = "false",
+"enable_single_replica_compaction" = "false",
+"group_commit_interval_ms" = "10000",
+"group_commit_data_bytes" = "134217728",
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
diff --git a/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy b/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy
new file mode 100644
index 00000000000..184b3d5cf07
--- /dev/null
+++ b/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_multi_char_line_delimiter", "p2") {
+ def s3BucketName = getS3BucketName()
+ def s3Endpoint = getS3Endpoint()
+ def s3Region = getS3Region()
+ def ak = getS3AK()
+ def sk = getS3SK()
+ def tableName = "test_multi_char_line_delimiter"
+ def label = "test_multi_char_line_delimiter"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
+ sql """
+ LOAD LABEL ${label}
+ (
+ DATA INFILE("s3://${s3BucketName}/regression/load/data/test_multi_char_line_delimiter*.csv")
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY "\tcolumn_separator"
+ LINES TERMINATED BY "\nline_delimiter"
+ FORMAT AS CSV
+ (`col1`,`col2`,`col3`,`col4`,`col5`,`col6`,`col7`,`col8`,`col9`,`col10`,`col11`,`col12`,`col13`,`col14`,`col15`,`col16`,`col17`,`col18`,`col19`,`col20`,`col21`,`col22`,`col23`,`col24`,`col25`,`col26`,`col27`,`col28`,`col29`,`col30`,`col31`,`col32`,`col33`,`col34`,`col35`,`col36`,`col37`,`col38`,`col39`,`col40`,`col41`,`col42`,`col43`,`col44`,`col45`,`col46`,`col47`,`col48`,`col49`,`col50`,`col51`,`col52`,`col53`,`col54`,`col55`,`col56`,`col57`)
+
SET(`col1`=`col1`,`col2`=`col2`,`col3`=`col3`,`col4`=`col4`,`col5`=`col5`,`col6`=`col6`,`col7`=`col7`,`col8`=`col8`,`col9`=`col9`,`col10`=`col10`,`col11`=`col11`,`col12`=`col12`,`col13`=`col13`,`col14`=`col14`,`col15`=`col15`,`col16`=`col16`,`col17`=`col17`,`col18`=`col18`,`col19`=`col19`,`col20`=`col20`,`col21`=`col21`,`col22`=`col22`,`col23`=`col23`,`col24`=`col24`,`col25`=`col25`,`col26`=`col26`,`col27`=`col27`,`col28`=`col28`,`col29`=`col29`,`col30`=`col30`,`col31`=`col31
[...]
+ )
+ WITH S3
+ (
+ "s3.region" = "${s3Region}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}"
+ )
+ PROPERTIES
+ (
+ "timeout" = "3600",
+ "load_parallelism" = "4"
+ );
+ """
+
+ def max_try_milli_secs = 600000
+ while (max_try_milli_secs > 0) {
+ def String[][] result = sql """ show load where label="$label"; """
+ logger.info("Load status: " + result[0][2] + ", label: $label")
+ if (result[0][2].equals("FINISHED")) {
+ logger.info("Load FINISHED " + label)
+ break;
+ }
+ if (result[0][2].equals("CANCELLED")) {
+ assertTrue(false, "load failed: $result")
+ break;
+ }
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if(max_try_milli_secs <= 0) {
+ assertTrue(1 == 2, "load Timeout: $label")
+ }
+ }
+
+ def result = sql """ select count(*) from ${tableName}; """
+ logger.info("result: ${result[0][0]}")
+ assertTrue(result[0][0] == 2060625, "load result is not correct")
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org