This is an automated email from the ASF dual-hosted git repository.

mymeiyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b2c4b91f11 [fix](group commit) fix can not get a block queue (#63722)
7b2c4b91f11 is described below

commit 7b2c4b91f1172cfa49e14db431866eaec2fb3d90
Author: meiyi <[email protected]>
AuthorDate: Wed Jul 1 14:37:19 2026 +0800

    [fix](group commit) fix can not get a block queue (#63722)
    
    Under high-concurrency async stream load, group commit may fail to get a
    block queue when creating the group commit plan fragment either fails or
    leaves pending load requests waiting for a queue that is no longer usable.
    This change tracks pending create-plan requests per table, adds a
    background worker that resubmits group commit plan creation, and
    introduces the config group_commit_create_plan_timeout_ms (default
    120000 ms; 0 means no timeout) so that waiting requests are released
    after a bounded time.
---
 be/src/common/config.cpp                           |   3 +
 be/src/common/config.h                             |   2 +
 .../operator/group_commit_block_sink_operator.cpp  |  37 +--
 be/src/load/group_commit/group_commit_mgr.cpp      | 329 +++++++++++++++++----
 be/src/load/group_commit/group_commit_mgr.h        |  41 ++-
 be/src/load/group_commit/wal/wal_table.cpp         |  11 +-
 .../group_commit/test_group_commit_error.groovy    |  46 ++-
 ...ommit_stream_load_high_concurrency_async.groovy | 138 +++++++++
 8 files changed, 511 insertions(+), 96 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 2d7d2edb821..5f970b11708 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1425,6 +1425,9 @@ DEFINE_mInt32(group_commit_queue_mem_limit, "67108864");
 // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% 
can be automatically identified.
 DEFINE_String(group_commit_wal_max_disk_limit, "10%");
 DEFINE_Bool(group_commit_wait_replay_wal_finish, "false");
+// Max time(ms) to wait for creating group commit plan fragment.
+// 0 means no timeout, default 2min.
+DEFINE_mInt32(group_commit_create_plan_timeout_ms, "120000");
 
 DEFINE_mInt32(scan_thread_nice_value, "0");
 DEFINE_mInt32(tablet_schema_cache_recycle_interval, "3600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 016248f94e0..1cd03d999dd 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1508,6 +1508,8 @@ DECLARE_mInt32(group_commit_queue_mem_limit);
 // group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10% 
can be automatically identified.
 DECLARE_mString(group_commit_wal_max_disk_limit);
 DECLARE_Bool(group_commit_wait_replay_wal_finish);
+// Max time(ms) to wait for creating group commit plan fragment. 0 means no 
timeout.
+DECLARE_mInt32(group_commit_create_plan_timeout_ms);
 
 // The configuration item is used to lower the priority of the scanner thread,
 // typically employed to ensure CPU scheduling for write operations.
diff --git a/be/src/exec/operator/group_commit_block_sink_operator.cpp 
b/be/src/exec/operator/group_commit_block_sink_operator.cpp
index e441882f73b..66cf4aaee06 100644
--- a/be/src/exec/operator/group_commit_block_sink_operator.cpp
+++ b/be/src/exec/operator/group_commit_block_sink_operator.cpp
@@ -86,8 +86,8 @@ Status 
GroupCommitBlockSinkLocalState::_initialize_load_queue() {
     if (_state->exec_env()->wal_mgr()->is_running()) {
         
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
                 p._db_id, p._table_id, p._base_schema_version, 
p._schema->indexes().size(),
-                p._load_id, _load_block_queue, _state->be_exec_version(),
-                _state->query_mem_tracker(), _create_plan_dependency, 
_put_block_dependency));
+                p._load_id, _load_block_queue, _state->be_exec_version(), 
_create_plan_dependency,
+                _put_block_dependency));
         _state->set_import_label(_load_block_queue->label);
         _state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id
         return Status::OK();
@@ -140,17 +140,16 @@ Status 
GroupCommitBlockSinkLocalState::_add_block(RuntimeState* state,
         block->get_by_position(i).column = 
make_nullable(block->get_by_position(i).column);
         block->get_by_position(i).type = 
make_nullable(block->get_by_position(i).type);
     }
-    // add block to queue
-    auto cur_mutable_block = MutableBlock::create_unique(block->clone_empty());
-    {
-        IColumn::Selector selector;
-        for (auto i = 0; i < block->rows(); i++) {
-            selector.emplace_back(i);
-        }
-        
RETURN_IF_ERROR(block->append_to_block_by_selector(cur_mutable_block.get(), 
selector));
-    }
+    // Add block to queue. The block has already been converted to 
all-nullable columns above
+    // and contains exactly the rows to enqueue (filtering is done by the 
caller before
+    // _add_block), so move its columns into a standalone output_block instead 
of deep-copying
+    // them. The previous code appended every row through an identity 
selector, which duplicated
+    // all column data (notably ColumnString chars) and dominated group-commit 
load memory.
+    // Columns are COW shared_ptrs, so swapping into a fresh Block is O(1) and 
memory-safe: the
+    // queued block is a distinct object (consumers swap/mutate only that 
object) while the
+    // underlying data stays alive via reference counting and is only read 
downstream.
     std::shared_ptr<Block> output_block = Block::create_shared();
-    output_block->swap(cur_mutable_block->to_block());
+    output_block->swap(*block);
     if (!_is_block_appended && state->num_rows_load_total() + 
state->num_rows_load_unselected() +
                                                state->num_rows_load_filtered() 
<=
                                        
config::group_commit_memory_rows_for_max_filter_ratio) {
@@ -337,10 +336,10 @@ Status 
GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block* inpu
     };
 
     auto rows = input_block->rows();
