This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new f28cbd39857 [fix](set) fix coredump of set op if total data size
exceeds 4G (#61471)
f28cbd39857 is described below
commit f28cbd39857117e1266ebc8f00c0d5f5413dbb1b
Author: TengJianPing <[email protected]>
AuthorDate: Fri Mar 20 15:54:06 2026 +0800
[fix](set) fix coredump of set op if total data size exceeds 4G (#61471)
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
Root Cause Analysis
核心原因:SetSinkOperatorX::sink() 中 build_block
被多次覆盖,导致哈希表中的旧条目成为悬空引用。
问题链路
1. build_block 被覆盖
在 set_sink_operator.cpp:52-56:
if (eos || local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) { // 4GB
    build_block = local_state._mutable_block.to_block(); // 覆盖 build_block!
    RETURN_IF_ERROR(_process_build_block(local_state, build_block, state));
    local_state._mutable_block.clear();
}
当数据总量超过 BUILD_BLOCK_MAX_SIZE(4GB)时,这个 flush 会触发多次:
- 第一次 flush(allocated_bytes >= 4GB 时):build_block = batch1(假设包含 rows 0..N1),哈希表存入 row_num = 0, 1, ..., N1
- 第二次 flush(eos 时):build_block = batch2(新数据,rows 0..N2),batch1 的数据被销毁。哈希表新增 row_num = 0, 1, ..., N2
2. 哈希表只存 row_num,不存 block 引用
RowRefListWithFlags 继承自 RowRef,只存储 uint32_t row_num(join_op.h:46),没有 block 指针或 offset。
在 hash_table_set_build.h:39,构建时存入的是:Mapped {k},即行号 k。
3. 输出阶段使用单一 build_block
在 set_source_operator.cpp:161-162:
auto& column = *build_block.get_by_position(idx->second).column;
local_state._mutable_cols[idx->first]->insert_from(column, it->row_num);
此时 build_block 是最后一次 flush 的 batch2,但哈希表中来自 batch1 的条目的 row_num 可能超出 batch2 的行数范围。
4. 越界访问导致 SIGSEGV
当 batch1 的 row_num = X(X > batch2 的行数)被用于 insert_from(column, X) 时:
// column_string.h:180-197
const size_t size_to_append = src.offsets[X] - src.offsets[X - 1]; // 越界读取 → 垃圾值
const size_t offset = src.offsets[X - 1];                          // 垃圾值
// ...
memcpy(..., &src.chars[offset], size_to_append); // 垃圾 offset → 访问未映射内存 → SIGSEGV
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/exec/operator/set_sink_operator.cpp | 36 +++---
be/src/exec/operator/set_source_operator.cpp | 4 +-
be/test/exec/operator/set_operator_test.cpp | 137 +++++++++++++++++++++
.../query_p2/test_set_operation_large_string.out | 14 +++
.../test_set_operation_large_string.groovy | 102 +++++++++++++++
5 files changed, 277 insertions(+), 16 deletions(-)
diff --git a/be/src/exec/operator/set_sink_operator.cpp
b/be/src/exec/operator/set_sink_operator.cpp
index c26265f652d..11e1cb61600 100644
--- a/be/src/exec/operator/set_sink_operator.cpp
+++ b/be/src/exec/operator/set_sink_operator.cpp
@@ -63,7 +63,6 @@ Status SetSinkLocalState<is_intersect>::close(RuntimeState*
state, Status exec_s
template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::sink(RuntimeState* state, Block*
in_block, bool eos) {
- constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
RETURN_IF_CANCELLED(state);
auto& local_state = get_local_state(state);
@@ -74,9 +73,14 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState*
state, Block* in_block
auto& valid_element_in_hash_tbl =
local_state._shared_state->valid_element_in_hash_tbl;
if (in_block->rows() != 0) {
+ if (local_state._mutable_block.empty()) {
+ auto tmp_build_block = *(in_block->create_same_struct_block(0,
false));
+ local_state._mutable_block =
MutableBlock::build_mutable_block(&tmp_build_block);
+ }
+
{
SCOPED_TIMER(local_state._merge_block_timer);
- RETURN_IF_ERROR(local_state._mutable_block.merge(*in_block));
+
RETURN_IF_ERROR(local_state._mutable_block.merge_ignore_overflow(std::move(*in_block)));
}
if (local_state._mutable_block.rows() >
std::numeric_limits<uint32_t>::max()) {
return Status::NotSupported("set operator do not support build
table rows over:" +
@@ -84,26 +88,24 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState*
state, Block* in_block
}
}
- if (eos || local_state._mutable_block.allocated_bytes() >=
BUILD_BLOCK_MAX_SIZE) {
+ if (eos) {
SCOPED_TIMER(local_state._build_timer);
build_block = local_state._mutable_block.to_block();
RETURN_IF_ERROR(_process_build_block(local_state, build_block, state));
local_state._mutable_block.clear();
- if (eos) {
- uint64_t hash_table_size =
local_state._shared_state->get_hash_table_size();
- valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;
+ uint64_t hash_table_size =
local_state._shared_state->get_hash_table_size();
+ valid_element_in_hash_tbl = is_intersect ? 0 : hash_table_size;
- // record hash table
- COUNTER_SET(local_state._hash_table_size,
(int64_t)hash_table_size);
- COUNTER_SET(local_state._valid_element_in_hash_table,
valid_element_in_hash_tbl);
+ // record hash table
+ COUNTER_SET(local_state._hash_table_size, (int64_t)hash_table_size);
+ COUNTER_SET(local_state._valid_element_in_hash_table,
valid_element_in_hash_tbl);
-
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
- ->set_ready();
- DCHECK_GT(_child_quantity, 1);
-
RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size(
- state, hash_table_size, local_state._finish_dependency));
- }
+
local_state._shared_state->probe_finished_children_dependency[_cur_child_id + 1]
+ ->set_ready();
+ DCHECK_GT(_child_quantity, 1);
+
RETURN_IF_ERROR(local_state._runtime_filter_producer_helper->send_filter_size(
+ state, hash_table_size, local_state._finish_dependency));
}
return Status::OK();
}
@@ -117,6 +119,10 @@ Status
SetSinkOperatorX<is_intersect>::_process_build_block(
}
materialize_block_inplace(block);
+ // Dispose the overflow of ColumnString
+ for (auto& data : block) {
+ data.column =
std::move(*data.column).mutate()->convert_column_if_overflow();
+ }
ColumnRawPtrs raw_ptrs(_child_exprs.size());
RETURN_IF_ERROR(_extract_build_column(local_state, block, raw_ptrs, rows));
auto st = Status::OK();
diff --git a/be/src/exec/operator/set_source_operator.cpp
b/be/src/exec/operator/set_source_operator.cpp
index 55defd9dacc..75f28b4127b 100644
--- a/be/src/exec/operator/set_source_operator.cpp
+++ b/be/src/exec/operator/set_source_operator.cpp
@@ -187,7 +187,9 @@ void
SetSourceLocalState<is_intersect>::_add_result_columns() {
for (auto& idx : build_col_idx) {
const auto& column = *build_block.get_by_position(idx.second).column;
- column.append_data_by_selector(_mutable_cols[idx.first],
_result_indexs);
+ // use insert_indices_from to support ColumnString64
+ _mutable_cols[idx.first]->insert_indices_from(column,
_result_indexs.data(),
+
&_result_indexs[_result_indexs.size()]);
}
}
template class SetSourceLocalState<true>;
diff --git a/be/test/exec/operator/set_operator_test.cpp
b/be/test/exec/operator/set_operator_test.cpp
index d85e7a04c06..620d13583d7 100644
--- a/be/test/exec/operator/set_operator_test.cpp
+++ b/be/test/exec/operator/set_operator_test.cpp
@@ -384,6 +384,143 @@ TEST_F(ExceptOperatorTest, test_output_null_batsh_size) {
}
}
+TEST_F(IntersectOperatorTest, test_sink_large_string_data_over_4g) {
+ // Test that SetSinkOperatorX can handle string data exceeding 4GB total
size.
+ // This exercises the convert_column_if_overflow path in
_process_build_block.
+ init_op(2, {std::make_shared<DataTypeString>()});
+
+ sink_op->_child_exprs =
+ MockSlotRef::create_mock_contexts(DataTypes
{std::make_shared<DataTypeString>()});
+ probe_sink_ops[0]->_child_exprs =
+ MockSlotRef::create_mock_contexts(DataTypes
{std::make_shared<DataTypeString>()});
+
+ init_local_state();
+
+ // Create a large string (~1MB each) and insert enough rows to exceed 4GB
total.
+ // We need total string data > 4GB to trigger ColumnString offset overflow
+ // and exercise the convert_column_if_overflow path in
_process_build_block.
+ const size_t large_str_size = 1 * 1024 * 1024; // 1MB per string
+ const size_t num_rows = 4200; // ~4.1GB total
+ std::string large_str(large_str_size, 'x');
+
+ auto string_type = std::make_shared<DataTypeString>();
+
+ // Build a block with large strings and sink in batches (non-eos), then
send eos.
+ const size_t rows_per_batch = 500;
+ for (size_t batch_start = 0; batch_start < num_rows; batch_start +=
rows_per_batch) {
+ size_t current_batch_size = std::min(rows_per_batch, num_rows -
batch_start);
+
+ auto col = string_type->create_column();
+ for (size_t i = 0; i < current_batch_size; i++) {
+ // Make each string slightly different to avoid dedup in hash
table.
+ // Modify large_str in-place, insert, then restore to avoid
copying 1MB per row.
+ auto suffix = std::to_string(batch_start + i);
+ // Save original bytes
+ char saved[32];
+ std::memcpy(saved, large_str.data(), suffix.size());
+ // Stamp the suffix
+ std::memcpy(large_str.data(), suffix.data(), suffix.size());
+ col->insert_data(large_str.data(), large_str.size());
+ // Restore original bytes
+ std::memcpy(large_str.data(), saved, suffix.size());
+ }
+
+ Block block;
+ block.insert({std::move(col), string_type, "col0"});
+
+ bool is_last = (batch_start + rows_per_batch >= num_rows);
+ auto st = sink_op->sink(state.get(), &block, is_last);
+ EXPECT_TRUE(st.ok()) << st.to_string();
+ }
+
+ // Verify hash table was built successfully
+ EXPECT_EQ(shared_state->get_hash_table_size(), num_rows);
+
+ // Now probe with a small subset to verify correctness
+ {
+ auto col = string_type->create_column();
+ // Insert string matching row 0
+ auto suffix = std::to_string(0);
+ char saved[32];
+ std::memcpy(saved, large_str.data(), suffix.size());
+ std::memcpy(large_str.data(), suffix.data(), suffix.size());
+ col->insert_data(large_str.data(), large_str.size());
+ std::memcpy(large_str.data(), saved, suffix.size());
+
+ Block block;
+ block.insert({std::move(col), string_type, "col0"});
+ EXPECT_TRUE(probe_sink_ops[0]->sink(states[0].get(), &block, true));
+ }
+
+ // Read from source - for INTERSECT, should get the one matching row
+ {
+ Block block;
+ bool eos = false;
+ EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos));
+ EXPECT_EQ(block.rows(), 1);
+ }
+}
+
+TEST_F(ExceptOperatorTest, test_sink_large_string_data_over_4g) {
+ // Test that SetSinkOperatorX (EXCEPT) can handle string data exceeding
4GB total size.
+ init_op(2, {std::make_shared<DataTypeString>()});
+
+ sink_op->_child_exprs =
+ MockSlotRef::create_mock_contexts(DataTypes
{std::make_shared<DataTypeString>()});
+ probe_sink_ops[0]->_child_exprs =
+ MockSlotRef::create_mock_contexts(DataTypes
{std::make_shared<DataTypeString>()});
+
+ init_local_state();
+
+ auto string_type = std::make_shared<DataTypeString>();
+ const size_t large_str_size = 1 * 1024 * 1024; // 1MB per string
+ const size_t num_rows = 4200; // ~4.1GB total
+ std::string large_str(large_str_size, 'y');
+
+ const size_t rows_per_batch = 100;
+ for (size_t batch_start = 0; batch_start < num_rows; batch_start +=
rows_per_batch) {
+ size_t current_batch_size = std::min(rows_per_batch, num_rows -
batch_start);
+
+ auto col = string_type->create_column();
+ for (size_t i = 0; i < current_batch_size; i++) {
+ auto suffix = std::to_string(batch_start + i);
+ char saved[32];
+ std::memcpy(saved, large_str.data(), suffix.size());
+ std::memcpy(large_str.data(), suffix.data(), suffix.size());
+ col->insert_data(large_str.data(), large_str.size());
+ std::memcpy(large_str.data(), saved, suffix.size());
+ }
+
+ Block block;
+ block.insert({std::move(col), string_type, "col0"});
+
+ bool is_last = (batch_start + rows_per_batch >= num_rows);
+ auto st = sink_op->sink(state.get(), &block, is_last);
+ EXPECT_TRUE(st.ok()) << st.to_string();
+ }
+
+ EXPECT_EQ(shared_state->get_hash_table_size(), num_rows);
+
+ // Probe with empty block - EXCEPT should return all rows
+ {
+ Block block;
+ block.insert({string_type->create_column(), string_type, "col0"});
+ EXPECT_TRUE(probe_sink_ops[0]->sink(states[0].get(), &block, true));
+ }
+
+ // Read from source - for EXCEPT with empty probe, should get all build
rows
+ {
+ size_t total_rows = 0;
+ bool eos = false;
+ while (!eos) {
+ Block block;
+ EXPECT_TRUE(source_op->get_block(state.get(), &block, &eos));
+ total_rows += block.rows();
+ }
+ EXPECT_EQ(total_rows, num_rows);
+ }
+}
+
TEST_F(IntersectOperatorTest, test_extract_probe_column) {
init_op(2,
{std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>()),
std::make_shared<DataTypeNullable>(std::make_shared<DataTypeInt64>())});
diff --git a/regression-test/data/query_p2/test_set_operation_large_string.out
b/regression-test/data/query_p2/test_set_operation_large_string.out
new file mode 100644
index 00000000000..c744ed46620
--- /dev/null
+++ b/regression-test/data/query_p2/test_set_operation_large_string.out
@@ -0,0 +1,14 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !except_subset --
+4208xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+4209xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+
+-- !except_self --
+
+-- !intersect_subset --
+0xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+1xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
+
+-- !intersect_self --
+4210
+
diff --git
a/regression-test/suites/query_p2/test_set_operation_large_string.groovy
b/regression-test/suites/query_p2/test_set_operation_large_string.groovy
new file mode 100644
index 00000000000..adde9418ebe
--- /dev/null
+++ b/regression-test/suites/query_p2/test_set_operation_large_string.groovy
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Test set operations (EXCEPT/INTERSECT) with large string data that exceeds
4GB total.
+// This exercises the convert_column_if_overflow path in
SetSinkOperatorX::_process_build_block.
+suite("test_set_operation_large_string") {
+ def totalRows = 4210
+ sql """ DROP TABLE IF EXISTS test_set_op_large_string """
+ sql """
+ CREATE TABLE test_set_op_large_string (
+ id INT NULL,
+ large_val STRING NULL
+ )
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ // Each row has a ~1MB string. Insert 4200 rows to exceed 4GB total string
data.
+ // Use repeat() to generate large strings efficiently.
+ // Insert in batches to avoid timeout.
+ def batchSize = 500
+ def strSizePerRow = 1048576 // 1MB
+
+ // repeat('x', 1048576) produces a 1MB string per row.
+ // We concat the id to make each row unique.
+ for (int batchStart = 0; batchStart < totalRows; batchStart += batchSize) {
+ def batchEnd = Math.min(batchStart + batchSize, totalRows)
+ sql """
+ INSERT INTO test_set_op_large_string
+ SELECT number, concat(cast(number as string), repeat('x',
${strSizePerRow}))
+ FROM numbers("number" = "${batchEnd}")
+ WHERE number >= ${batchStart};
+ """
+ }
+
+ def rowCount = sql """ SELECT count(*) FROM test_set_op_large_string """
+ log.info("Inserted rows: ${rowCount}")
+ assert rowCount[0][0] == totalRows
+
+ sql " set parallel_pipeline_task_num = 1;"
+ sql " set enable_profile = true;"
+ sql " set batch_size = 128;"
+ sql " set runtime_filter_mode = off;"
+ sql """ set disable_nereids_rules = "INFER_SET_OPERATOR_DISTINCT";"""
+ // Test EXCEPT: all rows from test_set_op_large_string except a small
subset should return most rows
+ qt_except_subset """
+ SELECT substring(large_val, 1, 100) FROM (
+ SELECT large_val FROM test_set_op_large_string
+ EXCEPT
+ SELECT large_val FROM test_set_op_large_string WHERE id < 4208
+ ) t
+ order by 1;
+ """
+
+ // Test EXCEPT: table minus itself should be empty
+ qt_except_self """
+ SELECT * FROM (
+ SELECT large_val FROM test_set_op_large_string
+ EXCEPT
+ SELECT large_val FROM test_set_op_large_string
+ ) t
+ order by 1;
+ """
+
+ // Test INTERSECT: intersection with a small subset should return the
subset
+ qt_intersect_subset """
+ SELECT substring(large_val, 1, 100) FROM (
+ SELECT large_val FROM test_set_op_large_string
+ INTERSECT
+ SELECT large_val FROM test_set_op_large_string WHERE id < 2
+ ) t
+ order by 1;
+ """
+
+ // Test INTERSECT: table with itself should return all rows
+ qt_intersect_self """
+ SELECT count(*) FROM (
+ SELECT large_val FROM test_set_op_large_string
+ INTERSECT
+ SELECT large_val FROM test_set_op_large_string
+ ) t
+ order by 1;
+ """
+
+ sql """ DROP TABLE IF EXISTS test_set_op_large_string """
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]