This is an automated email from the ASF dual-hosted git repository.
mymeiyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7b2c4b91f11 [fix](group commit) fix can not get a block queue (#63722)
7b2c4b91f11 is described below
commit 7b2c4b91f1172cfa49e14db431866eaec2fb3d90
Author: meiyi <[email protected]>
AuthorDate: Wed Jul 1 14:37:19 2026 +0800
[fix](group commit) fix can not get a block queue (#63722)
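Under high-concurrency async stream load, group commit may fail to get a
block queue: creating the group commit plan fragment can fail, or it can
leave pending load requests waiting for a queue that is no longer usable.
This change tracks pending create-plan requests per table, adds a background
worker that resubmits group commit plan creation for tables that still have
pending requests, and introduces group_commit_create_plan_timeout_ms
(default 120000 ms; 0 disables the timeout) so that waiting requests are
released after a bounded time. It also moves block columns into the queue
instead of deep-copying them in the group commit sink, and only parses the
FE commit/abort result status when the RPC itself succeeded.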
---
be/src/common/config.cpp | 3 +
be/src/common/config.h | 2 +
.../operator/group_commit_block_sink_operator.cpp | 37 +--
be/src/load/group_commit/group_commit_mgr.cpp | 329 +++++++++++++++++----
be/src/load/group_commit/group_commit_mgr.h | 41 ++-
be/src/load/group_commit/wal/wal_table.cpp | 11 +-
.../group_commit/test_group_commit_error.groovy | 46 ++-
...ommit_stream_load_high_concurrency_async.groovy | 138 +++++++++
8 files changed, 511 insertions(+), 96 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 2d7d2edb821..5f970b11708 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1425,6 +1425,9 @@ DEFINE_mInt32(group_commit_queue_mem_limit, "67108864");
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10%
can be automatically identified.
DEFINE_String(group_commit_wal_max_disk_limit, "10%");
DEFINE_Bool(group_commit_wait_replay_wal_finish, "false");
+// Max time(ms) to wait for creating group commit plan fragment.
+// 0 means no timeout, default 2min.
+DEFINE_mInt32(group_commit_create_plan_timeout_ms, "120000");
DEFINE_mInt32(scan_thread_nice_value, "0");
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "3600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 016248f94e0..1cd03d999dd 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1508,6 +1508,8 @@ DECLARE_mInt32(group_commit_queue_mem_limit);
// group_commit_wal_max_disk_limit=1024 or group_commit_wal_max_disk_limit=10%
can be automatically identified.
DECLARE_mString(group_commit_wal_max_disk_limit);
DECLARE_Bool(group_commit_wait_replay_wal_finish);
+// Max time(ms) to wait for creating group commit plan fragment. 0 means no
timeout.
+DECLARE_mInt32(group_commit_create_plan_timeout_ms);
// The configuration item is used to lower the priority of the scanner thread,
// typically employed to ensure CPU scheduling for write operations.
diff --git a/be/src/exec/operator/group_commit_block_sink_operator.cpp
b/be/src/exec/operator/group_commit_block_sink_operator.cpp
index e441882f73b..66cf4aaee06 100644
--- a/be/src/exec/operator/group_commit_block_sink_operator.cpp
+++ b/be/src/exec/operator/group_commit_block_sink_operator.cpp
@@ -86,8 +86,8 @@ Status
GroupCommitBlockSinkLocalState::_initialize_load_queue() {
if (_state->exec_env()->wal_mgr()->is_running()) {
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
p._db_id, p._table_id, p._base_schema_version,
p._schema->indexes().size(),
- p._load_id, _load_block_queue, _state->be_exec_version(),
- _state->query_mem_tracker(), _create_plan_dependency,
_put_block_dependency));
+ p._load_id, _load_block_queue, _state->be_exec_version(),
_create_plan_dependency,
+ _put_block_dependency));
_state->set_import_label(_load_block_queue->label);
_state->set_wal_id(_load_block_queue->txn_id); // wal_id is txn_id
return Status::OK();
@@ -140,17 +140,16 @@ Status
GroupCommitBlockSinkLocalState::_add_block(RuntimeState* state,
block->get_by_position(i).column =
make_nullable(block->get_by_position(i).column);
block->get_by_position(i).type =
make_nullable(block->get_by_position(i).type);
}
- // add block to queue
- auto cur_mutable_block = MutableBlock::create_unique(block->clone_empty());
- {
- IColumn::Selector selector;
- for (auto i = 0; i < block->rows(); i++) {
- selector.emplace_back(i);
- }
-
RETURN_IF_ERROR(block->append_to_block_by_selector(cur_mutable_block.get(),
selector));
- }
+ // Add block to queue. The block has already been converted to
all-nullable columns above
+ // and contains exactly the rows to enqueue (filtering is done by the
caller before
+ // _add_block), so move its columns into a standalone output_block instead
of deep-copying
+ // them. The previous code appended every row through an identity
selector, which duplicated
+ // all column data (notably ColumnString chars) and dominated group-commit
load memory.
+ // Columns are COW shared_ptrs, so swapping into a fresh Block is O(1) and
memory-safe: the
+ // queued block is a distinct object (consumers swap/mutate only that
object) while the
+ // underlying data stays alive via reference counting and is only read
downstream.
std::shared_ptr<Block> output_block = Block::create_shared();
- output_block->swap(cur_mutable_block->to_block());
+ output_block->swap(*block);
if (!_is_block_appended && state->num_rows_load_total() +
state->num_rows_load_unselected() +
state->num_rows_load_filtered()
<=
config::group_commit_memory_rows_for_max_filter_ratio) {
@@ -337,10 +336,10 @@ Status
GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block* inpu
};
auto rows = input_block->rows();
- auto bytes = input_block->bytes();
if (UNLIKELY(rows == 0)) {
return wind_up();
}
+ auto bytes = input_block->bytes();
// update incrementally so that FE can get the progress.
// the real 'num_rows_load_total' will be set when sink being closed.
@@ -362,11 +361,9 @@ Status
GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block* inpu
local_state._partitions.assign(rows, nullptr);
local_state._filter_bitmap.Reset(rows);
- for (int index = 0; index < rows; index++) {
- local_state._vpartition->find_partition(block.get(), index,
-
local_state._partitions[index]);
- }
for (int row_index = 0; row_index < rows; row_index++) {
+ local_state._vpartition->find_partition(block.get(), row_index,
+
local_state._partitions[row_index]);
if (local_state._partitions[row_index] == nullptr) [[unlikely]] {
local_state._filter_bitmap.Set(row_index, true);
LOG(WARNING) << "no partition for this tuple. tuple="
@@ -393,6 +390,8 @@ Status
GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block* inpu
local_state._has_filtered_rows) {
auto cloneBlock = block->clone_without_columns();
auto res_block =
MutableBlock::build_mutable_block(std::move(cloneBlock));
+ std::vector<uint32_t> rows_to_keep;
+ rows_to_keep.reserve(rows);
for (int i = 0; i < rows; ++i) {
if (local_state._block_convertor->filter_map()[i]) {
continue;
@@ -400,8 +399,10 @@ Status
GroupCommitBlockSinkOperatorX::sink_impl(RuntimeState* state, Block* inpu
if (local_state._filter_bitmap.Get(i)) {
continue;
}
- res_block.add_row(block.get(), i);
+ rows_to_keep.emplace_back(i);
}
+ RETURN_IF_ERROR(res_block.add_rows(block.get(),
rows_to_keep.data(),
+ rows_to_keep.data() +
rows_to_keep.size()));
block->swap(res_block.to_block());
}
// add block into block queue
diff --git a/be/src/load/group_commit/group_commit_mgr.cpp
b/be/src/load/group_commit/group_commit_mgr.cpp
index 238901888b6..930c518ae9a 100644
--- a/be/src/load/group_commit/group_commit_mgr.cpp
+++ b/be/src/load/group_commit/group_commit_mgr.cpp
@@ -29,9 +29,12 @@
#include "exec/pipeline/dependency.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/thread_context.h"
#include "util/client_cache.h"
#include "util/debug_points.h"
#include "util/thrift_rpc_helper.h"
+#include "util/time.h"
namespace doris {
@@ -99,18 +102,21 @@ Status LoadBlockQueue::add_block(RuntimeState*
runtime_state, std::shared_ptr<Bl
_all_block_queues_bytes->load(std::memory_order_relaxed) >=
config::group_commit_queue_mem_limit) {
group_commit_block_by_memory_counter << 1;
- DCHECK(_load_ids_to_write_dep.find(load_id) !=
_load_ids_to_write_dep.end());
- _load_ids_to_write_dep[load_id]->block();
- VLOG_DEBUG << "block add_block for load_id=" << load_id
- << ", memory=" <<
_all_block_queues_bytes->load(std::memory_order_relaxed)
- << ". inner load_id=" << load_instance_id << ", label="
<< label;
+ auto dep_it = _load_ids_to_write_dep.find(load_id);
+ DCHECK(dep_it != _load_ids_to_write_dep.end());
+ if (dep_it != _load_ids_to_write_dep.end() && dep_it->second) {
+ dep_it->second->block();
+ VLOG_DEBUG << "block add_block for load_id=" << load_id << ",
memory="
+ <<
_all_block_queues_bytes->load(std::memory_order_relaxed)
+ << ". inner load_id=" << load_instance_id << ",
label=" << label;
+ }
}
}
- if (!_need_commit) {
+ if (!_need_commit.load()) {
if (_data_bytes >= _group_commit_data_bytes) {
VLOG_DEBUG << "group commit meets commit condition for data size,
label=" << label
<< ", instance_id=" << load_instance_id << ",
data_bytes=" << _data_bytes;
- _need_commit = true;
+ _need_commit.store(true);
data_size_condition = true;
}
if
(std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now()
-
@@ -118,7 +124,7 @@ Status LoadBlockQueue::add_block(RuntimeState*
runtime_state, std::shared_ptr<Bl
.count() >= _group_commit_interval_ms) {
VLOG_DEBUG << "group commit meets commit condition for time
interval, label=" << label
<< ", instance_id=" << load_instance_id << ",
data_bytes=" << _data_bytes;
- _need_commit = true;
+ _need_commit.store(true);
}
}
for (auto read_dep : _read_deps) {
@@ -141,11 +147,11 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, Block* block, bool
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _start_time)
.count();
- if (!_need_commit && duration >= _group_commit_interval_ms) {
- _need_commit = true;
+ if (!_need_commit.load() && duration >= _group_commit_interval_ms) {
+ _need_commit.store(true);
}
if (_block_queue.empty()) {
- if (_need_commit && duration >= 10 * _group_commit_interval_ms) {
+ if (_need_commit.load() && duration >= 10 * _group_commit_interval_ms)
{
auto last_print_duration =
std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - _last_print_time)
.count();
@@ -157,7 +163,7 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, Block* block, bool
}
}
VLOG_DEBUG << "get_block for inner load_id=" << load_instance_id << ",
but queue is empty";
- if (!_need_commit) {
+ if (!_need_commit.load()) {
get_block_dep->block();
VLOG_DEBUG << "block get_block for inner load_id=" <<
load_instance_id;
}
@@ -175,7 +181,7 @@ Status LoadBlockQueue::get_block(RuntimeState*
runtime_state, Block* block, bool
<< ". txn_id=" << txn_id << ", label=" << label
<< ", instance_id=" << load_instance_id << ", load_ids=" <<
_get_load_ids();
}
- if (_block_queue.empty() && _need_commit &&
_load_ids_to_write_dep.empty()) {
+ if (_block_queue.empty() && _need_commit.load() &&
_load_ids_to_write_dep.empty()) {
*eos = true;
} else {
*eos = false;
@@ -214,9 +220,12 @@ bool LoadBlockQueue::contain_load_id(const UniqueId&
load_id) {
Status LoadBlockQueue::add_load_id(const UniqueId& load_id,
const std::shared_ptr<Dependency>
put_block_dep) {
std::unique_lock l(mutex);
- if (_need_commit) {
- return Status::InternalError<false>("block queue is set need commit,
id=" +
- load_instance_id.to_string());
+ if (_need_commit.load() || !status.ok() || process_finish.load()) {
+ return Status::InternalError<false>(
+ "block queue cannot add load id, id=" +
load_instance_id.to_string() +
+ ", need_commit=" + (_need_commit.load() ? "true" : "false") +
+ ", process_finish=" + (process_finish.load() ? "true" :
"false") +
+ ", queue_status=" + status.to_string());
}
_load_ids_to_write_dep[load_id] = put_block_dep;
group_commit_load_count.fetch_add(1);
@@ -236,7 +245,7 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st)
{
Status::Cancelled("cancel group_commit, label=" + label + ",
status=" + st.to_string());
size_t before_block_queues_bytes = _all_block_queues_bytes->load();
while (!_block_queue.empty()) {
- const BlockData& block_data = _block_queue.front().block;
+ const BlockData& block_data = _block_queue.front();
_all_block_queues_bytes->fetch_sub(block_data.block_bytes,
std::memory_order_relaxed);
_block_queue.pop_front();
}
@@ -258,8 +267,7 @@ void LoadBlockQueue::_cancel_without_lock(const Status& st)
{
Status GroupCommitTable::get_first_block_load_queue(
int64_t table_id, int64_t base_schema_version, int64_t index_size,
const UniqueId& load_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
- std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<Dependency> create_plan_dep,
- std::shared_ptr<Dependency> put_block_dep) {
+ std::shared_ptr<Dependency> create_plan_dep,
std::shared_ptr<Dependency> put_block_dep) {
DCHECK(table_id == _table_id);
std::unique_lock l(_lock);
auto try_to_get_matched_queue = [&]() -> Status {
@@ -292,31 +300,131 @@ Status GroupCommitTable::get_first_block_load_queue(
return Status::OK();
}
create_plan_dep->block();
+ _create_plan_be_exe_version = be_exe_version;
+ if (_create_plan_deps.empty()) {
+ _create_plan_start_time_ms = MonotonicMillis();
+ }
_create_plan_deps.emplace(load_id, std::make_tuple(create_plan_dep,
put_block_dep,
base_schema_version,
index_size));
- if (!_is_creating_plan_fragment) {
- _is_creating_plan_fragment = true;
- RETURN_IF_ERROR(
- _thread_pool->submit_func([&, be_exe_version, mem_tracker, dep
= create_plan_dep] {
- Defer defer {[&, dep = dep]() {
- std::unique_lock l(_lock);
- for (auto it : _create_plan_deps) {
- std::get<0>(it.second)->set_ready();
+ [[maybe_unused]] auto submit_st = _submit_create_group_commit_load();
+ return try_to_get_matched_queue();
+}
+
+Status GroupCommitTable::submit_create_group_commit_load() {
+ std::unique_lock l(_lock);
+ if (_create_plan_deps.empty()) {
+ return Status::OK();
+ }
+ return _submit_create_group_commit_load();
+}
+
+Status GroupCommitTable::_submit_create_group_commit_load() {
+ if (_is_creating_plan_fragment) {
+ return Status::OK();
+ }
+
+ int64_t timeout_ms = config::group_commit_create_plan_timeout_ms;
+ if (timeout_ms > 0 && !_create_plan_deps.empty()) {
+ int64_t now_ms = MonotonicMillis();
+ if (_create_plan_start_time_ms > 0 && now_ms -
_create_plan_start_time_ms > timeout_ms) {
+ std::string last_create_plan_failed_reason =
_create_plan_failed_reason;
+ _create_plan_failed_reason =
+ ". group commit create plan timeout after " +
std::to_string(timeout_ms) + "ms";
+ if (!last_create_plan_failed_reason.empty()) {
+ _create_plan_failed_reason +=
+ ", last create plan error: " +
last_create_plan_failed_reason;
+ }
+ for (const auto& [id, load_info] : _create_plan_deps) {
+ std::get<0>(load_info)->set_ready();
+ }
+ _create_plan_deps.clear();
+ _create_plan_start_time_ms = 0;
+ return Status::OK();
+ }
+ }
+
+ auto mem_tracker = _group_commit_mgr->group_commit_mem_tracker();
+ int be_exe_version = _create_plan_be_exe_version;
+ _is_creating_plan_fragment = true;
+ auto submit_st = _thread_pool->submit_func([&, be_exe_version,
mem_tracker] {
+ std::shared_ptr<LoadBlockQueue> created_load_block_queue;
+ Status create_group_commit_st = Status::OK();
+ std::string create_plan_failed_reason;
+ Defer defer {[&]() {
+ bool need_resubmit = !create_group_commit_st.ok();
+ std::unique_lock l(_lock);
+ _is_creating_plan_fragment = false;
+ _create_plan_failed_reason = create_plan_failed_reason;
+ if (created_load_block_queue && create_group_commit_st.ok() &&
+ !created_load_block_queue->need_commit()) {
+ std::vector<UniqueId> success_load_ids;
+ for (const auto& [id, load_info] : _create_plan_deps) {
+ auto create_dep = std::get<0>(load_info);
+ auto put_dep = std::get<1>(load_info);
+ if (created_load_block_queue->schema_version ==
std::get<2>(load_info) &&
+ created_load_block_queue->index_size ==
std::get<3>(load_info)) {
+ auto st = created_load_block_queue->add_load_id(id,
put_dep);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to add pending load_id
into created "
+ "group commit queue, load_id="
+ << id << ", label=" <<
created_load_block_queue->label
+ << ", status=" << st.to_string();
+ need_resubmit = true;
+ } else {
+ create_dep->set_ready();
+ success_load_ids.emplace_back(id);
}
- _create_plan_deps.clear();
- _is_creating_plan_fragment = false;
- }};
- auto st = _create_group_commit_load(be_exe_version,
mem_tracker);
- if (!st.ok()) {
- LOG(WARNING) << "create group commit load error: " <<
st.to_string();
- _create_plan_failed_reason = ". create group commit
load error: " +
- st.to_string().substr(0,
300);
- } else {
- _create_plan_failed_reason = "";
+ } else if (created_load_block_queue->schema_version >
std::get<2>(load_info) ||
+ (created_load_block_queue->schema_version ==
+ std::get<2>(load_info) &&
+ created_load_block_queue->index_size !=
std::get<3>(load_info))) {
+ // schema version mismatch:
+ // 1. the schema version of created load block queue
is newer than the load request
+ // 2. the index size is not equal
+ // set ready for the load request to let it fail
+ create_dep->set_ready();
+ success_load_ids.emplace_back(id);
}
- }));
- }
- return try_to_get_matched_queue();
+ }
+ for (const auto& id : success_load_ids) {
+ _create_plan_deps.erase(id);
+ }
+ if (_create_plan_deps.empty()) {
+ _create_plan_start_time_ms = 0;
+ }
+ }
+ if (!_create_plan_deps.empty()) {
+ need_resubmit = true;
+ }
+ if (need_resubmit && _group_commit_mgr) {
+ LOG(INFO) << "resubmit create group commit load task for
table: " << _table_id;
+ _group_commit_mgr->add_need_create_plan_table(_table_id);
+ }
+ }};
+ create_group_commit_st =
+ _create_group_commit_load(be_exe_version, mem_tracker,
created_load_block_queue);
+ if (!create_group_commit_st.ok()) {
+ LOG(WARNING) << "create group commit load error: "
+ << create_group_commit_st.to_string();
+ create_plan_failed_reason = ". create group commit load error: " +
+
create_group_commit_st.to_string().substr(0, 300);
+ } else {
+ create_plan_failed_reason = "";
+ }
+ });
+ if (!submit_st.ok()) {
+ _is_creating_plan_fragment = false;
+ _create_plan_failed_reason =
+ ". create group commit load error: submit create group commit
load task failed: " +
+ submit_st.to_string().substr(0, 300);
+ for (const auto& [id, load_info] : _create_plan_deps) {
+ std::get<0>(load_info)->set_ready();
+ }
+ _create_plan_deps.clear();
+ LOG(WARNING) << "submit create group commit load task for table: " <<
_table_id
+ << ", error: " << submit_st.to_string();
+ }
+ return submit_st;
}
void GroupCommitTable::remove_load_id(const UniqueId& load_id) {
@@ -332,8 +440,9 @@ void GroupCommitTable::remove_load_id(const UniqueId&
load_id) {
}
}
-Status GroupCommitTable::_create_group_commit_load(int be_exe_version,
-
std::shared_ptr<MemTrackerLimiter> mem_tracker) {
+Status GroupCommitTable::_create_group_commit_load(
+ int be_exe_version, const std::shared_ptr<MemTrackerLimiter>&
mem_tracker,
+ std::shared_ptr<LoadBlockQueue>& created_load_block_queue) {
Status st = Status::OK();
TStreamLoadPutResult result;
std::string label;
@@ -405,6 +514,7 @@ Status GroupCommitTable::_create_group_commit_load(int
be_exe_version,
_db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
pipeline_params.fragment.output_sink.olap_table_sink.schema.slot_descs,
be_exe_version));
+ created_load_block_queue = load_block_queue;
std::unique_lock l(_lock);
_load_block_queues.emplace(instance_id, load_block_queue);
@@ -418,11 +528,23 @@ Status GroupCommitTable::_create_group_commit_load(int
be_exe_version,
create_dep->set_ready();
success_load_ids.emplace_back(id);
}
+ } else if (load_block_queue->schema_version >
std::get<2>(load_info) ||
+ (load_block_queue->schema_version ==
std::get<2>(load_info) &&
+ load_block_queue->index_size !=
std::get<3>(load_info))) {
+ // schema version mismatch:
+ // 1. the schema version of created load block queue is
newer than the load request
+ // 2. the index size is not equal
+ // set ready for the load request to let it fail
+ create_dep->set_ready();
+ success_load_ids.emplace_back(id);
}
}
for (const auto& id : success_load_ids) {
_create_plan_deps.erase(id);
}
+ if (_create_plan_deps.empty()) {
+ _create_plan_start_time_ms = 0;
+ }
}
}
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id,
result.pipeline_params);
@@ -482,16 +604,25 @@ Status
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
client->loadTxnCommit(result, request);
},
config::txn_commit_rpc_timeout_ms);
- result_status = Status::create(result.status);
- // DELETE_BITMAP_LOCK_ERROR will be retried
- if (result_status.ok() ||
!result_status.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
- break;
+ if (st.ok()) {
+ result_status = Status::create(result.status);
+ // DELETE_BITMAP_LOCK_ERROR will be retried
+ if (result_status.ok() ||
+ !result_status.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
+ break;
+ }
+ LOG_WARNING("Failed to commit txn on group commit")
+ .tag("label", label)
+ .tag("txn_id", txn_id)
+ .tag("retry_times", retry_times)
+ .error(result_status);
+ } else {
+ LOG_WARNING("Failed to commit txn on group commit")
+ .tag("label", label)
+ .tag("txn_id", txn_id)
+ .tag("retry_times", retry_times)
+ .error(st);
}
- LOG_WARNING("Failed to commit txn on group commit")
- .tag("label", label)
- .tag("txn_id", txn_id)
- .tag("retry_times", retry_times)
- .error(result_status);
retry_times++;
}
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.commit_success_and_rpc_error",
@@ -512,7 +643,9 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
[&request, &result](FrontendServiceConnection& client) {
client->loadTxnRollback(result, request);
});
- result_status = Status::create<false>(result.status);
+ if (st.ok()) {
+ result_status = Status::create<false>(result.status);
+ }
DBUG_EXECUTE_IF("LoadBlockQueue._finish_group_commit_load.err_status",
{
std ::string msg = "abort txn";
LOG(INFO) << "debug promise set: " << msg;
@@ -542,6 +675,10 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
}
_load_block_queues.erase(instance_id);
}
+ if (!load_block_queue) {
+ LOG(WARNING) << "finish group commit can not find load block queue,
label=" << label
+ << ", txn_id=" << txn_id << ", instance_id=" <<
print_id(instance_id);
+ }
// status: exec_plan_fragment result
// st: commit txn rpc status
// result_status: commit txn result
@@ -566,11 +703,15 @@ Status
GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
<< ", txn_id=" << txn_id << ", instance_id=" << print_id(instance_id)
<< ", exec_plan_fragment status=" << status.to_string()
<< ", commit/abort txn rpc status=" << st.to_string()
- << ", commit/abort txn status=" << result_status.to_string()
- << ", this group commit includes " <<
load_block_queue->group_commit_load_count << " loads"
- << ", flush because meet "
- << (load_block_queue->data_size_condition ? "data size " : "time ") <<
"condition"
- << ", wal space info:" <<
ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
+ << ", commit/abort txn status=" << result_status.to_string();
+ if (load_block_queue) {
+ ss << ", this group commit includes " <<
load_block_queue->group_commit_load_count
+ << " loads, flush because meet "
+ << (load_block_queue->data_size_condition ? "data size " : "time ")
<< "condition";
+ } else {
+ ss << ", load block queue is missing when finishing group commit";
+ }
+ ss << ", wal space info:" <<
ExecEnv::GetInstance()->wal_mgr()->get_wal_dirs_info_string();
if (state) {
if (!state->get_error_log_file_path().empty()) {
ss << ", error_url=" << state->get_error_log_file_path();
@@ -630,35 +771,95 @@ GroupCommitMgr::GroupCommitMgr(ExecEnv* exec_env) :
_exec_env(exec_env) {
.set_max_threads(config::group_commit_insert_threads)
.build(&_thread_pool));
_all_block_queues_bytes = std::make_shared<std::atomic_size_t>(0);
+ _group_commit_mem_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::LOAD,
"GroupCommit");
+ _create_plan_thread = std::thread(&GroupCommitMgr::_create_plan_worker,
this);
}
GroupCommitMgr::~GroupCommitMgr() {
+ stop();
LOG(INFO) << "GroupCommitMgr is destoried";
}
void GroupCommitMgr::stop() {
+ {
+ std::lock_guard<std::mutex> l(_need_create_plan_lock);
+ if (_stopped) {
+ return;
+ }
+ _stopped = true;
+ }
+ _need_create_plan_cv.notify_all();
+ if (_create_plan_thread.joinable()) {
+ _create_plan_thread.join();
+ }
_thread_pool->shutdown();
LOG(INFO) << "GroupCommitMgr is stopped";
}
-Status GroupCommitMgr::get_first_block_load_queue(
- int64_t db_id, int64_t table_id, int64_t base_schema_version, int64_t
index_size,
- const UniqueId& load_id, std::shared_ptr<LoadBlockQueue>&
load_block_queue,
- int be_exe_version, std::shared_ptr<MemTrackerLimiter> mem_tracker,
- std::shared_ptr<Dependency> create_plan_dep,
std::shared_ptr<Dependency> put_block_dep) {
+void GroupCommitMgr::add_need_create_plan_table(int64_t table_id) {
+ {
+ std::lock_guard<std::mutex> l(_need_create_plan_lock);
+ if (_stopped) {
+ return;
+ }
+ _need_create_plan_tables.insert(table_id);
+ }
+ _need_create_plan_cv.notify_one();
+}
+
+void GroupCommitMgr::_create_plan_worker() {
+ SCOPED_INIT_THREAD_CONTEXT();
+ while (true) {
+ std::set<int64_t> need_create_plan_tables;
+ {
+ std::unique_lock<std::mutex> l(_need_create_plan_lock);
+ _need_create_plan_cv.wait(
+ l, [this] { return _stopped ||
!_need_create_plan_tables.empty(); });
+ if (_stopped && _need_create_plan_tables.empty()) {
+ return;
+ }
+ need_create_plan_tables.swap(_need_create_plan_tables);
+ }
+ for (const auto table_id : need_create_plan_tables) {
+ std::shared_ptr<GroupCommitTable> group_commit_table;
+ {
+ std::lock_guard<std::mutex> l(_lock);
+ auto it = _table_map.find(table_id);
+ if (it == _table_map.end()) {
+ continue;
+ }
+ group_commit_table = it->second;
+ }
+ auto st = group_commit_table->submit_create_group_commit_load();
+ if (!st.ok()) {
+ LOG(WARNING) << "submit create group commit load task from
worker for table: "
+ << table_id << ", error: " << st.to_string();
+ }
+ }
+ }
+}
+
+Status GroupCommitMgr::get_first_block_load_queue(int64_t db_id, int64_t
table_id,
+ int64_t base_schema_version,
int64_t index_size,
+ const UniqueId& load_id,
+
std::shared_ptr<LoadBlockQueue>& load_block_queue,
+ int be_exe_version,
+ std::shared_ptr<Dependency>
create_plan_dep,
+ std::shared_ptr<Dependency>
put_block_dep) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
if (_table_map.find(table_id) == _table_map.end()) {
_table_map.emplace(table_id, std::make_shared<GroupCommitTable>(
_exec_env,
_thread_pool.get(), db_id, table_id,
- _all_block_queues_bytes));
+ _all_block_queues_bytes,
this));
}
group_commit_table = _table_map[table_id];
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
table_id, base_schema_version, index_size, load_id,
load_block_queue, be_exe_version,
- mem_tracker, create_plan_dep, put_block_dep));
+ create_plan_dep, put_block_dep));
return Status::OK();
}
diff --git a/be/src/load/group_commit/group_commit_mgr.h
b/be/src/load/group_commit/group_commit_mgr.h
index bf892a4302c..5ab3831e4f5 100644
--- a/be/src/load/group_commit/group_commit_mgr.h
+++ b/be/src/load/group_commit/group_commit_mgr.h
@@ -25,7 +25,9 @@
#include <future>
#include <memory>
#include <mutex>
+#include <set>
#include <shared_mutex>
+#include <thread>
#include <unordered_map>
#include <utility>
@@ -38,6 +40,7 @@
namespace doris {
class ExecEnv;
+class GroupCommitMgr;
class TUniqueId;
class RuntimeState;
@@ -76,7 +79,7 @@ public:
Status add_load_id(const UniqueId& load_id, const
std::shared_ptr<Dependency> put_block_dep);
Status remove_load_id(const UniqueId& load_id);
void cancel(const Status& st);
- bool need_commit() { return _need_commit; }
+ bool need_commit() const { return _need_commit.load(); }
Status create_wal(int64_t db_id, int64_t tb_id, int64_t wal_id, const
std::string& import_label,
WalManager* wal_manager, std::vector<TSlotDescriptor>&
slot_desc,
@@ -95,8 +98,8 @@ public:
"wait_internal_group_commit_finish={}, data_size_condition={},
"
"group_commit_load_count={}, process_finish={},
_need_commit={}, schema_version={}",
load_instance_id.to_string(), label, txn_id,
wait_internal_group_commit_finish,
- data_size_condition, group_commit_load_count,
process_finish.load(), _need_commit,
- schema_version);
+ data_size_condition, group_commit_load_count,
process_finish.load(),
+ _need_commit.load(), schema_version);
return fmt::to_string(debug_string_buffer);
}
@@ -135,7 +138,7 @@ private:
std::shared_ptr<VWalWriter> _v_wal_writer;
// commit
- bool _need_commit = false;
+ std::atomic<bool> _need_commit = false;
// commit by time interval, can be changed by 'ALTER TABLE my_table SET
("group_commit_interval_ms"="1000");'
int64_t _group_commit_interval_ms;
std::chrono::steady_clock::time_point _start_time;
@@ -152,27 +155,31 @@ private:
class GroupCommitTable {
public:
GroupCommitTable(ExecEnv* exec_env, doris::ThreadPool* thread_pool,
int64_t db_id,
- int64_t table_id, std::shared_ptr<std::atomic_size_t>
all_block_queue_bytes)
+ int64_t table_id, std::shared_ptr<std::atomic_size_t>
all_block_queue_bytes,
+ GroupCommitMgr* group_commit_mgr)
: _exec_env(exec_env),
_thread_pool(thread_pool),
_all_block_queues_bytes(all_block_queue_bytes),
_db_id(db_id),
- _table_id(table_id) {};
+ _table_id(table_id),
+ _group_commit_mgr(group_commit_mgr) {};
Status get_first_block_load_queue(int64_t table_id, int64_t
base_schema_version,
int64_t index_size, const UniqueId&
load_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
int be_exe_version,
- std::shared_ptr<MemTrackerLimiter>
mem_tracker,
std::shared_ptr<Dependency>
create_plan_dep,
std::shared_ptr<Dependency>
put_block_dep);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
std::shared_ptr<Dependency> get_block_dep);
void remove_load_id(const UniqueId& load_id);
+ Status submit_create_group_commit_load();
private:
+ Status _submit_create_group_commit_load();
Status _create_group_commit_load(int be_exe_version,
- std::shared_ptr<MemTrackerLimiter>
mem_tracker);
+ const std::shared_ptr<MemTrackerLimiter>&
mem_tracker,
+ std::shared_ptr<LoadBlockQueue>&
created_load_block_queue);
Status _exec_plan_fragment(int64_t db_id, int64_t table_id, const
std::string& label,
int64_t txn_id, const TPipelineFragmentParams&
pipeline_params);
Status _finish_group_commit_load(int64_t db_id, int64_t table_id, const
std::string& label,
@@ -186,6 +193,7 @@ private:
int64_t _db_id;
int64_t _table_id;
+ GroupCommitMgr* _group_commit_mgr = nullptr;
std::mutex _lock;
// fragment_instance_id to load_block_queue
@@ -196,6 +204,8 @@ private:
std::shared_ptr<Dependency>,
int64_t, int64_t>>
_create_plan_deps;
std::string _create_plan_failed_reason;
+ int _create_plan_be_exe_version = 0;
+ int64_t _create_plan_start_time_ms = 0;
};
class GroupCommitMgr {
@@ -213,14 +223,19 @@ public:
int64_t index_size, const UniqueId&
load_id,
std::shared_ptr<LoadBlockQueue>&
load_block_queue,
int be_exe_version,
- std::shared_ptr<MemTrackerLimiter>
mem_tracker,
std::shared_ptr<Dependency>
create_plan_dep,
std::shared_ptr<Dependency>
put_block_dep);
void remove_load_id(int64_t table_id, const UniqueId& load_id);
std::promise<Status> debug_promise;
std::future<Status> debug_future = debug_promise.get_future();
+ void add_need_create_plan_table(int64_t table_id);
+ std::shared_ptr<MemTrackerLimiter> group_commit_mem_tracker() {
+ return _group_commit_mem_tracker;
+ }
private:
+ void _create_plan_worker();
+
ExecEnv* _exec_env = nullptr;
std::unique_ptr<doris::ThreadPool> _thread_pool;
// memory consumption of all tables' load block queues, used for memory
back pressure.
@@ -229,6 +244,12 @@ private:
std::mutex _lock;
// TODO remove table when unused
std::unordered_map<int64_t, std::shared_ptr<GroupCommitTable>> _table_map;
+ std::mutex _need_create_plan_lock;
+ std::condition_variable _need_create_plan_cv;
+ std::set<int64_t> _need_create_plan_tables;
+ bool _stopped = false;
+ std::thread _create_plan_thread;
+ std::shared_ptr<MemTrackerLimiter> _group_commit_mem_tracker;
};
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/load/group_commit/wal/wal_table.cpp
b/be/src/load/group_commit/wal/wal_table.cpp
index 4c6c0e1edb5..5dfd54354c3 100644
--- a/be/src/load/group_commit/wal/wal_table.cpp
+++ b/be/src/load/group_commit/wal/wal_table.cpp
@@ -183,9 +183,14 @@ Status WalTable::_try_abort_txn(int64_t db_id,
std::string& label) {
[&request, &result](FrontendServiceConnection& client) {
client->loadTxnRollback(result, request);
});
- auto result_status = Status::create<false>(result.status);
- LOG(INFO) << "abort label " << label << ", st:" << st << ",
result_status:" << result_status;
- return result_status;
+ if (st.ok()) {
+ auto result_status = Status::create<false>(result.status);
+ LOG(INFO) << "abort label " << label << ", result_status:" <<
result_status;
+ return result_status;
+ } else {
+ LOG(WARNING) << "abort label " << label << ", rpc error:" << st;
+ return st;
+ }
}
Status WalTable::_replay_wal_internal(const std::string& wal) {
diff --git
a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
index cef9bbdbf27..f2797ff7a72 100644
---
a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
+++
b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
@@ -17,6 +17,50 @@
suite("test_group_commit_error", "nonConcurrent") {
def tableName = "test_group_commit_error"
+ def beConfigName = "group_commit_create_plan_timeout_ms"
+ def backendId_to_backendIP = [:]
+ def backendId_to_backendHttpPort = [:]
+ getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+ def get_be_config = { String backend_id, String key -> // returns the value of BE config `key` on one backend
+ def (code, out, err) =
show_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id))
+ logger.info("show config: code=" + code + ", out=" + out + ", err=" +
err)
+ assertEquals(code, 0)
+ def configList = parseJson(out.trim()) // expected: a list of rows, row[0] being the config name
+ assert configList instanceof List
+ for (Object ele in (List) configList) {
+ assert ele instanceof List<String>
+ if (((List<String>) ele)[0] == key) {
+ return ((List<String>) ele)[2] // row[2] is taken as the config value
+ }
+ }
+ assertTrue(false, "Failed to find BE config: " + key) // reached only when no row matched key
+ }
+
+ def set_be_config = { String key, String value -> // applies key=value on every known backend
+ for (String backend_id : backendId_to_backendIP.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
backendId_to_backendHttpPort.get(backend_id), key, value)
+ logger.info("update config: code=" + code + ", out=" + out + ",
err=" + err)
+ assertEquals(code, 0) // fail fast if any backend rejects the update
+ }
+ }
+
+ def originBeConfig = [:] // backend_id -> original config value, restored in onFinish
+ for (String backend_id : backendId_to_backendIP.keySet()) {
+ originBeConfig[backend_id] = get_be_config(backend_id, beConfigName)
+ }
+
+ onFinish { // best-effort restore: return codes are only logged, not asserted
+ for (String backend_id : originBeConfig.keySet()) {
+ def (code, out, err) =
update_be_config(backendId_to_backendIP.get(backend_id),
+ backendId_to_backendHttpPort.get(backend_id),
beConfigName, originBeConfig[backend_id])
+ logger.info("restore config: code=" + code + ", out=" + out + ",
err=" + err)
+ }
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ GetDebugPoint().clearDebugPointsForAllFEs()
+ }
+
+ set_be_config(beConfigName, "20000") // 20000 ms create-plan timeout for the remainder of this suite
sql """ DROP TABLE IF EXISTS ${tableName} """
sql """
@@ -104,4 +148,4 @@ suite("test_group_commit_error", "nonConcurrent") {
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
}
-}
\ No newline at end of file
+}
diff --git
a/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_high_concurrency_async.groovy
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_high_concurrency_async.groovy
new file mode 100644
index 00000000000..237d7fc649c
--- /dev/null
+++
b/regression-test/suites/load_p0/stream_load/test_group_commit_stream_load_high_concurrency_async.groovy
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.awaitility.Awaitility
+
+import static java.util.concurrent.TimeUnit.SECONDS
+
+import java.util.Collections
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.atomic.AtomicInteger
+
+suite("test_group_commit_stream_load_high_concurrency_async", "p0") { // concurrent single-row async group-commit stream loads; all must succeed
+ def tableName = "test_group_commit_stream_load_high_concurrency_async"
+ int concurrentClients = 20 // number of loader threads
+ int loadsPerClient = 20 // loads issued sequentially by each thread
+ int expectedRows = concurrentClients * loadsPerClient // each load inserts exactly one row
+ def errors = Collections.synchronizedList(new ArrayList<String>()) // failure messages shared across loader threads
+ def stopRequested = new AtomicBoolean(false) // set on first failure so remaining loads are skipped
+ def successLoads = new AtomicInteger(0) // loads that passed checkStreamLoadResult
+ def getProperty = { property, userName -> // SHOW PROPERTY row whose Key matches, or null if none
+ def result = sql_return_maparray """SHOW PROPERTY FOR '${userName}'"""
+ result.find {
+ it.Key == property as String
+ }
+ }
+ def originMaxUserConnections = getProperty("max_user_connections",
"root").Value as long // saved so the finally block can restore it
+
+ def waitRowCount = { expected -> // polls count(*) every 2s, for at most 300s, until it equals expected
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2,
SECONDS).until({
+ def result = sql "select count(*) from ${tableName}"
+ logger.info("table: ${tableName}, rowCount: ${result}, expected:
${expected}")
+ return result[0][0] == expected
+ })
+ }
+
+ def checkStreamLoadResult = { loadId, result, exception -> // records an error and requests stop unless the load was a successful 1-row group commit
+ if (exception != null) {
+ stopRequested.set(true)
+ errors.add("load ${loadId} exception: ${exception.getMessage()}")
+ return
+ }
+ def json = parseJson(result)
+ if (!"success".equalsIgnoreCase(json.Status?.toString())) {
+ stopRequested.set(true)
+ errors.add("load ${loadId} status=${json.Status},
msg=${json.Message}")
+ return
+ }
+ if (json.GroupCommit != true) { // the load must have gone through group commit
+ stopRequested.set(true)
+ errors.add("load ${loadId} is not group commit: ${result}")
+ return
+ }
+ if (json.NumberTotalRows != 1 || json.NumberLoadedRows != 1 ||
+ json.NumberFilteredRows != 0 || json.NumberUnselectedRows != 0) { // exactly one row loaded, none filtered or unselected
+ stopRequested.set(true)
+ errors.add("load ${loadId} unexpected counters: ${result}")
+ return
+ }
+ successLoads.incrementAndGet()
+ }
+
+ try {
+ sql """SET PROPERTY FOR 'root' 'max_user_connections' = '1024'""" // presumably raised so concurrent loads are not refused; restored in finally
+ sql """ DROP TABLE IF EXISTS ${tableName} """
+ sql """
+ CREATE TABLE ${tableName} (
+ id BIGINT NOT NULL,
+ name STRING NULL
+ ) ENGINE=OLAP
+ DUPLICATE KEY(id)
+ DISTRIBUTED BY HASH(id) BUCKETS 1
+ PROPERTIES (
+ "replication_num" = "1",
+ "group_commit_interval_ms" = "5"
+ );
+ """
+
+ def threads = []
+ for (int client = 0; client < concurrentClients; client++) {
+ int clientId = client // per-iteration copy captured by the thread closure below
+
threads.add(Thread.startDaemon("group-commit-stream-load-${clientId}") {
+ for (int round = 0; round < loadsPerClient; round++) {
+ if (stopRequested.get()) { // another load already failed
+ break
+ }
+ long loadId = clientId * loadsPerClient + round // unique per load; also used as the inserted row id
+ try {
+ streamLoad {
+ table "${tableName}"
+ set 'column_separator', ','
+ set 'group_commit', 'async_mode'
+ unset 'label'
+ inputText "${loadId},name_${loadId}\n" // one CSV row: id,name
+ time 60000 // presumably the per-load timeout in ms
+
+ check { result, exception, startTime, endTime ->
+ checkStreamLoadResult(loadId, result,
exception)
+ }
+ }
+ } catch (Exception e) {
+ stopRequested.set(true)
+ errors.add("load ${loadId} streamLoad throws:
${e.getMessage()}")
+ break
+ }
+ }
+ })
+ }
+
+ threads.each { it.join() } // wait for every loader thread to finish
+
+ assertTrue(errors.isEmpty(),
+ "group commit stream load failures: " +
+ errors.subList(0, Math.min(errors.size(), 10))) // report at most the first 10 errors
+ assertEquals(expectedRows, successLoads.get())
+
+ sql "sync"
+ waitRowCount(expectedRows) // poll, since async-mode rows may not be visible immediately
+
+ def result = sql "select count(*) from ${tableName}"
+ assertEquals(expectedRows, result[0][0])
+ } finally {
+ sql """SET PROPERTY FOR 'root' 'max_user_connections' =
'${originMaxUserConnections}'""" // restore the original connection limit
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]