-    auto bytes = input_block->bytes();
     if (UNLIKELY(rows == 0)) {
         return wind_up();
     }
+    auto bytes = input_block->bytes();
 
     // update incrementally so that FE can get the progress.
     // the real 'num_rows_load_total' will be set when sink being closed.
@@ -362,11 +361,9 @@ Status 
GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block* inpu
         local_state._partitions.assign(rows, nullptr);
         local_state._filter_bitmap.Reset(rows);
 
-        for (int index = 0; index < rows; index++) {
-            local_state._vpartition->find_partition(block.get(), index,
-                                                    
local_state._partitions[index]);
-        }
         for (int row_index = 0; row_index < rows; row_index++) {
+            local_state._vpartition->find_partition(block.get(), row_index,
+                                                    
local_state._partitions[row_index]);
             if (local_state._partitions[row_index] == nullptr) [[unlikely]] {
                 local_state._filter_bitmap.Set(row_index, true);
                 LOG(WARNING) << "no partition for this tuple. tuple="
@@ -393,6 +390,8 @@ Status 
GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block* inpu
             local_state._has_filtered_rows) {
             auto cloneBlock = block->clone_without_columns();
             auto res_block = 
MutableBlock::build_mutable_block(std::move(cloneBlock));
+            std::vector<uint32_t> rows_to_keep;
+            rows_to_keep.reserve(rows);
             for (int i = 0; i < rows; ++i) {
                 if (local_state._block_convertor->filter_map()[i]) {
                     continue;
@@ -400,8 +399,10 @@ Status 
GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block* inpu
                 if (local_state._filter_bitmap.Get(i)) {
                     continue;
                 }
-                res_block.add_row(block.get(), i);
+                rows_to_keep.emplace_back(i);
             }
+            RETURN_IF_ERROR(res_block.add_rows(block.get(), 
rows_to_keep.data(),
+                                               rows_to_keep.data() + 
rows_to_keep.size()));
             block->swap(res_block.to_block());
         }
         // add block into block queue
diff --git a/be/src/load/group_commit/group_commit_mgr.cpp 
b/be/src/load/group_commit/group_commit_mgr.cpp
index 238901888b6..930c518ae9a 100644
--- a/be/src/load/group_commit/group_commit_mgr.cpp
+++ b/be/src/load/group_commit/group_commit_mgr.cpp
@@ -29,9 +29,12 @@
 #include "exec/pipeline/dependency.h"
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/thread_context.h"
 #include "util/client_cache.h"
 #include "util/debug_points.h"
 #include "util/thrift_rpc_helper.h"
+#include "util/time.h"
 
 namespace doris {
 
@@ -99,18 +102,21 @@ Status LoadBlockQueue::add_block(RuntimeState* 
runtime_state, std::shared_ptr<Bl
             _all_block_queues_bytes->load(std::memory_order_relaxed) >=
                     config::group_commit_queue_mem_limit) {
             group_commit_block_by_memory_counter << 1;
-            DCHECK(_load_ids_to_write_dep.find(load_id) != 
_load_ids_to_write_dep.end());
-            _load_ids_to_write_dep[load_id]->block();
-            VLOG_DEBUG << "block add_block for load_id=" << load_id
-                       << ", memory=" << 
_all_block_queues_bytes->load(std::memory_order_relaxed)
-                       << ". inner load_id=" << load_instance_id << ", label=" 
<< label;
+            auto dep_it = _load_ids_to_write_dep.find(load_id);
+            DCHECK(dep_it != _load_ids_to_write_dep.end());
+            if (dep_it != _load_ids_to_write_dep.end() && dep_it->second) {
+                dep_it->second->block();
+                VLOG_DEBUG << "block add_block for load_id=" << load_id << ", 
memory="
+                           << 
_all_block_queues_bytes->load(std::memory_order_relaxed)
+                           << ". inner load_id=" << load_instance_id << ", 
label=" << label;
+            }
         }
     }
-    if (!_need_commit) {
+    if (!_need_commit.load()) {
         if (_data_bytes >= _group_commit_data_bytes) {
             VLOG_DEBUG << "group commit meets commit condition for data size, 
label=" << label
                        << ", instance_id=" << load_instance_id << ", 
data_bytes=" << _data_bytes;
-            _need_commit = true;
+            _need_commit.store(true);
             data_size_condition = true;
         }
         if 
(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
 -
@@ -118,7 +124,7 @@ Status LoadBlockQueue::add_block(RuntimeState* 
runtime_state, std::shared_ptr<Bl
                     .count() >= _group_commit_interval_ms) {
             VLOG_DEBUG << "group commit meets commit condition for time 
interval, label=" << label
                        << ", instance_id=" << load_instance_id << ", 
data_bytes=" << _data_bytes;
-            _need_commit = true;
+            _need_commit.store(true);
         }
     }
     for (auto read_dep : _read_deps) {
@@ -141,11 +147,11 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, Block* block, bool
     auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
                             std::chrono::steady_clock::now() - _start_time)
                             .count();
-    if (!_need_commit && duration >= _group_commit_interval_ms) {
-        _need_commit = true;
+    if (!_need_commit.load() && duration >= _group_commit_interval_ms) {
+        _need_commit.store(true);
     }
     if (_block_queue.empty()) {
-        if (_need_commit && duration >= 10 * _group_commit_interval_ms) {
+        if (_need_commit.load() && duration >= 10 * _group_commit_interval_ms) 
{
             auto last_print_duration = 
std::chrono::duration_cast<std::chrono::milliseconds>(
                                                
std::chrono::steady_clock::now() - _last_print_time)
                                                .count();
@@ -157,7 +163,7 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, Block* block, bool
             }
         }
         VLOG_DEBUG << "get_block for inner load_id=" << load_instance_id << ", 
but queue is empty";
-        if (!_need_commit) {
+        if (!_need_commit.load()) {
             get_block_dep->block();
             VLOG_DEBUG << "block get_block for inner load_id=" << 
load_instance_id;
         }
@@ -175,7 +181,7 @@ Status LoadBlockQueue::get_block(RuntimeState* 
runtime_state, Block* block, bool
                    << ". txn_id=" << txn_id << ", label=" << label
                    << ", instance_id=" << load_instance_id << ", load_ids=" << 
_get_load_ids();
     }
-    if (_block_queue.empty() && _need_commit && 
_load_ids_to_write_dep.empty()) {
+    if (_block_queue.empty() && _need_commit.load() && 
_load_ids_to_write_dep.empty()) {
         *eos = true;
     } else {
         *eos = false;
@@ -214,9 +220,12 @@ bool LoadBlockQueue::contain_load_id(const UniqueId& 
load_id) {
 Status LoadBlockQueue::add_load_id(const UniqueId& load_id,
                                    const std::shared_ptr<Dependency> 
put_block_dep) {
     std::unique_lock l(mutex);
-    if (_need_commit) {
-        return Status::InternalError<false>("block queue is set need commit, 
id=" +
-                                            load_instance_id.to_string());
+    if (_need_commit.load() || !status.ok() || process_finish.load()) {
+        return Status::InternalError<false>(
+                "block queue cannot add load id, id=" + 
load_instance_id.to_string() +
+                ", need_commit=" + (_need_commit.load() ? "true" : "false") +
+                ", process_finish=" + (process_finish.load() ? "true" : 
"false") +
+                ", queue_status=" + status.to_string());
     }
     _load_ids_to_write_dep[load_id] = put_block_dep;
     group_commit_load_count.fetch_add(1);
@@ -236,7 +245,7 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) 
{
             Status::Cancelled("cancel group_commit, label=" + label + ", 
status=" + st.to_string());
     size_t before_block_queues_bytes = _all_block_queues_bytes->load();
     while (!_block_queue.empty()) {
-        const BlockData& block_data = _block_queue.front().block;
+        const BlockData& block_data = _block_queue.front();
         _all_block_queues_bytes->fetch_sub(block_data.block_bytes, 
std::memory_order_relaxed);
         _block_queue.pop_front();
     }
@@ -258,8 +267,7 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st) 
{
 Status GroupCommitTable::get_first_block_load_queue(
         int64_t table_id, int64_t base_schema_version, int64_t index_size, 
const UniqueId& load_id,
         std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
-        std::shared_ptr<MemTrackerLimiter> mem_tracker, 
std::shared_ptr<Dependency> create_plan_dep,
-        std::shared_ptr<Dependency> put_block_dep) {
+        std::shared_ptr<Dependency> create_plan_dep, 
std::shared_ptr<Dependency> put_block_dep) {
     DCHECK(table_id == _table_id);
     std::unique_lock l(_lock);
     auto try_to_get_matched_queue = [&]() -> Status {
@@ -292,31 +300,131 @@ Status GroupCommitTable::get_first_block_load_queue(
         return Status::OK();
     }
     create_plan_dep->block();
+    _create_plan_be_exe_version = be_exe_version;
+    if (_create_plan_deps.empty()) {
+        _create_plan_start_time_ms = MonotonicMillis();
+    }
     _create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep, 
put_block_dep,
                                                        base_schema_version, 
index_size));
-    if (!_is_creating_plan_fragment) {
-        _is_creating_plan_fragment = true;
-        RETURN_IF_ERROR(
-                _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep 
= create_plan_dep] {
-                    Defer defer {[&, dep = dep]() {
-                        std::unique_lock l(_lock);
-                        for (auto it : _create_plan_deps) {
-                            std::get<0>(it.second)->set_ready();
+    [[maybe_unused]] auto submit_st = _submit_create_group_commit_load();
+    return try_to_get_matched_queue();
+}
+
+Status GroupCommitTable::submit_create_group_commit_load() {
+    std::unique_lock l(_lock);
+    if (_create_plan_deps.empty()) {
+        return Status::OK();
+    }
+    return _submit_create_group_commit_load();
+}
+
+Status GroupCommitTable::_submit_create_group_commit_load() {
+    if (_is_creating_plan_fragment) {
+        return Status::OK();
+    }
+
+    int64_t timeout_ms = config::group_commit_create_plan_timeout_ms;
+    if (timeout_ms > 0 && !_create_plan_deps.empty()) {
+        int64_t now_ms = MonotonicMillis();
+        if (_create_plan_start_time_ms > 0 && now_ms - 
_create_plan_start_time_ms > timeout_ms) {
+            std::string last_create_plan_failed_reason = 
_create_plan_failed_reason;
+            _create_plan_failed_reason =
+                    ". group commit create plan timeout after " + 
std::to_string(timeout_ms) + "ms";
+            if (!last_create_plan_failed_reason.empty()) {
+                _create_plan_failed_reason +=
+                        ", last create plan error: " + 
last_create_plan_failed_reason;
+            }
+            for (const auto& [id, load_info] : _create_plan_deps) {
+                std::get<0>(load_info)->set_ready();
+            }
+            _create_plan_deps.clear();
+            _create_plan_start_time_ms = 0;
+            return Status::OK();
+        }
+    }
+
+    auto mem_tracker = _group_commit_mgr->group_commit_mem_tracker();
+    int be_exe_version = _create_plan_be_exe_version;
+    _is_creating_plan_fragment = true;
+    auto submit_st = _thread_pool->submit_func([&, be_exe_version, 
mem_tracker] {
+        std::shared_ptr<LoadBlockQueue> created_load_block_queue;
+        Status create_group_commit_st = Status::OK();
+        std::string create_plan_failed_reason;
+        Defer defer {[&]() {
+            bool need_resubmit = !create_group_commit_st.ok();
+            std::unique_lock l(_lock);
+            _is_creating_plan_fragment = false;
+            _create_plan_failed_reason = create_plan_failed_reason;
+            if (created_load_block_queue && create_group_commit_st.ok() &&
+                !created_load_block_queue->need_commit()) {
+                std::vector<UniqueId> success_load_ids;
+                for (const auto& [id, load_info] : _create_plan_deps) {
+                    auto create_dep = std::get<0>(load_info);
+                    auto put_dep = std::get<1>(load_info);
+                    if (created_load_block_queue->schema_version == 
std::get<2>(load_info) &&
+                        created_load_block_queue->index_size == 
std::get<3>(load_info)) {
+                        auto st = created_load_block_queue->add_load_id(id, 
put_dep);
+                        if (!st.ok()) {
+                            LOG(WARNING) << "failed to add pending load_id 
into created "
+                                            "group commit queue, load_id="
+                                         << id << ", label=" << 
created_load_block_queue->label
+                                         << ", status=" << st.to_string();
+                            need_resubmit = true;
+                        } else {
+                            create_dep->set_ready();
+                            success_load_ids.emplace_back(id);
                         }
-                        _create_plan_deps.clear();
-                        _is_creating_plan_fragment = false;
-                    }};
-                    auto st = _create_group_commit_load(be_exe_version, 
mem_tracker);
-                    if (!st.ok()) {
-                        LOG(WARNING) << "create group commit load error: " << 
st.to_string();
-                        _create_plan_failed_reason = ". create group commit 
load error: " +
-                                                     st.to_string().substr(0, 
300);
-                    } else {
-                        _create_plan_failed_reason = "";
+                    } else if (created_load_block_queue->schema_version > 
std::get<2>(load_info) ||
+                               (created_load_block_queue->schema_version ==
+                                        std::get<2>(load_info) &&
+                                created_load_block_queue->index_size != 
std::get<3>(load_info))) {
+                        // schema version mismatch:
+                        //   1. the schema version of created load block queue 
is newer than the load request
+                        //   2. the index size is not equal
+                        // set ready for the load request to let it fail
+                        create_dep->set_ready();
+                        success_load_ids.emplace_back(id);
                     }
-                }));
-    }
-    return try_to_get_matched_queue();
+                }
+                for (const auto& id : success_load_ids) {
+                    _create_plan_deps.erase(id);
+                }
+                if (_create_plan_deps.empty()) {
+                    _create_plan_start_time_ms = 0;
+                }
+            }
+            if (!_create_plan_deps.empty()) {
+                need_resubmit = true;
+            }
+            if (need_resubmit && _group_commit_mgr) {
+                LOG(INFO) << "resubmit create group commit load task for 
table: " << _table_id;
+                _group_commit_mgr->add_need_create_plan_table(_table_id);
+            }
+        }};
+        create_group_commit_st =
+                _create_group_commit_load(be_exe_version, mem_tracker, 
created_load_block_queue);
+        if (!create_group_commit_st.ok()) {
+            LOG(WARNING) << "create group commit load error: "
+                         << create_group_commit_st.to_string();
+            create_plan_failed_reason = ". create group commit load error: " +
+                                        
create_group_commit_st.to_string().substr(0, 300);
+        } else {
+            create_plan_failed_reason = "";
+        }
+    });
+    if (!submit_st.ok()) {
+        _is_creating_plan_fragment = false;
+        _create_plan_failed_reason =
+                ". create group commit load error: submit create group commit 
load task failed: " +
+                submit_st.to_string().substr(0, 300);
+        for (const auto& [id, load_info] : _create_plan_deps) {
+            std::get<0>(load_info)->set_ready();
+        }
+        _create_plan_deps.clear();
+        LOG(WARNING) << "submit create group commit load task for table: " << 
_table_id
+                     << ", error: " << submit_st.to_string();
+    }
+    return submit_st;
 }
 
 void GroupCommitTable::remove_load_id(const UniqueId& load_id) {
@@ -332,8 +440,9 @@ void GroupCommitTable::remove_load_id(const UniqueId& 
load_id) {
     }
 }
 
-Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
-                                                   
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
+Status GroupCommitTable::_create_group_commit_load(
+        int be_exe_version, const std::shared_ptr<MemTrackerLimiter>& 
mem_tracker,
+        std::shared_ptr<LoadBlockQueue>& created_load_block_queue) {
     Status st = Status::OK();
     TStreamLoadPutResult result;
     std::string label;
@@ -405,6 +514,7 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version,
                     _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
                     
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
                     be_exe_version));
+            created_load_block_queue = load_block_queue;
 
             std::unique_lock l(_lock);
             _load_block_queues.emplace(instance_id, load_block_queue);
@@ -418,11 +528,23 @@ Status GroupCommitTable::_create_group_commit_load(int 
be_exe_version,
                         create_dep->set_ready();
                         success_load_ids.emplace_back(id);
                     }
+                } else if (load_block_queue->schema_version > 
std::get<2>(load_info) ||
+                           (load_block_queue->schema_version == 
std::get<2>(load_info) &&
+                            load_block_queue->index_size != 
std::get<3>(load_info))) {
+                    // schema version mismatch:
+                    //   1. the schema version of created load block queue is 
newer than the load request
+                    //   2. the index size is not equal
+                    // set ready for the load request to let it fail
+                    create_dep->set_ready();
+                    success_load_ids.emplace_back(id);
                 }
             }
             for (const auto& id : success_load_ids) {
                 _create_plan_deps.erase(id);
             }
+            if (_create_plan_deps.empty()) {
+                _create_plan_start_time_ms = 0;
+            }
         }
     }
     st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, 
result.pipeline_params);
@@ -482,16 +604,25 @@ Status 
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
                         client->loadTxnCommit(result, request);
                     },
                     config::txn_commit_rpc_timeout_ms);
-            result_status = Status::create(result.status);
-            // DELETE_BITMAP_LOCK_ERROR will be retried
-            if (result_status.ok() || 
!result_status.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
-                break;
+            if (st.ok()) {
+                result_status = Status::create(result.status);
+                // DELETE_BITMAP_LOCK_ERROR will be retried
+                if (result_status.ok() ||
+                    !result_status.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
+                    break;
+                }
+                LOG_WARNING("Failed to commit txn on group commit")
+                        .tag("label", label)
+                        .tag("txn_id", txn_id)
+                        .tag("retry_times", retry_times)
+                        .error(result_status);
+            } else {
+                LOG_WARNING("Failed to commit txn on group commit")
+                        .tag("label", label)
+                        .tag("txn_id", txn_id)
+                        .tag("retry_times", retry_times)
+                        .error(st);
             }
-            LOG_WARNING("Failed to commit txn on group commit")
-                    .tag("label", label)
-                    .tag("txn_id", txn_id)
-                    .tag("retry_times", retry_times)
-                    .error(result_status);
             retry_times++;
         }
         
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error",
@@ -512,7 +643,9 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
                 [&request, &result](FrontendServiceConnection& client) {
                     client->loadTxnRollback(result, request);
                 });
-        result_status = Status::create<false>(result.status);
+        if (st.ok()) {
+            result_status = Status::create<false>(result.status);
+        }
         DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status", 
{
             std ::string msg = "abort txn";
             LOG(INFO) << "debug promise set: " << msg;
@@ -542,6 +675,10 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t 
db_id, int64_t table_
         }
         _load_block_queues.erase(instance_id);
     }
+    if (!load_block_queue) {
+        LOG(WARNING) << "finish group commit can not find load block queue, 
label=" << label
+                     << ", txn_id=" << txn_id << ", instance_id=" << 
print_id(instance_id);
+    }
     // status: exec_plan_fragment result
     // st: commit txn rpc status
     // result_status: commit txn result
@@ -566,11 +703,15 @@ Status 
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
        << ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id)
        << ", exec_plan_fragment status=" << status.to_string()
        << ", commit/abort txn rpc status=" << st.to_string()
-       << ", commit/abort txn status=" << result_status.to_string()
-       << ", this group commit includes " << 
load_block_queue->group_commit_load_count << " loads"
-       << ", flush because meet "
-       << (load_block_queue->data_size_condition ? "data size " : "time ") << 
"condition"
-       << ", wal space info:" << 
ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
+       << ", commit/abort txn status=" << result_status.to_string();
+    if (load_block_queue) {
+        ss << ", this group commit includes " << 
load_block_queue->group_commit_load_count
+           << " loads, flush because meet "
+           << (load_block_queue->data_size_condition ? "data size " : "time ") 
<< "condition";
+    } else {
+        ss << ", load block queue is missing when finishing group commit";
+    }
+    ss << ", wal space info:" << 
ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
     if (state) {
         if (!state->get_error_log_file_path().empty()) {
             ss << ", error_url=" << state->get_error_log_file_path();
@@ -630,35 +771,95 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) : 
_exec_env(exec_env) {
                               
.set_max_threads(config::group_commit_insert_threads)
                               .build(&_thread_pool));
     _all_block_queues_bytes = std::make_shared<std::atomic_size_t>(0);
+    _group_commit_mem_tracker =
+            MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD, 
"GroupCommit");
+    _create_plan_thread = std::thread(&GroupCommitMgr::_create_plan_worker, 
this);
 }
 
 GroupCommitMgr::~GroupCommitMgr() {
+    stop();
     LOG(INFO) << "GroupCommitMgr is destoried";
 }
 
 void GroupCommitMgr::stop() {
+    {
+        std::lock_guard<std::mutex> l(_need_create_plan_lock);
+        if (_stopped) {
+            return;
+        }
+        _stopped = true;
+    }
+    _need_create_plan_cv.notify_all();
+    if (_create_plan_thread.joinable()) {
+        _create_plan_thread.join();
+    }
     _thread_pool->shutdown();
     LOG(INFO) << "GroupCommitMgr is stopped";
 }
 
