This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new f28cbd39857 [fix](set) fix coredump of set op if total data size 
exceeds 4G (#61471)
f28cbd39857 is described below

commit f28cbd39857117e1266ebc8f00c0d5f5413dbb1b
Author: TengJianPing <[email protected]>
AuthorDate: Fri Mar 20 15:54:06 2026 +0800

    [fix](set) fix coredump of set op if total data size exceeds 4G (#61471)
    
    ### What problem does this PR solve?
    
    Issue Number: close #xxx
    
    Related PR: #xxx
    
    Problem Summary:
    Root Cause Analysis
    
      核心原因:SetSinkOperatorX::sink() 中 build_block
      被多次覆盖,导致哈希表中的旧条目成为悬空引用。
    
      问题链路
    
      1. build_block 被覆盖
    
      在 set_sink_operator.cpp:52-56:
    
    if (eos || local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) { // 4GB
        build_block = local_state._mutable_block.to_block(); // 覆盖 build_block!
        RETURN_IF_ERROR(_process_build_block(local_state, build_block, state));
        local_state._mutable_block.clear();
    }
    
      当数据总量超过 BUILD_BLOCK_MAX_SIZE(4GB)时,这个 flush 会触发多次:
    
      - 第一次 flush(allocated_bytes >= 4GB时):build_block = batch1(假设包含 rows
      0..N1),哈希表存入 row_num = 0, 1, ..., N1
      - 第二次 flush(eos 时):build_block = batch2(新数据,rows 0..N2),batch1
      的数据被销毁。哈希表新增 row_num = 0, 1, ..., N2
    
      2. 哈希表只存 row_num,不存 block 引用
    
    RowRefListWithFlags 继承自 RowRef,只存储 uint32_t row_num(join_op.h:46),
      没有 block 指针或 offset。
    
      在 hash_table_set_build.h:39,构建时存入的是:Mapped {k},即行号 k。
    
      3. 输出阶段使用单一 build_block
    
      在 set_source_operator.cpp:161-162:
    
      auto& column = *build_block.get_by_position(idx->second).column;
      local_state._mutable_cols[idx->first]->insert_from(column, it->row_num);
    
      此时 build_block 是最后一次 flush 的 batch2。但哈希表中来自 batch1 的条目的 row_num
      可能超出 batch2 的行数范围。
    
      4. 越界访问导致 SIGSEGV
    
      当 batch1 的 row_num = X(X > batch2 的行数)被用于 insert_from(column, X) 时:
    
      // column_string.h:180-197
      const size_t size_to_append = src.offsets[X] - src.offsets[X - 1]; // 越界读取 → 垃圾值
      const size_t offset = src.offsets[X - 1];                          // 垃圾值
      // ...
      memcpy(..., &src.chars[offset], size_to_append); // 垃圾 offset → 访问未映射内存 → SIGSEGV
    
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
    - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
    - [ ] Yes. <!-- Add document PR link here. eg:
    https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/exec/operator/set_sink_operator.cpp         |  36 +++---
 be/src/exec/operator/set_source_operator.cpp       |   4 +-
 be/test/exec/operator/set_operator_test.cpp        | 137 +++++++++++++++++++++
 .../query_p2/test_set_operation_large_string.out   |  14 +++
 .../test_set_operation_large_string.groovy         | 102 +++++++++++++++
 5 files changed, 277 insertions(+), 16 deletions(-)

diff --git a/be/src/exec/operator/set_sink_operator.cpp 
b/be/src/exec/operator/set_sink_operator.cpp
index c26265f652d..11e1cb61600 100644
--- a/be/src/exec/operator/set_sink_operator.cpp
+++ b/be/src/exec/operator/set_sink_operator.cpp
@@ -63,7 +63,6 @@ Status SetSinkLocalState<is_intersect>::close(RuntimeState* 
state, Status exec_s
 
 template <bool is_intersect>
 Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, Block* 
in_block, bool eos) {
-    constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
     RETURN_IF_CANCELLED(state);
     auto& local_state = get_local_state(state);
 
@@ -74,9 +73,14 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, Block* in_block
     auto& valid_element_in_hash_tbl = 
local_state._shared_state->valid_element_in_hash_tbl;
 
     if (in_block->rows() != 0) {
+        if (local_state._mutable_block.empty()) {
+            auto tmp_build_block = *(in_block->create_same_struct_block(0, 
false));
+            local_state._mutable_block = 
MutableBlock::build_mutable_block(&tmp_build_block);
+        }
+
         {
             SCOPED_TIMER(local_state._merge_block_timer);
-            RETURN_IF_ERROR(local_state._mutable_block.merge(*in_block));
+            
RETURN_IF_ERROR(local_state._mutable_block.merge_ignore_overflow(std::move(*in_block)));
         }
         if (local_state._mutable_block.rows() > 
std::numeric_limits<uint32_t>::max()) {
             return Status::NotSupported("set operator do not support build 
table rows over:" +
@@ -84,26 +88,24 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* 
state, Block* in_block
         }
     }
 
-    if (eos || local_state._mutable_block.allocated_bytes() >= 
BUILD_BLOCK_MAX_SIZE) {
+    if (eos) {
         SCOPED_TIMER(local_state._build_timer);
         build_block = local_state._mutable_block.to_block();
         RETURN_IF_ERROR(_process_build_block(local_state, build_block, state));
         local_state._mutable_block.clear();
 
-        if (eos) {
-            uint64_t hash_table_size = 
local_state._shared_state->get_hash_table_size();
-            valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;
+        uint64_t hash_table_size = 
local_state._shared_state->get_hash_table_size();
+        valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;
 
-            // record hash table
-            COUNTER_SET(local_state._hash_table_size, 
(int64_t)hash_table_size);
-            COUNTER_SET(local_state._valid_element_in_hash_table, 
valid_element_in_hash_tbl);
+        // record hash table
+        COUNTER_SET(local_state._hash_table_size, (int64_t)hash_table_size);
+        COUNTER_SET(local_state._valid_element_in_hash_table, 
valid_element_in_hash_tbl);
 
-            
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
-                    ->set_ready();
-            DCHECK_GT(_child_quantity, 1);
-            
RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size(
-                    state, hash_table_size, local_state._finish_dependency));
-        }
+        
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
+                ->set_ready();
+        DCHECK_GT(_child_quantity, 1);
+        
RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size(
+                state, hash_table_size, local_state._finish_dependency));
     }
     return Status::OK();
 }
@@ -117,6 +119,10 @@ Status 
SetSinkOperatorX<is_intersect>::_process_build_block(
     }
 
     materialize_block_inplace(block);
+    // Dispose the overflow of ColumnString
+    for (auto& data : block) {
+        data.column = 
std::move(*data.column).mutate()->convert_column_if_overflow();
+    }
     ColumnRawPtrs raw_ptrs(_child_exprs.size());
     RETURN_IF_ERROR(_extract_build_column(local_state, block, raw_ptrs, rows));
     auto st = Status::OK();
diff --git a/be/src/exec/operator/set_source_operator.cpp 
b/be/src/exec/operator/set_source_operator.cpp
index 55defd9dacc..75f28b4127b 100644
--- a/be/src/exec/operator/set_source_operator.cpp
+++ b/be/src/exec/operator/set_source_operator.cpp
@@ -187,7 +187,9 @@ void 
SetSourceLocalState<is_intersect>::_add_result_columns() {
 
     for (auto& idx : build_col_idx) {
         const auto& column = *build_block.get_by_position(idx.second).column;
-        column.append_data_by_selector(_mutable_cols[idx.first], 
_result_indexs);
+        // use insert_indices_from to support ColumnString64
+        _mutable_cols[idx.first]->insert_indices_from(column, 
_result_indexs.data(),
+                                                      
&_result_indexs[_result_indexs.size()]);
     }
 }
 template class SetSourceLocalState<true>;
diff --git a/be/test/exec/operator/set_operator_test.cpp 
b/be/test/exec/operator/set_operator_test.cpp
index d85e7a04c06..620d13583d7 100644
--- a/be/test/exec/operator/set_operator_test.cpp
+++ b/be/test/exec/operator/set_operator_test.cpp
@@ -384,6 +384,143 @@ TEST_F(ExceptOperatorTest, test_output_null_batsh_size) {
     }
 }
 
+TEST_F(IntersectOperatorTest, test_sink_large_string_data_over_4g) {
+    // Test that SetSinkOperatorX can handle string data exceeding 4GB total 
size.
+    // This exercises the convert_column_if_overflow path in 
_process_build_block.
+    init_op(2, {std::make_shared<DataTypeString>()});
+
+    sink_op->_child_exprs =
+            MockSlotRef::create_mock_contexts(DataTypes 
{std::make_shared<DataTypeString>()});
+    probe_sink_ops[0]->_child_exprs =
+            MockSlotRef::create_mock_contexts(DataTypes 
{std::make_shared<DataTypeString>()});
+
+    init_local_state();
+
+    // Create a large string (~1MB each) and insert enough rows to exceed 4GB 
total.
+    // We need total string data > 4GB to trigger ColumnString offset overflow
+    // and exercise the convert_column_if_overflow path in 
_process_build_block.
+    const size_t large_str_size = 1 * 1024 * 1024; // 1MB per string
+    const size_t num_rows = 4200;                  // ~4.1GB total
+    std::string large_str(large_str_size, 'x');
+
+    auto string_type = std::make_shared<DataTypeString>();
+
+    // Build a block with large strings and sink in batches (non-eos), then 
send eos.
+    const size_t rows_per_batch = 500;
+    for (size_t batch_start = 0; batch_start < num_rows; batch_start += 
rows_per_batch) {
+        size_t current_batch_size = std::min(rows_per_batch, num_rows - 
batch_start);
+
+        auto col = string_type->create_column();
+        for (size_t i = 0; i < current_batch_size; i++) {
+            // Make each string slightly different to avoid dedup in hash 
table.
+            // Modify large_str in-place, insert, then restore to avoid 
copying 1MB per row.
+            auto suffix = std::to_string(batch_start + i);
+            // Save original bytes
+            char saved[32];
+            std::memcpy(saved, large_str.data(), suffix.size());
+            // Stamp the suffix
+            std::memcpy(large_str.data(), suffix.data(), suffix.size());
+            col->insert_data(large_str.data(), large_str.size());
+            // Restore original bytes
+            std::memcpy(large_str.data(), saved, suffix.size());
+        }
+
+        Block block;
+        block.insert({std::move(col), string_type, "col0"});
+
+        bool is_last = (batch_start + rows_per_batch >= num_rows);
+        auto st = sink_op->sink(state.get(), &block, is_last);
+        EXPECT_TRUE(st.ok()) << st.to_string();
+    }
+
+    // Verify hash table was built successfully
+    EXPECT_EQ(shared_state->get_hash_table_size(), num_rows);
+
+    // Now probe with a small subset to verify correctness
+    {
+        auto col = string_type->create_column();
+        // Insert string matching row 0
+        auto suffix = std::to_string(0);
+        char saved[32];
+        std::memcpy(saved, large_str.data(), suffix.size());
+        std::memcpy(large_str.data(), suffix.data(), suffix.size());
+        col->insert_data(large_str.data(), large_str.size());
+        std::memcpy(large_str.data(), saved, suffix.size());
+
+        Block block;
+        block.insert({std::move(col), string_type, "col0"});
+        EXPECT_TRUE(probe_sink_ops[0]->sink(states[0].get(), &block, true));
+    }
+
+    // Read from source - for INTERSECT, should get the one matching row
+    {
+        Block block;
+        bool eos = false;
+        EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos));
+        EXPECT_EQ(block.rows(), 1);
+    }
+}
+
+TEST_F(ExceptOperatorTest, test_sink_large_string_data_over_4g) {
+    // Test that SetSinkOperatorX (EXCEPT) can handle string data exceeding 
4GB total size.
+    init_op(2, {std::make_shared<DataTypeString>()});
+
+    sink_op->_child_exprs =
+            MockSlotRef::create_mock_contexts(DataTypes 
{std::make_shared<DataTypeString>()});
+    probe_sink_ops[0]->_child_exprs =
+            MockSlotRef::create_mock_contexts(DataTypes 
{std::make_shared<DataTypeString>()});
+
+    init_local_state();
+
+    auto string_type = std::make_shared<DataTypeString>();
+    const size_t large_str_size = 1 * 1024 * 1024; // 1MB per string
+    const size_t num_rows = 4200;                  // ~4.1GB total
+    std::string large_str(large_str_size, 'y');
+
+    const size_t rows_per_batch = 100;
+    for (size_t batch_start = 0; batch_start < num_rows; batch_start += 
rows_per_batch) {
+        size_t current_batch_size = std::min(rows_per_batch, num_rows - 
batch_start);
+
+        auto col = string_type->create_column();
+        for (size_t i = 0; i < current_batch_size; i++) {
+            auto suffix = std::to_string(batch_start + i);
+            char saved[32];
+            std::memcpy(saved, large_str.data(), suffix.size());
+            std::memcpy(large_str.data(), suffix.data(), suffix.size());
+            col->insert_data(large_str.data(), large_str.size());
+            std::memcpy(large_str.data(), saved, suffix.size());
+        }
+
+        Block block;
+        block.insert({std::move(col), string_type, "col0"});
+
+        bool is_last = (batch_start + rows_per_batch >= num_rows);
+        auto st = sink_op->sink(state.get(), &block, is_last);
+        EXPECT_TRUE(st.ok()) << st.to_string();
+    }
+
+    EXPECT_EQ(shared_state->get_hash_table_size(), num_rows);
+
+    // Probe with empty block - EXCEPT should return all rows
+    {
+        Block block;
+        block.insert({string_type->create_column(), string_type, "col0"});
+        EXPECT_TRUE(probe_sink_ops[0]->sink(states[0].get(), &block, true));
+    }
+
+    // Read from source - for EXCEPT with empty probe, should get all build 
rows
+    {
+        size_t total_rows = 0;
+        bool eos = false;
+        while (!eos) {
+            Block block;
+            EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos));
+            total_rows += block.rows();
+        }
+        EXPECT_EQ(total_rows, num_rows);
+    }
+}
+
 TEST_F(IntersectOperatorTest, test_extract_probe_column) {
     init_op(2, 
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()),
                 
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())});
diff --git a/regression-test/data/query_p2/test_set_operation_large_string.out 
b/regression-test/data/query_p2/test_set_operation_large_string.out
new file mode 100644
index 00000000000..c744ed46620
--- /dev/null
+++ b/regression-test/data/query_p2/test_set_operation_large_string.out
@@ -0,0 +1,14 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !except_subset --
+4208xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+4209xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+
+-- !except_self --
+
+-- !intersect_subset --
+0xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+1xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+
+-- !intersect_self --
+4210
+
diff --git 
a/regression-test/suites/query_p2/test_set_operation_large_string.groovy 
b/regression-test/suites/query_p2/test_set_operation_large_string.groovy
new file mode 100644
index 00000000000..adde9418ebe
--- /dev/null
+++ b/regression-test/suites/query_p2/test_set_operation_large_string.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test set operations (EXCEPT/INTERSECT) with large string data that exceeds 
4GB total.
+// This exercises the convert_column_if_overflow path in 
SetSinkOperatorX::_process_build_block.
+suite("test_set_operation_large_string") {
+    def totalRows = 4210
+    sql """ DROP TABLE IF EXISTS test_set_op_large_string """
+    sql """
+        CREATE TABLE test_set_op_large_string (
+            id INT NULL,
+            large_val STRING NULL
+        )
+        DISTRIBUTED BY HASH(id) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        );
+    """
+
+    // Each row has a ~1MB string. Insert 4200 rows to exceed 4GB total string 
data.
+    // Use repeat() to generate large strings efficiently.
+    // Insert in batches to avoid timeout.
+    def batchSize = 500
+    def strSizePerRow = 1048576 // 1MB
+
+    // repeat('x', 1048576) produces a 1MB string per row.
+    // We concat the id to make each row unique.
+    for (int batchStart = 0; batchStart < totalRows; batchStart += batchSize) {
+        def batchEnd = Math.min(batchStart + batchSize, totalRows)
+        sql """
+            INSERT INTO test_set_op_large_string
+            SELECT number, concat(cast(number as string), repeat('x', 
${strSizePerRow}))
+            FROM numbers("number" = "${batchEnd}")
+            WHERE number >= ${batchStart};
+        """
+    }
+
+    def rowCount = sql """ SELECT count(*) FROM test_set_op_large_string """
+    log.info("Inserted rows: ${rowCount}")
+    assert rowCount[0][0] == totalRows
+
+    sql " set parallel_pipeline_task_num = 1;"
+    sql " set enable_profile = true;"
+    sql " set batch_size = 128;"
+    sql " set runtime_filter_mode = off;"
+    sql """ set disable_nereids_rules = "INFER_SET_OPERATOR_DISTINCT";"""
+    // Test EXCEPT: all rows from test_set_op_large_string except a small 
subset should return most rows
+    qt_except_subset """
+        SELECT substring(large_val, 1, 100) FROM (
+            SELECT large_val FROM test_set_op_large_string
+            EXCEPT
+            SELECT large_val FROM test_set_op_large_string WHERE id < 4208
+        ) t
+        order by 1;
+    """
+
+    // Test EXCEPT: table minus itself should be empty
+    qt_except_self """
+        SELECT * FROM (
+            SELECT large_val FROM test_set_op_large_string
+            EXCEPT
+            SELECT large_val FROM test_set_op_large_string
+        ) t
+        order by 1;
+    """
+
+    // Test INTERSECT: intersection with a small subset should return the 
subset
+    qt_intersect_subset """
+        SELECT substring(large_val, 1, 100) FROM (
+            SELECT large_val FROM test_set_op_large_string
+            INTERSECT
+            SELECT large_val FROM test_set_op_large_string WHERE id < 2
+        ) t
+        order by 1;
+    """
+
+    // Test INTERSECT: table with itself should return all rows
+    qt_intersect_self """
+        SELECT count(*) FROM (
+            SELECT large_val FROM test_set_op_large_string
+            INTERSECT
+            SELECT large_val FROM test_set_op_large_string
+        ) t
+        order by 1;
+    """
+
+    sql """ DROP TABLE IF EXISTS test_set_op_large_string """
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to