This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 7d423b3a6a2 [cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix
group commit block queue mem estimate fault" (#37379)
7d423b3a6a2 is described below
commit 7d423b3a6a2ffaba18584f8400795ef56838114f
Author: abmdocrt <[email protected]>
AuthorDate: Sun Jul 7 18:27:49 2024 +0800
[cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix group commit block
queue mem estimate fault" (#37379)
Pick [Fix](group commit) Fix group commit block queue mem estimate fault
#35314
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
**Problem:** When `group commit=async_mode` and NULL data is imported
into a `variant` type column, it causes incorrect memory statistics for
group commit backpressure, leading to a stuck issue. **Cause:** In group
commit mode, blocks are first added to a queue in batches using `add
block`, and then blocks are retrieved from the queue using `get block`.
To track memory usage during backpressure, we add the block size to the
memory statistics during `add block` and subtract the block size from
the memory statistics during `get block`. However, for `variant` types,
during the `add block` write to WAL, serialization occurs, which can
merge types (e.g., merging `int` and `bigint` into `bigint`), thereby
changing the block size. This results in a discrepancy between the block
size during `get block` and `add block`, causing memory statistics to
overflow.
**Solution:** Record the block size at the time of `add block` and use
this recorded size during `get block` instead of the actual block size.
This ensures consistency in the memory addition and subtraction.
## Further comments
If this is a relatively large or complex change, kick off the discussion
at [[email protected]](mailto:[email protected]) by explaining why
you chose the solution you did and what alternatives you considered,
etc...
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
---
be/src/runtime/group_commit_mgr.cpp | 66 +++++++++++++++++++---
be/src/runtime/group_commit_mgr.h | 9 ++-
be/src/vec/core/block.cpp | 19 +++++++
be/src/vec/core/block.h | 2 +
.../insert_p0/test_group_commit_variant.groovy | 43 ++++++++++++++
5 files changed, 131 insertions(+), 8 deletions(-)
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 38e599180e7..d5daf2af530 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -62,9 +62,27 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
RETURN_IF_ERROR(status);
if (block->rows() > 0) {
if (!config::group_commit_wait_replay_wal_finish) {
- _block_queue.push_back(block);
+ _block_queue.emplace_back(block);
_data_bytes += block->bytes();
+ int before_block_queues_bytes = _all_block_queues_bytes->load();
_all_block_queues_bytes->fetch_add(block->bytes(),
std::memory_order_relaxed);
+ std::stringstream ss;
+ ss << "[";
+ for (const auto& id : _load_ids) {
+ ss << id.to_string() << ", ";
+ }
+ ss << "]";
+ VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
+ << "block queue size is " << _block_queue.size() << ",
block rows is "
+ << block->rows() << ", block bytes is " <<
block->bytes()
+ << ", before add block, all block queues bytes is "
+ << before_block_queues_bytes
+ << ", after add block, all block queues bytes is "
+ << _all_block_queues_bytes->load() << ", txn_id=" <<
txn_id
+ << ", label=" << label << ", instance_id=" <<
load_instance_id
+ << ", load_ids=" << ss.str() << ", runtime_state=" <<
runtime_state
+ << ", the block is " << block->dump_data() << ", the
block column size is "
+ << block->columns_bytes();
}
if (write_wal || config::group_commit_wait_replay_wal_finish) {
auto st = _v_wal_writer->write_wal(block.get());
@@ -132,11 +150,29 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, vectorized::Block*
return st;
}
if (!_block_queue.empty()) {
- auto fblock = _block_queue.front();
- block->swap(*fblock.get());
+ const BlockData block_data = _block_queue.front();
+ block->swap(*block_data.block);
*find_block = true;
_block_queue.pop_front();
- _all_block_queues_bytes->fetch_sub(block->bytes(),
std::memory_order_relaxed);
+ int before_block_queues_bytes = _all_block_queues_bytes->load();
+ _all_block_queues_bytes->fetch_sub(block_data.block_bytes,
std::memory_order_relaxed);
+ std::stringstream ss;
+ ss << "[";
+ for (const auto& id : _load_ids) {
+ ss << id.to_string() << ", ";
+ }
+ ss << "]";
+ VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
+ << "block queue size is " << _block_queue.size() << ",
block rows is "
+ << block->rows() << ", block bytes is " << block->bytes()
+ << ", before remove block, all block queues bytes is "
+ << before_block_queues_bytes
+ << ", after remove block, all block queues bytes is "
+ << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
+ << ", label=" << label << ", instance_id=" <<
load_instance_id
+ << ", load_ids=" << ss.str() << ", runtime_state=" <<
runtime_state
+ << ", the block is " << block->dump_data() << ", the block
column size is "
+ << block->columns_bytes();
}
if (_block_queue.empty() && _need_commit && _load_ids.empty()) {
*eos = true;
@@ -176,10 +212,26 @@ void LoadBlockQueue::_cancel_without_lock(const Status&
st) {
<< ", status=" << st.to_string();
status = st;
while (!_block_queue.empty()) {
- {
- auto& future_block = _block_queue.front();
- _all_block_queues_bytes->fetch_sub(future_block->bytes(),
std::memory_order_relaxed);
+ const BlockData& block_data = _block_queue.front().block;
+ int before_block_queues_bytes = _all_block_queues_bytes->load();
+ _all_block_queues_bytes->fetch_sub(block_data.block_bytes,
std::memory_order_relaxed);
+ std::stringstream ss;
+ ss << "[";
+ for (const auto& id : _load_ids) {
+ ss << id.to_string() << ", ";
}
+ ss << "]";
+ VLOG_DEBUG << "[Group Commit Debug]
(LoadBlockQueue::_cancel_without_block). "
+ << "block queue size is " << _block_queue.size() << ",
block rows is "
+ << block_data.block->rows() << ", block bytes is " <<
block_data.block->bytes()
+ << ", before remove block, all block queues bytes is "
+ << before_block_queues_bytes
+ << ", after remove block, all block queues bytes is "
+ << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
+ << ", label=" << label << ", instance_id=" <<
load_instance_id
+ << ", load_ids=" << ss.str() << ", the block is "
+ << block_data.block->dump_data() << ", the block column
size is "
+ << block_data.block->columns_bytes();
_block_queue.pop_front();
}
}
diff --git a/be/src/runtime/group_commit_mgr.h
b/be/src/runtime/group_commit_mgr.h
index 76a890f7a8c..c41f6abd6fe 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -41,6 +41,13 @@ class ExecEnv;
class TUniqueId;
class RuntimeState;
+struct BlockData {
+ BlockData(const std::shared_ptr<vectorized::Block>& block)
+ : block(block), block_bytes(block->bytes()) {};
+ std::shared_ptr<vectorized::Block> block;
+ size_t block_bytes;
+};
+
class LoadBlockQueue {
public:
LoadBlockQueue(const UniqueId& load_instance_id, std::string& label,
int64_t txn_id,
@@ -94,7 +101,7 @@ private:
// the set of load ids of all blocks in this queue
std::set<UniqueId> _load_ids;
- std::list<std::shared_ptr<vectorized::Block>> _block_queue;
+ std::list<BlockData> _block_queue;
// wal
std::string _wal_base_path;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index dd1a659ae15..d26d219e5d6 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -417,6 +417,25 @@ size_t Block::bytes() const {
return res;
}
+std::string Block::columns_bytes() const {
+ std::stringstream res;
+ res << "column bytes: [";
+ for (const auto& elem : data) {
+ if (!elem.column) {
+ std::stringstream ss;
+ for (const auto& e : data) {
+ ss << e.name + " ";
+ }
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "Column {} in block is nullptr, in method bytes.
All Columns are {}",
+ elem.name, ss.str());
+ }
+ res << ", " << elem.column->byte_size();
+ }
+ res << "]";
+ return res.str();
+}
+
size_t Block::allocated_bytes() const {
size_t res = 0;
for (const auto& elem : data) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index c9b3f2d5b5e..593d37f7ff2 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -202,6 +202,8 @@ public:
/// Approximate number of bytes in memory - for profiling and limits.
size_t bytes() const;
+ std::string columns_bytes() const;
+
/// Approximate number of allocated bytes in memory - for profiling and
limits.
size_t allocated_bytes() const;
diff --git a/regression-test/suites/insert_p0/test_group_commit_variant.groovy
b/regression-test/suites/insert_p0/test_group_commit_variant.groovy
new file mode 100644
index 00000000000..15d8304bd4f
--- /dev/null
+++ b/regression-test/suites/insert_p0/test_group_commit_variant.groovy
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_variant") {
+
+ sql "set group_commit=async_mode"
+
+ def testTable = "test_group_commit_variant"
+
+ sql """
+ CREATE TABLE IF NOT EXISTS ${testTable} (
+ k bigint,
+ var variant
+ )
+ UNIQUE KEY(`k`)
+ DISTRIBUTED BY HASH (`k`) BUCKETS 5
+ properties("replication_num" = "1",
+ "disable_auto_compaction" = "false");
+ """
+
+ try {
+ sql "insert into ${testTable} (k) values (1),(2);"
+ sql "insert into ${testTable} (k) values (3),(4);"
+ } catch (Exception e) {
+ // should not throw exception
+ logger.info(e.getMessage())
+ assertTrue(False)
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]