This is an automated email from the ASF dual-hosted git repository.
liutang123 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 63e90d34e4b [fix](compaction) Fix incorrect memory availability check
in RowSourceBuffer during vertical compaction (#63152)
63e90d34e4b is described below
commit 63e90d34e4bd076e3552262a4943981e06dc1bec
Author: Lijia Liu <[email protected]>
AuthorDate: Thu May 14 09:30:59 2026 +0800
[fix](compaction) Fix incorrect memory availability check in
RowSourceBuffer during vertical compaction (#63152)
Exception Log:
```
thread_mem_tracker_mgr.h:248] alloc large memory: 4294967296, not in query
or load, this is just a warning, not prevent memory alloc, stacktrace:
0# doris::ThreadMemTrackerMgr::consume(long, int)
1# Allocator<false, false, false,
DefaultMemoryAllocator>::realloc_impl(void*, unsigned long, unsigned long,
unsigned long)
2# void doris::vectorized::PODArrayBase<2ul, 4096ul,
Allocator<false, false, false, DefaultMemoryAllocator>, 16ul,
15ul>::reserve_for_next_size<>()
3#
doris::vectorized::RowSourcesBuffer::append(std::vector<doris::vectorized::RowSource,
std::allocator<doris::vectorized::RowSource> > const&)
4#
doris::vectorized::VerticalHeapMergeIterator::next_batch(doris::vectorized::Block*)
5#
doris::vectorized::VerticalBlockReader::_direct_next_block(doris::vectorized::Block*,
bool*)
6#
doris::vectorized::VerticalBlockReader::next_block_with_aggregation(doris::vectorized::Block*,
bool*)
7#
doris::Merger::vertical_compact_one_group(std::shared_ptr<doris::BaseTablet>,
doris::ReaderType, doris::TabletSchema const&, bool, std::vector<unsigned int,
std::allocator<unsigned int> > const&, doris::vectorized::RowSourcesBuffer*,
std::vector<std::shared_ptr<doris::RowsetReader>,
std::allocator<std::shared_ptr<doris::RowsetReader> > > const&,
doris::RowsetWriter*, long, doris::Merger::Statistics*, std::vector<unsigned
int, std::allocator<unsigned int> >, long, doris::Co [...]
8#
doris::Merger::vertical_merge_rowsets(std::shared_ptr<doris::BaseTablet>,
doris::ReaderType, doris::TabletSchema const&,
std::vector<std::shared_ptr<doris::RowsetReader>,
std::allocator<std::shared_ptr<doris::RowsetReader> > > const&,
doris::RowsetWriter*, long, long, doris::Merger::Statistics*)
9# doris::Compaction::merge_input_rowsets()
10# doris::CloudCompactionMixin::execute_compact_impl(long)
11# doris::CloudCompactionMixin::execute_compact()
12# doris::CloudCumulativeCompaction::execute_compact()
13# std::_Function_handler<void (),
doris::CloudStorageEngine::_submit_cumulative_compaction_task(std::shared_ptr<doris::CloudTablet>
const&)::$_2>::_M_invoke(std::_Any_data const&)
14# doris::ThreadPool::dispatch_thread()
15# doris::Thread::supervise_thread(void*)
16# ?
17# ?
```
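Reason: PaddedPODArray's `allocated_bytes()` includes pad_left and
pad_right, which are NOT usable for storing elements.
RowSourcesBuffer::append computed the free room as
`allocated_bytes() - size() * sizeof(UInt16)`, which over-estimates it.
Once the buffer was over the limit, append() could therefore skip the
spill and let push_back reallocate (doubling) the buffer beyond
`vertical_compaction_max_row_source_memory_mb`, as in the large
allocation logged above. The fix uses `capacity() - size()` instead.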
Co-authored-by: liutang123 <[email protected]>
---
.../storage/iterator/vertical_merge_iterator.cpp | 10 ++-
.../compaction/vertical_compaction_test.cpp | 77 ++++++++++++++++++++++
2 files changed, 85 insertions(+), 2 deletions(-)
diff --git a/be/src/storage/iterator/vertical_merge_iterator.cpp b/be/src/storage/iterator/vertical_merge_iterator.cpp
index a340adcd3f7..9aedae0b1be 100644
--- a/be/src/storage/iterator/vertical_merge_iterator.cpp
+++ b/be/src/storage/iterator/vertical_merge_iterator.cpp
@@ -69,8 +69,14 @@ uint16_t RowSource::data() const {
Status RowSourcesBuffer::append(const std::vector<RowSource>& row_sources) {
if (_buffer.allocated_bytes() + row_sources.size() * sizeof(UInt16) >
config::vertical_compaction_max_row_source_memory_mb * 1024 * 1024) {
- if (_buffer.allocated_bytes() - _buffer.size() * sizeof(UInt16) <
- row_sources.size() * sizeof(UInt16)) {
+ // Use capacity() - size() to get the truly available element slots.
+ // Note: PODArrayBase::allocated_bytes() includes pad_left and
pad_right,
+ // which are NOT usable for storing elements. Using allocated_bytes()
here
+ // would over-estimate the available space and lead to a missed spill,
+ // causing _buffer to grow beyond the configured memory limit when
+ // push_back triggers reallocation below.
+ size_t available_slots = _buffer.capacity() - _buffer.size();
+ if (available_slots < row_sources.size()) {
VLOG_NOTICE << "RowSourceBuffer is too large, serialize and reset
buffer: "
<< _buffer.allocated_bytes() << ", total size: " <<
_total_size;
// serialize current buffer
diff --git a/be/test/storage/compaction/vertical_compaction_test.cpp b/be/test/storage/compaction/vertical_compaction_test.cpp
index 3b736857242..9623f230dbb 100644
--- a/be/test/storage/compaction/vertical_compaction_test.cpp
+++ b/be/test/storage/compaction/vertical_compaction_test.cpp
@@ -435,6 +435,83 @@ TEST_F(VerticalCompactionTest, TestRowSourcesBuffer) {
}
}
+// Regression test for RowSourcesBuffer::append spill threshold.
+//
+// Background:
+// PaddedPODArray::allocated_bytes() returns the total allocated memory which
+// INCLUDES pad_left and pad_right. These padding bytes are NOT usable for
+// storing elements. Earlier, append() used `allocated_bytes() - size*sizeof`
+// as "available room" to decide whether to skip spilling. This over-estimates
+// the truly usable space (by pad_left + pad_right bytes), so when the buffer
+// has already crossed the configured memory limit, append() may incorrectly
+// decide that the upcoming push_back will fit without reallocation, skip the
+// spill, and then push_back triggers a reallocation that doubles the buffer,
+// exceeding the configured `vertical_compaction_max_row_source_memory_mb`.
+//
+// This test simulates the case by setting a very small memory limit (1 MB) and
+// repeatedly appending row sources. After the first time the buffer crosses
+// the limit, the next append must trigger a spill (file write + reset) instead
+// of silently growing the in-memory buffer beyond the limit.
+TEST_F(VerticalCompactionTest, TestRowSourcesBufferSpillThreshold) {
+    // 1 MB limit (set in SetUp as well, but make it explicit here).
+    config::vertical_compaction_max_row_source_memory_mb = 1;
+    const size_t mem_limit_bytes =
+            static_cast<size_t>(config::vertical_compaction_max_row_source_memory_mb) * 1024 * 1024;
+
+    RowSourcesBuffer buffer(200, absolute_dir, ReaderType::READER_CUMULATIVE_COMPACTION);
+
+    // Build a batch of row sources. Use a moderate batch size so that the
+    // buffer's allocated_bytes() can become very close to the limit before
+    // a single append crosses it.
+    constexpr size_t kBatchSize = 4096;
+    std::vector<RowSource> batch;
+    batch.reserve(kBatchSize);
+    for (size_t i = 0; i < kBatchSize; ++i) {
+        batch.emplace_back(static_cast<uint16_t>(i % 8), false);
+    }
+
+    // Append roughly 4x the limit's worth of elements (each is 2 bytes, i.e.
+    // ~512K elements per MB) plus a few extra batches, forcing several spills.
+    const size_t total_appends = (mem_limit_bytes / sizeof(uint16_t)) * 4 / kBatchSize + 8;
+
+    size_t expected_total = 0;
+    for (size_t i = 0; i < total_appends; ++i) {
+        ASSERT_TRUE(buffer.append(batch).ok());
+        expected_total += kBatchSize;
+
+        // Invariant: in-memory buffered_size() must never exceed what the
+        // memory limit allows (in elements). Otherwise the spill logic is
+        // broken (the bug described above).
+        // Allow a small slack equal to one batch because the spill check is
+        // performed BEFORE the push_back that crosses the threshold.
+        const size_t buffered_elems = buffer.buffered_size();
+        const size_t buffered_bytes = buffered_elems * sizeof(uint16_t);
+        // After each append, buffered_bytes should be <= mem_limit + one batch size.
+        // It must NOT grow unboundedly (e.g., 2x of the limit due to PODArray
+        // reallocation that the buggy version would allow).
+        EXPECT_LE(buffered_bytes, mem_limit_bytes + kBatchSize * sizeof(uint16_t))
+                << "RowSourcesBuffer in-memory size exceeded the configured limit, "
+                << "spill threshold logic is broken. iter=" << i
+                << ", buffered_elems=" << buffered_elems;
+    }
+
+    EXPECT_EQ(buffer.total_size(), expected_total);
+
+    // Make sure data is persisted and can be read back correctly.
+    ASSERT_TRUE(buffer.flush().ok());
+    ASSERT_TRUE(buffer.seek_to_begin().ok());
+
+    size_t read_back = 0;
+    while (buffer.has_remaining().ok()) {
+        // Verify that the source num matches the pattern we wrote.
+        auto cur = buffer.current().get_source_num();
+        EXPECT_EQ(cur, (read_back % kBatchSize) % 8);
+        buffer.advance(1);
+        ++read_back;
+    }
+    EXPECT_EQ(read_back, expected_total);
+}
+
TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
auto num_input_rowset = 2;
auto num_segments = 2;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]