-Status GroupCommitMgr::get_first_block_load_queue(
-        int64_t db_id, int64_t table_id, int64_t base_schema_version, int64_t 
index_size,
-        const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>& 
load_block_queue,
-        int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker,
-        std::shared_ptr<Dependency> create_plan_dep, 
std::shared_ptr<Dependency> put_block_dep) {
+void GroupCommitMgr::add_need_create_plan_table(int64_t table_id) {
+    {
+        std::lock_guard<std::mutex> l(_need_create_plan_lock);
+        if (_stopped) {
+            return;
+        }
+        _need_create_plan_tables.insert(table_id);
+    }
+    _need_create_plan_cv.notify_one();
+}
+
+void GroupCommitMgr::_create_plan_worker() {
+    SCOPED_INIT_THREAD_CONTEXT();
+    while (true) {
+        std::set<int64_t> need_create_plan_tables;
+        {
+            std::unique_lock<std::mutex> l(_need_create_plan_lock);
+            _need_create_plan_cv.wait(
+                    l, [this] { return _stopped || 
!_need_create_plan_tables.empty(); });
+            if (_stopped && _need_create_plan_tables.empty()) {
+                return;
+            }
+            need_create_plan_tables.swap(_need_create_plan_tables);
+        }
+        for (const auto table_id : need_create_plan_tables) {
+            std::shared_ptr<GroupCommitTable> group_commit_table;
+            {
+                std::lock_guard<std::mutex> l(_lock);
+                auto it = _table_map.find(table_id);
+                if (it == _table_map.end()) {
+                    continue;
+                }
+                group_commit_table = it->second;
+            }
+            auto st = group_commit_table->submit_create_group_commit_load();
+            if (!st.ok()) {
+                LOG(WARNING) << "submit create group commit load task from 
worker for table: "
+                             << table_id << ", error: " << st.to_string();
+            }
+        }
+    }
+}
+
+Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t 
table_id,
+                                                  int64_t base_schema_version, 
int64_t index_size,
+                                                  const UniqueId& load_id,
+                                                  
std::shared_ptr<LoadBlockQueue>& load_block_queue,
+                                                  int be_exe_version,
+                                                  std::shared_ptr<Dependency> 
create_plan_dep,
+                                                  std::shared_ptr<Dependency> 
put_block_dep) {
     std::shared_ptr<GroupCommitTable> group_commit_table;
     {
         std::lock_guard wlock(_lock);
         if (_table_map.find(table_id) == _table_map.end()) {
             _table_map.emplace(table_id, std::make_shared<GroupCommitTable>(
                                                  _exec_env, 
_thread_pool.get(), db_id, table_id,
-                                                 _all_block_queues_bytes));
+                                                 _all_block_queues_bytes, 
this));
         }
         group_commit_table = _table_map[table_id];
     }
     RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
             table_id, base_schema_version, index_size, load_id, 
