This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 6e07b9289e1 branch-3.0: [fix](csv reader) fix data loss when
concurrency read using multi char line delimiter (#53374) (#53634)
6e07b9289e1 is described below
commit 6e07b9289e184a6cebf4817e37ef77e507bbbabf
Author: hui lai <[email protected]>
AuthorDate: Mon Jul 21 17:34:29 2025 +0800
branch-3.0: [fix](csv reader) fix data loss when concurrency read using
multi char line delimiter (#53374) (#53634)
pick (#53374)
The offsets at which a file is split for concurrent reading are determined
in the plan phase. If a split point happens to fall in the middle of a
multi-char line delimiter:
- The reader of the preceding range reads the complete row1 and then reads
a little past the end of its range to consume the whole line delimiter.
- The reader of the following range starts reading from the middle of the
multi-char line delimiter, so row2 becomes the first line of that range.
Because the first line of every range other than the first is always
discarded, row2 is lost.
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/vec/exec/format/csv/csv_reader.cpp | 8 ++-
.../ddl/test_multi_char_line_delimiter.sql | 76 +++++++++++++++++++++
.../test_multi_char_line_delimiter.groovy | 77 ++++++++++++++++++++++
3 files changed, 159 insertions(+), 2 deletions(-)
diff --git a/be/src/vec/exec/format/csv/csv_reader.cpp
b/be/src/vec/exec/format/csv/csv_reader.cpp
index 4c5adf6f4b1..3e3ee25284f 100644
--- a/be/src/vec/exec/format/csv/csv_reader.cpp
+++ b/be/src/vec/exec/format/csv/csv_reader.cpp
@@ -300,8 +300,12 @@ Status CsvReader::init_reader(bool is_load) {
_file_compress_type != TFileCompressType::PLAIN)) {
return Status::InternalError<false>("For now we do not support
split compressed file");
}
- start_offset -= 1;
- _size += 1;
+ // pre-read to promise first line skipped always read
+ int64_t pre_read_len = std::min(
+
static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
+ start_offset);
+ start_offset -= pre_read_len;
+ _size += pre_read_len;
// not first range will always skip one line
_skip_lines = 1;
}
diff --git
a/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql
b/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql
new file mode 100644
index 00000000000..c0be1514f90
--- /dev/null
+++
b/regression-test/suites/load_p2/broker_load/ddl/test_multi_char_line_delimiter.sql
@@ -0,0 +1,76 @@
+CREATE TABLE `test_multi_char_line_delimiter` (
+ `col1` bigint NULL,
+ `col2` bigint NULL,
+ `col3` varchar(765) NULL,
+ `col4` varchar(765) NULL,
+ `col5` bigint NULL,
+ `col6` bigint NULL,
+ `col7` int NULL,
+ `col8` int NULL,
+ `col9` tinyint NULL,
+ `col10` bigint NULL,
+ `col11` datetime NULL,
+ `col12` bigint NULL,
+ `col13` bigint NULL,
+ `col14` int NULL,
+ `col15` bigint MIN NULL,
+ `col16` decimal(19,4) SUM NULL,
+ `col17` decimal(19,4) SUM NULL,
+ `col18` decimal(19,4) SUM NULL,
+ `col19` decimal(19,4) SUM NULL,
+ `col20` decimal(19,4) SUM NULL,
+ `col21` decimal(19,4) SUM NULL,
+ `col22` decimal(19,4) SUM NULL,
+ `col23` decimal(19,4) SUM NULL,
+ `col24` decimal(19,4) SUM NULL,
+ `col25` decimal(19,4) SUM NULL,
+ `col26` decimal(19,4) SUM NULL,
+ `col27` decimal(19,4) SUM NULL,
+ `col28` decimal(19,4) SUM NULL,
+ `col29` decimal(19,4) SUM NULL,
+ `col30` decimal(19,4) SUM NULL,
+ `col31` decimal(19,4) SUM NULL,
+ `col32` decimal(19,4) SUM NULL,
+ `col33` decimal(19,4) SUM NULL DEFAULT "0",
+ `col34` decimal(19,4) SUM NULL DEFAULT "0",
+ `col35` decimal(19,4) SUM NULL DEFAULT "0",
+ `col36` decimal(19,4) SUM NULL DEFAULT "0",
+ `col37` decimal(19,4) SUM NULL DEFAULT "0",
+ `col38` decimal(19,4) SUM NULL DEFAULT "0",
+ `col39` decimal(19,4) SUM NULL DEFAULT "0",
+ `col40` decimal(19,4) SUM NULL DEFAULT "0",
+ `col41` decimal(19,4) SUM NULL DEFAULT "0",
+ `col42` decimal(19,4) SUM NULL DEFAULT "0",
+ `col43` decimal(19,4) SUM NULL DEFAULT "0",
+ `col44` decimal(19,4) SUM NULL DEFAULT "0",
+ `col45` decimal(19,4) SUM NULL DEFAULT "0",
+ `col46` decimal(19,4) SUM NULL DEFAULT "0",
+ `col47` decimal(19,4) SUM NULL DEFAULT "0",
+ `col48` decimal(19,4) SUM NULL DEFAULT "0",
+ `col49` decimal(19,4) SUM NULL DEFAULT "0",
+ `col50` decimal(19,4) SUM NULL DEFAULT "0",
+ `col51` decimal(19,4) SUM NULL,
+ `col52` datetime MIN NULL,
+ `col53` bigint MIN NULL,
+ `col54` datetime MAX NULL,
+ `col55` bigint MAX NULL,
+ `col56` tinyint MIN NULL,
+ `col57` bitmap BITMAP_UNION NOT NULL DEFAULT BITMAP_EMPTY
+) ENGINE=OLAP
+AGGREGATE KEY(`col1`, `col2`, `col3`, `col4`, `col5`, `col6`, `col7`, `col8`,
`col9`, `col10`, `col11`, `col12`, `col13`, `col14`)
+PARTITION BY RANGE(`col12`, `col11`)
+(PARTITION p_default VALUES [("0", '1900-01-01 00:00:00'), ("99999",
'2030-01-01 00:00:00')))
+DISTRIBUTED BY HASH(`col8`) BUCKETS 1
+PROPERTIES (
+"file_cache_ttl_seconds" = "0",
+"is_being_synced" = "false",
+"storage_medium" = "hdd",
+"storage_format" = "V2",
+"inverted_index_storage_format" = "V2",
+"light_schema_change" = "true",
+"disable_auto_compaction" = "false",
+"enable_single_replica_compaction" = "false",
+"group_commit_interval_ms" = "10000",
+"group_commit_data_bytes" = "134217728",
+"replication_allocation" = "tag.location.default: 1"
+);
\ No newline at end of file
diff --git
a/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy
b/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy
new file mode 100644
index 00000000000..184b3d5cf07
--- /dev/null
+++
b/regression-test/suites/load_p2/broker_load/test_multi_char_line_delimiter.groovy
@@ -0,0 +1,77 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_multi_char_line_delimiter", "p2") {
+ def s3BucketName = getS3BucketName()
+ def s3Endpoint = getS3Endpoint()
+ def s3Region = getS3Region()
+ def ak = getS3AK()
+ def sk = getS3SK()
+ def tableName = "test_multi_char_line_delimiter"
+ def label = "test_multi_char_line_delimiter"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql new File("""${context.file.parent}/ddl/${tableName}.sql""").text
+ sql """
+ LOAD LABEL ${label}
+ (
+ DATA
INFILE("s3://${s3BucketName}/regression/load/data/test_multi_char_line_delimiter*.csv")
+ INTO TABLE ${tableName}
+ COLUMNS TERMINATED BY "\tcolumn_separator"
+ LINES TERMINATED BY "\nline_delimiter"
+ FORMAT AS CSV
+
(`col1`,`col2`,`col3`,`col4`,`col5`,`col6`,`col7`,`col8`,`col9`,`col10`,`col11`,`col12`,`col13`,`col14`,`col15`,`col16`,`col17`,`col18`,`col19`,`col20`,`col21`,`col22`,`col23`,`col24`,`col25`,`col26`,`col27`,`col28`,`col29`,`col30`,`col31`,`col32`,`col33`,`col34`,`col35`,`col36`,`col37`,`col38`,`col39`,`col40`,`col41`,`col42`,`col43`,`col44`,`col45`,`col46`,`col47`,`col48`,`col49`,`col50`,`col51`,`col52`,`col53`,`col54`,`col55`,`col56`,`col57`)
+
SET(`col1`=`col1`,`col2`=`col2`,`col3`=`col3`,`col4`=`col4`,`col5`=`col5`,`col6`=`col6`,`col7`=`col7`,`col8`=`col8`,`col9`=`col9`,`col10`=`col10`,`col11`=`col11`,`col12`=`col12`,`col13`=`col13`,`col14`=`col14`,`col15`=`col15`,`col16`=`col16`,`col17`=`col17`,`col18`=`col18`,`col19`=`col19`,`col20`=`col20`,`col21`=`col21`,`col22`=`col22`,`col23`=`col23`,`col24`=`col24`,`col25`=`col25`,`col26`=`col26`,`col27`=`col27`,`col28`=`col28`,`col29`=`col29`,`col30`=`col30`,`col31`=`col31
[...]
+ )
+ WITH S3
+ (
+ "s3.region" = "${s3Region}",
+ "s3.endpoint" = "${s3Endpoint}",
+ "s3.access_key" = "${ak}",
+ "s3.secret_key" = "${sk}"
+ )
+ PROPERTIES
+ (
+ "timeout" = "3600",
+ "load_parallelism" = "4"
+ );
+ """
+
+ def max_try_milli_secs = 600000
+ while (max_try_milli_secs > 0) {
+ def String[][] result = sql """ show load where label="$label"; """
+ logger.info("Load status: " + result[0][2] + ", label: $label")
+ if (result[0][2].equals("FINISHED")) {
+ logger.info("Load FINISHED " + label)
+ break;
+ }
+ if (result[0][2].equals("CANCELLED")) {
+ assertTrue(false, "load failed: $result")
+ break;
+ }
+ Thread.sleep(1000)
+ max_try_milli_secs -= 1000
+ if(max_try_milli_secs <= 0) {
+ assertTrue(1 == 2, "load Timeout: $label")
+ }
+ }
+
+ def result = sql """ select count(*) from ${tableName}; """
+ logger.info("result: ${result[0][0]}")
+ assertTrue(result[0][0] == 2060625, "load result is not correct")
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]