This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 912ebd1ac92 branch-4.0: [fix](stream-load) Fix LZ4 compressed stream load decompress made no progress error #60852 (#60980)
912ebd1ac92 is described below
commit 912ebd1ac92249e3c8347f997dff6311a2e9d96c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Mar 3 20:05:15 2026 +0800
branch-4.0: [fix](stream-load) Fix LZ4 compressed stream load decompress made no progress error #60852 (#60980)
Cherry-picked from #60852
Co-authored-by: Xin Liao <[email protected]>
---
.../file_reader/new_plain_text_line_reader.cpp | 9 ++-
.../test_json_lz4_decompress_progress.groovy | 84 ++++++++++++++++++++++
2 files changed, 92 insertions(+), 1 deletion(-)
diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index 275d2add8b3..5cad41dc22c 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -232,6 +232,13 @@ NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile,
_decompress_timer(nullptr) {
_bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed",
TUnit::BYTES);
_decompress_timer = ADD_TIMER(_profile, "DecompressTime");
+
+ DBUG_EXECUTE_IF("NewPlainTextLineReader.shrink_output_buf", {
+ size_t new_size = dp->param<int64_t>("output_buf_size", 64 * 1024);
+ delete[] _output_buf;
+ _output_buf = new uint8_t[new_size];
+ _output_buf_size = new_size;
+ });
}
NewPlainTextLineReader::~NewPlainTextLineReader() {
@@ -467,7 +474,7 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool
COUNTER_UPDATE(_bytes_decompress_counter, decompressed_len);
// TODO(cmy): watch this case
- if ((input_read_bytes == 0 /*decompressed_len == 0*/) &&
_more_input_bytes == 0 &&
+ if (input_read_bytes == 0 && decompressed_len == 0 &&
_more_input_bytes == 0 &&
_more_output_bytes == 0) {
// decompress made no progress, may be
// A. input data is not enough to decompress data to output
diff --git
a/regression-test/suites/load_p0/stream_load/test_json_lz4_decompress_progress.groovy b/regression-test/suites/load_p0/stream_load/test_json_lz4_decompress_progress.groovy
new file mode 100644
index 00000000000..ed69c957d0b
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_json_lz4_decompress_progress.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_json_lz4_decompress_progress", "p0,nonConcurrent") {
+ def tableName = "test_lz4_decompress_progress"
+ def debugPoint = "NewPlainTextLineReader.shrink_output_buf"
+
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName}
+ (
+ k00 INT NOT NULL,
+ k01 DATE NOT NULL,
+ k02 BOOLEAN NULL,
+ k03 TINYINT NULL,
+ k04 SMALLINT NULL,
+ k05 INT NULL,
+ k06 BIGINT NULL,
+ k07 LARGEINT NULL,
+ k08 FLOAT NULL,
+ k09 DOUBLE NULL,
+ k10 DECIMAL(9,1) NULL,
+ k11 DECIMALV3(9,1) NULL,
+ k12 DATETIME NULL,
+ k13 DATEV2 NULL,
+ k14 DATETIMEV2 NULL,
+ k15 CHAR NULL,
+ k16 VARCHAR NULL,
+ k17 STRING NULL,
+ k18 JSON NULL
+
+ )
+ DUPLICATE KEY(k00)
+ DISTRIBUTED BY HASH(k00) BUCKETS 2
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+
+ try {
+ // Shrink output buffer to 2KB so that the decompressed LZ4 block
+ // (6.7KB) exceeds the buffer, triggering tmpOut internal buffering.
+ // Without the fix, this causes "decompress made no progress" error.
+ GetDebugPoint().enableDebugPointForAllBEs(debugPoint,
[output_buf_size: 2048])
+
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', '|'
+ set 'trim_double_quotes', 'true'
+ set 'format', 'csv'
+ set 'compress_type', 'LZ4'
+
+ file "basic_data.csv.lz4"
+
+ check { result, exception, startTime, endTime ->
+ if (exception != null) {
+ throw exception
+ }
+ log.info("Stream load result: ${result}".toString())
+ def json = parseJson(result)
+ assertEquals("success", json.Status.toLowerCase())
+ assertEquals(0, json.NumberFilteredRows)
+ }
+ }
+ } finally {
+ GetDebugPoint().disableDebugPointForAllBEs(debugPoint)
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]