load_block_queue, be_exe_version,
-            mem_tracker, create_plan_dep, put_block_dep));
+            create_plan_dep, put_block_dep));
     return Status::OK();
 }
 
diff --git a/be/src/load/group_commit/group_commit_mgr.h 
b/be/src/load/group_commit/group_commit_mgr.h
index bf892a4302c..5ab3831e4f5 100644
--- a/be/src/load/group_commit/group_commit_mgr.h
+++ b/be/src/load/group_commit/group_commit_mgr.h
@@ -25,7 +25,9 @@
 #include <future>
 #include <memory>
 #include <mutex>
+#include <set>
 #include <shared_mutex>
+#include <thread>
 #include <unordered_map>
 #include <utility>
 
@@ -38,6 +40,7 @@
 
 namespace doris {
 class ExecEnv;
+class GroupCommitMgr;
 class TUniqueId;
 class RuntimeState;
 
@@ -76,7 +79,7 @@ public:
     Status add_load_id(const UniqueId& load_id, const std::shared_ptr<Dependency> put_block_dep);
     Status remove_load_id(const UniqueId& load_id);
     void cancel(const Status& st);
-    bool need_commit() { return _need_commit; }
+    bool need_commit() const { return _need_commit.load(); }
 
     Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const std::string& import_label,
                       WalManager* wal_manager, std::vector<TSlotDescriptor>& slot_desc,
@@ -95,8 +98,8 @@ public:
                 "wait_internal_group_commit_finish={}, data_size_condition={}, "
                 "group_commit_load_count={}, process_finish={}, _need_commit={}, schema_version={}",
                 load_instance_id.to_string(), label, txn_id, wait_internal_group_commit_finish,
-                data_size_condition, group_commit_load_count, process_finish.load(), _need_commit,
-                schema_version);
+                data_size_condition, group_commit_load_count, process_finish.load(),
+                _need_commit.load(), schema_version);
         return fmt::to_string(debug_string_buffer);
     }
 
