This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 7d423b3a6a2 [cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix group commit block queue mem estimate fault" (#37379)
7d423b3a6a2 is described below

commit 7d423b3a6a2ffaba18584f8400795ef56838114f
Author: abmdocrt <[email protected]>
AuthorDate: Sun Jul 7 18:27:49 2024 +0800

    [cherry-pick](branch-2.1) Pick "[Fix](group commit) Fix group commit block queue mem estimate fault" (#37379)
    
    Pick [Fix](group commit) Fix group commit block queue mem estimate fault (#35314)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    **Problem:** When `group commit=async_mode` is enabled and NULL data is
    imported into a `variant` column, the memory statistics used for group
    commit backpressure become incorrect and the load gets stuck.
    
    **Cause:** In group commit mode, blocks are first appended to a queue
    with `add block` and later taken out with `get block`. To track memory
    usage for backpressure, the block size is added to the memory
    statistics during `add block` and subtracted during `get block`.
    However, for `variant` types, writing the block to the WAL during
    `add block` serializes it, which can merge types (e.g., `int` and
    `bigint` into `bigint`) and thus change the block size. The size seen
    by `get block` then differs from the size added by `add block`, so the
    memory statistics become inconsistent and eventually overflow.
    
    **Solution:** Record the block size at `add block` time and subtract
    that recorded size during `get block` instead of the block's current
    size, so the amount added and the amount subtracted always match.
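    
    For clarity, here is a minimal sketch of the accounting pattern behind
    the fix. The `Block`, `QueuedBlock`, and `BlockQueue` names below are
    hypothetical stand-ins, not the `BlockData`/`LoadBlockQueue` code in
    the diff: each queued entry remembers the byte size it was enqueued
    with, and the shared counter is always adjusted by that remembered
    value.
    
    ```cpp
    // Minimal illustrative sketch, not the actual Doris classes: record the
    // byte size once when a block is enqueued and subtract exactly that
    // recorded value when it is dequeued, so later mutations of the block
    // (e.g. WAL serialization changing a variant column) cannot skew the
    // shared backpressure counter.
    #include <atomic>
    #include <cstddef>
    #include <list>
    #include <memory>
    
    struct Block {                       // stand-in for vectorized::Block
        explicit Block(size_t b) : bytes_(b) {}
        size_t bytes() const { return bytes_; }
        size_t bytes_;
    };
    
    struct QueuedBlock {
        explicit QueuedBlock(std::shared_ptr<Block> b)
                : block(std::move(b)), recorded_bytes(block->bytes()) {}
        std::shared_ptr<Block> block;
        size_t recorded_bytes;           // size captured at enqueue time
    };
    
    class BlockQueue {
    public:
        void add_block(std::shared_ptr<Block> block) {
            queue_.emplace_back(std::move(block));
            // add the size recorded at enqueue time
            total_bytes_.fetch_add(queue_.back().recorded_bytes, std::memory_order_relaxed);
        }
    
        std::shared_ptr<Block> get_block() {
            if (queue_.empty()) {
                return nullptr;
            }
            QueuedBlock front = std::move(queue_.front());
            queue_.pop_front();
            // subtract the same recorded size; front.block->bytes() may differ by now
            total_bytes_.fetch_sub(front.recorded_bytes, std::memory_order_relaxed);
            return front.block;
        }
    
        size_t total_bytes() const { return total_bytes_.load(std::memory_order_relaxed); }
    
    private:
        std::list<QueuedBlock> queue_;
        std::atomic<size_t> total_bytes_ {0};
    };
    ```
    
    Because the same recorded value is both added and subtracted, the
    counter returns to zero once the queue drains, even if serialization
    changed the block's size in between.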
    
---
 be/src/runtime/group_commit_mgr.cpp                | 66 +++++++++++++++++++---
 be/src/runtime/group_commit_mgr.h                  |  9 ++-
 be/src/vec/core/block.cpp                          | 19 +++++++
 be/src/vec/core/block.h                            |  2 +
 .../insert_p0/test_group_commit_variant.groovy     | 43 ++++++++++++++
 5 files changed, 131 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 38e599180e7..d5daf2af530 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -62,9 +62,27 @@ Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
     RETURN_IF_ERROR(status);
     if (block->rows() > 0) {
         if (!config::group_commit_wait_replay_wal_finish) {
-            _block_queue.push_back(block);
+            _block_queue.emplace_back(block);
             _data_bytes += block->bytes();
+            int before_block_queues_bytes = _all_block_queues_bytes->load();
+            _all_block_queues_bytes->fetch_add(block->bytes(), std::memory_order_relaxed);
+            std::stringstream ss;
+            ss << "[";
+            for (const auto& id : _load_ids) {
+                ss << id.to_string() << ", ";
+            }
+            ss << "]";
+            VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::add_block). "
+                       << "block queue size is " << _block_queue.size() << ", block rows is "
+                       << block->rows() << ", block bytes is " << block->bytes()
+                       << ", before add block, all block queues bytes is "
+                       << before_block_queues_bytes
+                       << ", after add block, all block queues bytes is "
+                       << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
+                       << ", label=" << label << ", instance_id=" << load_instance_id
+                       << ", load_ids=" << ss.str() << ", runtime_state=" << runtime_state
+                       << ", the block is " << block->dump_data() << ", the block column size is "
+                       << block->columns_bytes();
         }
         if (write_wal || config::group_commit_wait_replay_wal_finish) {
             auto st = _v_wal_writer->write_wal(block.get());
@@ -132,11 +150,29 @@ Status LoadBlockQueue::get_block(RuntimeState* runtime_state, vectorized::Block*
         return st;
     }
     if (!_block_queue.empty()) {
-        auto fblock = _block_queue.front();
-        block->swap(*fblock.get());
+        const BlockData block_data = _block_queue.front();
+        block->swap(*block_data.block);
         *find_block = true;
         _block_queue.pop_front();
-        _all_block_queues_bytes->fetch_sub(block->bytes(), std::memory_order_relaxed);
+        int before_block_queues_bytes = _all_block_queues_bytes->load();
+        _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
+        std::stringstream ss;
+        ss << "[";
+        for (const auto& id : _load_ids) {
+            ss << id.to_string() << ", ";
+        }
+        ss << "]";
+        VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::get_block). "
+                   << "block queue size is " << _block_queue.size() << ", block rows is "
+                   << block->rows() << ", block bytes is " << block->bytes()
+                   << ", before remove block, all block queues bytes is "
+                   << before_block_queues_bytes
+                   << ", after remove block, all block queues bytes is "
+                   << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
+                   << ", label=" << label << ", instance_id=" << load_instance_id
+                   << ", load_ids=" << ss.str() << ", runtime_state=" << runtime_state
+                   << ", the block is " << block->dump_data() << ", the block column size is "
+                   << block->columns_bytes();
     }
     if (_block_queue.empty() && _need_commit && _load_ids.empty()) {
         *eos = true;
@@ -176,10 +212,26 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) {
               << ", status=" << st.to_string();
     status = st;
     while (!_block_queue.empty()) {
-        {
-            auto& future_block = _block_queue.front();
-            _all_block_queues_bytes->fetch_sub(future_block->bytes(), std::memory_order_relaxed);
+        const BlockData& block_data = _block_queue.front().block;
+        int before_block_queues_bytes = _all_block_queues_bytes->load();
+        _all_block_queues_bytes->fetch_sub(block_data.block_bytes, std::memory_order_relaxed);
+        std::stringstream ss;
+        ss << "[";
+        for (const auto& id : _load_ids) {
+            ss << id.to_string() << ", ";
         }
+        ss << "]";
+        VLOG_DEBUG << "[Group Commit Debug] (LoadBlockQueue::_cancel_without_block). "
+                   << "block queue size is " << _block_queue.size() << ", block rows is "
+                   << block_data.block->rows() << ", block bytes is " << block_data.block->bytes()
+                   << ", before remove block, all block queues bytes is "
+                   << before_block_queues_bytes
+                   << ", after remove block, all block queues bytes is "
+                   << _all_block_queues_bytes->load() << ", txn_id=" << txn_id
+                   << ", label=" << label << ", instance_id=" << load_instance_id
+                   << ", load_ids=" << ss.str() << ", the block is "
+                   << block_data.block->dump_data() << ", the block column size is "
+                   << block_data.block->columns_bytes();
         _block_queue.pop_front();
     }
 }
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index 76a890f7a8c..c41f6abd6fe 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -41,6 +41,13 @@ class ExecEnv;
 class TUniqueId;
 class RuntimeState;
 
+struct BlockData {
+    BlockData(const std::shared_ptr<vectorized::Block>& block)
+            : block(block), block_bytes(block->bytes()) {};
+    std::shared_ptr<vectorized::Block> block;
+    size_t block_bytes;
+};
+
 class LoadBlockQueue {
 public:
     LoadBlockQueue(const UniqueId& load_instance_id, std::string& label, int64_t txn_id,
@@ -94,7 +101,7 @@ private:
 
     // the set of load ids of all blocks in this queue
     std::set<UniqueId> _load_ids;
-    std::list<std::shared_ptr<vectorized::Block>> _block_queue;
+    std::list<BlockData> _block_queue;
 
     // wal
     std::string _wal_base_path;
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index dd1a659ae15..d26d219e5d6 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -417,6 +417,25 @@ size_t Block::bytes() const {
     return res;
 }
 
+std::string Block::columns_bytes() const {
+    std::stringstream res;
+    res << "column bytes: [";
+    for (const auto& elem : data) {
+        if (!elem.column) {
+            std::stringstream ss;
+            for (const auto& e : data) {
+                ss << e.name + " ";
+            }
+            throw Exception(ErrorCode::INTERNAL_ERROR,
+                            "Column {} in block is nullptr, in method bytes. 
All Columns are {}",
+                            elem.name, ss.str());
+        }
+        res << ", " << elem.column->byte_size();
+    }
+    res << "]";
+    return res.str();
+}
+
 size_t Block::allocated_bytes() const {
     size_t res = 0;
     for (const auto& elem : data) {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index c9b3f2d5b5e..593d37f7ff2 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -202,6 +202,8 @@ public:
     /// Approximate number of bytes in memory - for profiling and limits.
     size_t bytes() const;
 
+    std::string columns_bytes() const;
+
     /// Approximate number of allocated bytes in memory - for profiling and limits.
     size_t allocated_bytes() const;
 
diff --git a/regression-test/suites/insert_p0/test_group_commit_variant.groovy b/regression-test/suites/insert_p0/test_group_commit_variant.groovy
new file mode 100644
index 00000000000..15d8304bd4f
--- /dev/null
+++ b/regression-test/suites/insert_p0/test_group_commit_variant.groovy
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_group_commit_variant") {
+
+    sql "set group_commit=async_mode"
+
+    def testTable = "test_group_commit_variant"
+
+    sql """
+            CREATE TABLE IF NOT EXISTS ${testTable} (
+                k bigint,
+                var variant
+                )
+                UNIQUE KEY(`k`)
+                DISTRIBUTED BY HASH (`k`) BUCKETS 5
+                properties("replication_num" = "1",
+                "disable_auto_compaction" = "false");
+        """
+    
+    try {
+        sql "insert into ${testTable} (k) values (1),(2);"
+        sql "insert into ${testTable} (k) values (3),(4);"
+    } catch (Exception e) {
+        // should not throw exception
+        logger.info(e.getMessage())
+        assertTrue(false)
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
