This is an automated email from the ASF dual-hosted git repository.

liaoxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new adca4d6c51f [fix](stream-load) Fix LZ4 compressed stream load decompress made no progress error (#60852)
adca4d6c51f is described below

commit adca4d6c51fb66525c13134e0a4b29a597342c0a
Author: Xin Liao <[email protected]>
AuthorDate: Tue Mar 3 17:12:10 2026 +0800

    [fix](stream-load) Fix LZ4 compressed stream load decompress made no progress error (#60852)
    
    Problem Summary:
    When LZ4F_decompress produces output from its internal buffer (tmpOut)
    without consuming new input, input_read_bytes is 0 but decompressed_len
    is positive. The original condition tested input_read_bytes == 0 but
    ignored decompressed_len, so it treated this case as "no progress" and
    returned an error.
    
    Add a decompressed_len == 0 check to the condition so that real progress
    (output produced from the decompressor's internal buffer) is not
    mistakenly flagged as a stall.
    
    Also add a debug point (NewPlainTextLineReader.shrink_output_buf) that
    shrinks the output buffer for regression testing, and a regression test
    that enables it to reproduce the bug scenario.
---
 .../file_reader/new_plain_text_line_reader.cpp     |  9 ++-
 .../test_json_lz4_decompress_progress.groovy       | 84 ++++++++++++++++++++++
 2 files changed, 92 insertions(+), 1 deletion(-)

diff --git a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
index 275d2add8b3..5cad41dc22c 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
+++ b/be/src/vec/exec/format/file_reader/new_plain_text_line_reader.cpp
@@ -232,6 +232,13 @@ NewPlainTextLineReader::NewPlainTextLineReader(RuntimeProfile* profile,
           _decompress_timer(nullptr) {
     _bytes_decompress_counter = ADD_COUNTER(_profile, "BytesDecompressed", 
TUnit::BYTES);
     _decompress_timer = ADD_TIMER(_profile, "DecompressTime");
+
+    DBUG_EXECUTE_IF("NewPlainTextLineReader.shrink_output_buf", {
+        size_t new_size = dp->param<int64_t>("output_buf_size", 64 * 1024);
+        delete[] _output_buf;
+        _output_buf = new uint8_t[new_size];
+        _output_buf_size = new_size;
+    });
 }
 
 NewPlainTextLineReader::~NewPlainTextLineReader() {
@@ -467,7 +474,7 @@ Status NewPlainTextLineReader::read_line(const uint8_t** ptr, size_t* size, bool
                 COUNTER_UPDATE(_bytes_decompress_counter, decompressed_len);
 
                 // TODO(cmy): watch this case
-                if ((input_read_bytes == 0 /*decompressed_len == 0*/) && 
_more_input_bytes == 0 &&
+                if (input_read_bytes == 0 && decompressed_len == 0 && 
_more_input_bytes == 0 &&
                     _more_output_bytes == 0) {
                     // decompress made no progress, may be
                     // A. input data is not enough to decompress data to output
diff --git a/regression-test/suites/load_p0/stream_load/test_json_lz4_decompress_progress.groovy b/regression-test/suites/load_p0/stream_load/test_json_lz4_decompress_progress.groovy
new file mode 100644
index 00000000000..ed69c957d0b
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_json_lz4_decompress_progress.groovy
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_json_lz4_decompress_progress", "p0,nonConcurrent") {
+    def tableName = "test_lz4_decompress_progress"
+    def debugPoint = "NewPlainTextLineReader.shrink_output_buf"
+
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE ${tableName}
+        (
+            k00 INT             NOT NULL,
+            k01 DATE            NOT NULL,
+            k02 BOOLEAN         NULL,
+            k03 TINYINT         NULL,
+            k04 SMALLINT        NULL,
+            k05 INT             NULL,
+            k06 BIGINT          NULL,
+            k07 LARGEINT        NULL,
+            k08 FLOAT           NULL,
+            k09 DOUBLE          NULL,
+            k10 DECIMAL(9,1)    NULL,
+            k11 DECIMALV3(9,1)  NULL,
+            k12 DATETIME        NULL,
+            k13 DATEV2          NULL,
+            k14 DATETIMEV2      NULL,
+            k15 CHAR            NULL,
+            k16 VARCHAR         NULL,
+            k17 STRING          NULL,
+            k18 JSON            NULL
+        
+        )
+        DUPLICATE KEY(k00)
+        DISTRIBUTED BY HASH(k00) BUCKETS 2
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+     """
+
+
+    try {
+        // Shrink the output buffer to 2KB so the decompressed LZ4 block (6.7KB)
+        // no longer fits; LZ4F_decompress then emits output from tmpOut without
+        // consuming input, which used to fail with "decompress made no progress".
+        GetDebugPoint().enableDebugPointForAllBEs(debugPoint, [output_buf_size: 2048])
+
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', '|'
+            set 'trim_double_quotes', 'true'
+            set 'format', 'csv'
+            set 'compress_type', 'LZ4'
+
+            file "basic_data.csv.lz4"
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(0, json.NumberFilteredRows)
+            }
+        }
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs(debugPoint)
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to