@@ -135,7 +138,7 @@ private:
     std::shared_ptr<VWalWriter> _v_wal_writer;
 
     // commit
-    bool _need_commit = false;
+    std::atomic<bool> _need_commit = false;
     // commit by time interval, can be changed by 'ALTER TABLE my_table SET ("group_commit_interval_ms"="1000");'
     int64_t _group_commit_interval_ms;
     std::chrono::steady_clock::time_point _start_time;
@@ -152,27 +155,31 @@ private:
 class GroupCommitTable {
 public:
     GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool, int64_t db_id,
-                     int64_t table_id, std::shared_ptr<std::atomic_size_t> all_block_queue_bytes)
+                     int64_t table_id, std::shared_ptr<std::atomic_size_t> all_block_queue_bytes,
+                     GroupCommitMgr* group_commit_mgr)
             : _exec_env(exec_env),
               _thread_pool(thread_pool),
               _all_block_queues_bytes(all_block_queue_bytes),
               _db_id(db_id),
-              _table_id(table_id) {};
+              _table_id(table_id),
+              _group_commit_mgr(group_commit_mgr) {};
     Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
                                       int64_t index_size, const UniqueId& load_id,
                                       std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                       int be_exe_version,
-                                      std::shared_ptr<MemTrackerLimiter> mem_tracker,
                                       std::shared_ptr<Dependency> create_plan_dep,
                                       std::shared_ptr<Dependency> put_block_dep);
     Status get_load_block_queue(const TUniqueId& instance_id,
                                 std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                 std::shared_ptr<Dependency> get_block_dep);
     void remove_load_id(const UniqueId& load_id);
+    Status submit_create_group_commit_load();
 
 private:
+    Status _submit_create_group_commit_load();
     Status _create_group_commit_load(int be_exe_version,
-                                     std::shared_ptr<MemTrackerLimiter> mem_tracker);
+                                     const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
+                                     std::shared_ptr<LoadBlockQueue>& created_load_block_queue);
     Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const std::string& label,
                                int64_t txn_id, const TPipelineFragmentParams& pipeline_params);
     Status _finish_group_commit_load(int64_t db_id, int64_t table_id, const std::string& label,
@@ -186,6 +193,7 @@ private:
 
     int64_t _db_id;
     int64_t _table_id;
+    GroupCommitMgr* _group_commit_mgr = nullptr;
 
     std::mutex _lock;
     // fragment_instance_id to load_block_queue
@@ -196,6 +204,8 @@ private:
                                             std::shared_ptr<Dependency>, int64_t, int64_t>>
             _create_plan_deps;
     std::string _create_plan_failed_reason;
+    int _create_plan_be_exe_version = 0;
+    int64_t _create_plan_start_time_ms = 0;
 };
 
 class GroupCommitMgr {
@@ -213,14 +223,19 @@ public:
                                       int64_t index_size, const UniqueId& load_id,
                                       std::shared_ptr<LoadBlockQueue>& load_block_queue,
                                       int be_exe_version,
-                                      std::shared_ptr<MemTrackerLimiter> mem_tracker,
                                       std::shared_ptr<Dependency> create_plan_dep,
                                       std::shared_ptr<Dependency> put_block_dep);
     void remove_load_id(int64_t table_id, const UniqueId& load_id);
     std::promise<Status> debug_promise;
     std::future<Status> debug_future = debug_promise.get_future();
+    void add_need_create_plan_table(int64_t table_id);
+    std::shared_ptr<MemTrackerLimiter> group_commit_mem_tracker() {
+        return _group_commit_mem_tracker;
+    }
 
 private:
+    void _create_plan_worker();
+
     ExecEnv* _exec_env = nullptr;
     std::unique_ptr<doris::ThreadPool> _thread_pool;
     // memory consumption of all tables' load block queues, used for memory back pressure.
@@ -229,6 +244,12 @@ private:
     std::mutex _lock;
     // TODO remove table when unused
     std::unordered_map<int64_t, std::shared_ptr<GroupCommitTable>> _table_map;
+    std::mutex _need_create_plan_lock;
+    std::condition_variable _need_create_plan_cv;
+    std::set<int64_t> _need_create_plan_tables;
+    bool _stopped = false;
+    std::thread _create_plan_thread;
+    std::shared_ptr<MemTrackerLimiter> _group_commit_mem_tracker;
 };
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/load/group_commit/wal/wal_table.cpp b/be/src/load/group_commit/wal/wal_table.cpp
index 4c6c0e1edb5..5dfd54354c3 100644
--- a/be/src/load/group_commit/wal/wal_table.cpp
+++ b/be/src/load/group_commit/wal/wal_table.cpp
@@ -183,9 +183,14 @@ Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
             [&request, &result](FrontendServiceConnection& client) {
                 client->loadTxnRollback(result, request);
             });
-    auto result_status = Status::create<false>(result.status);
-    LOG(INFO) << "abort label " << label << ", st:" << st << ", result_status:" << result_status;
-    return result_status;
+    if (st.ok()) {
+        auto result_status = Status::create<false>(result.status);
+        LOG(INFO) << "abort label " << label << ", result_status:" << result_status;
+        return result_status;
+    } else {
+        LOG(WARNING) << "abort label " << label << ", rpc error:" << st;
+        return st;
+    }
 }
 
 Status WalTable::_replay_wal_internal(const std::string& wal) {
diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
index cef9bbdbf27..f2797ff7a72 100644
--- a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
+++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
@@ -17,6 +17,50 @@
 
 suite("test_group_commit_error", "nonConcurrent") {
     def tableName = "test_group_commit_error"
+    def beConfigName = "group_commit_create_plan_timeout_ms"
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    def get_be_config = { String backend_id, String key ->
+        def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id))
+        logger.info("show config: code=" + code + ", out=" + out + ", err=" + err)
+        assertEquals(code, 0)
+        def configList = parseJson(out.trim())
+        assert configList instanceof List
+        for (Object ele in (List) configList) {
+            assert ele instanceof List<String>
+            if (((List<String>) ele)[0] == key) {
+                return ((List<String>) ele)[2]
+            }
+        }
+        assertTrue(false, "Failed to find BE config: " + key)
+    }
+
+    def set_be_config = { String key, String value ->
+        for (String backend_id : backendId_to_backendIP.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), key, value)
+            logger.info("update config: code=" + code + ", out=" + out + ", err=" + err)
+            assertEquals(code, 0)
+        }
+    }
+
+    def originBeConfig = [:]
+    for (String backend_id : backendId_to_backendIP.keySet()) {
+        originBeConfig[backend_id] = get_be_config(backend_id, beConfigName)
+    }
+
+    onFinish {
+        for (String backend_id : originBeConfig.keySet()) {
+            def (code, out, err) = update_be_config(backendId_to_backendIP.get(backend_id),
+                    backendId_to_backendHttpPort.get(backend_id), beConfigName, originBeConfig[backend_id])
+            logger.info("restore config: code=" + code + ", out=" + out + ", err=" + err)
+        }
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        GetDebugPoint().clearDebugPointsForAllFEs()
+    }
+
+    set_be_config(beConfigName, "20000")
 
     sql """ DROP TABLE IF EXISTS ${tableName} """
     sql """
@@ -104,4 +148,4 @@ suite("test_group_commit_error", "nonConcurrent") {
     } finally {
         GetDebugPoint().clearDebugPointsForAllBEs()
     }
-}
\ No newline at end of file
+}
diff --git a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_high_concurrency_async.groovy b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_high_concurrency_async.groovy
new file mode 100644
index 00000000000..237d7fc649c
--- /dev/null
+++ b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_high_concurrency_async.groovy
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
+
+suite("test_group_commit_stream_load_high_concurrency_async", "p0") {
+    def tableName = "test_group_commit_stream_load_high_concurrency_async"
+    int concurrentClients = 20
+    int loadsPerClient = 20
+    int expectedRows = concurrentClients * loadsPerClient
+    def errors = Collections.synchronizedList(new ArrayList<String>())
+    def stopRequested = new AtomicBoolean(false)
+    def successLoads = new AtomicInteger(0)
+    def getProperty = { property, userName ->
+        def result = sql_return_maparray """SHOW PROPERTY FOR '${userName}'"""
+        result.find {
+            it.Key == property as String
+        }
+    }
+    def originMaxUserConnections = getProperty("max_user_connections", "root").Value as long
+
+    def waitRowCount = { expected ->
+        Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({
+            def result = sql "select count(*) from ${tableName}"
+            logger.info("table: ${tableName}, rowCount: ${result}, expected: ${expected}")
+            return result[0][0] == expected
+        })
+    }
+
+    def checkStreamLoadResult = { loadId, result, exception ->
+        if (exception != null) {
+            stopRequested.set(true)
+            errors.add("load ${loadId} exception: ${exception.getMessage()}")
+            return
+        }
+        def json = parseJson(result)
+        if (!"success".equalsIgnoreCase(json.Status?.toString())) {
+            stopRequested.set(true)
+            errors.add("load ${loadId} status=${json.Status}, msg=${json.Message}")
+            return
+        }
+        if (json.GroupCommit != true) {
+            stopRequested.set(true)
+            errors.add("load ${loadId} is not group commit: ${result}")
+            return
+        }
+        if (json.NumberTotalRows != 1 || json.NumberLoadedRows != 1 ||
+            json.NumberFilteredRows != 0 || json.NumberUnselectedRows != 0) {
+            stopRequested.set(true)
+            errors.add("load ${loadId} unexpected counters: ${result}")
+            return
+        }
+        successLoads.incrementAndGet()
+    }
+
+    try {
+        sql """SET PROPERTY FOR 'root' 'max_user_connections' = '1024'"""
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+            CREATE TABLE ${tableName} (
+                id BIGINT NOT NULL,
+                name STRING NULL
+            ) ENGINE=OLAP
+            DUPLICATE KEY(id)
+            DISTRIBUTED BY HASH(id) BUCKETS 1
+            PROPERTIES (
+                "replication_num" = "1",
+                "group_commit_interval_ms" = "5"
+            );
+        """
+
+        def threads = []
+        for (int client = 0; client < concurrentClients; client++) {
+            int clientId = client
+            threads.add(Thread.startDaemon("group-commit-stream-load-${clientId}") {
+                for (int round = 0; round < loadsPerClient; round++) {
+                    if (stopRequested.get()) {
+                        break
+                    }
+                    long loadId = clientId * loadsPerClient + round
+                    try {
+                        streamLoad {
+                            table "${tableName}"
+                            set 'column_separator', ','
+                            set 'group_commit', 'async_mode'
+                            unset 'label'
+                            inputText "${loadId},name_${loadId}\n"
+                            time 60000
+
+                            check { result, exception, startTime, endTime ->
+                                checkStreamLoadResult(loadId, result, exception)
+                            }
+                        }
+                    } catch (Exception e) {
+                        stopRequested.set(true)
+                        errors.add("load ${loadId} streamLoad throws: ${e.getMessage()}")
+                        break
+                    }
+                }
+            })
+        }
+
+        threads.each { it.join() }
+
+        assertTrue(errors.isEmpty(),
+                "group commit stream load failures: " +
+                        errors.subList(0, Math.min(errors.size(), 10)))
+        assertEquals(expectedRows, successLoads.get())
+
+        sql "sync"
+        waitRowCount(expectedRows)
+
+        def result = sql "select count(*) from ${tableName}"
+        assertEquals(expectedRows, result[0][0])
+    } finally {
+        sql """SET PROPERTY FOR 'root' 'max_user_connections' = '${originMaxUserConnections}'"""
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to