This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new eea36767912 [fix](group commit) fix group commit insert rpc may stuck (#39391) (#39458)
eea36767912 is described below
commit eea36767912b8174962df8465c431e0a017c067e
Author: meiyi <[email protected]>
AuthorDate: Fri Aug 16 13:19:00 2024 +0800
[fix](group commit) fix group commit insert rpc may stuck (#39391) (#39458)
pick https://github.com/apache/doris/pull/39391
---
be/src/runtime/fragment_mgr.cpp | 2 ++
be/src/runtime/group_commit_mgr.cpp | 2 ++
be/src/service/internal_service.cpp | 23 ++++++++++++++++++---
.../group_commit/test_group_commit_error.groovy | 24 ++++++++++++++++++++++
4 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 8ceeff10481..c61bb82df75 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -620,6 +620,8 @@ void FragmentMgr::remove_pipeline_context(
template <typename Params>
Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id,
bool pipeline,
std::shared_ptr<QueryContext>& query_ctx) {
+ DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed",
+ { return
Status::InternalError("FragmentMgr._get_query_ctx.failed"); });
if (params.is_simplified_param) {
// Get common components from _query_ctx_map
std::lock_guard<std::mutex> lock(_lock);
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index d97b268fc27..a5bf52d2ca7 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -35,6 +35,8 @@ namespace doris {
Status LoadBlockQueue::add_block(RuntimeState* runtime_state,
std::shared_ptr<vectorized::Block> block,
bool write_wal) {
+ DBUG_EXECUTE_IF("LoadBlockQueue.add_block.failed",
+ { return
Status::InternalError("LoadBlockQueue.add_block.failed"); });
std::unique_lock l(mutex);
RETURN_IF_ERROR(status);
auto start = std::chrono::steady_clock::now();
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 8ab2b06a805..013af8e8030 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2229,7 +2229,10 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
TUniqueId load_id;
load_id.__set_hi(request->load_id().hi());
load_id.__set_lo(request->load_id().lo());
- bool ret = _light_work_pool.try_offer([this, request, response, done,
load_id]() {
+ std::shared_ptr<std::mutex> lock = std::make_shared<std::mutex>();
+ std::shared_ptr<bool> is_done = std::make_shared<bool>(false);
+ bool ret = _light_work_pool.try_offer([this, request, response, done,
load_id, lock,
+ is_done]() {
brpc::ClosureGuard closure_guard(done);
std::shared_ptr<StreamLoadContext> ctx =
std::make_shared<StreamLoadContext>(_exec_env);
auto pipe = std::make_shared<io::StreamLoadPipe>(
@@ -2243,7 +2246,13 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
request->exec_plan_fragment_request().request(),
request->exec_plan_fragment_request().version(),
request->exec_plan_fragment_request().compact(),
- [&, response, done, load_id](RuntimeState* state,
Status* status) {
+ [&, response, done, load_id, lock,
is_done](RuntimeState* state,
+ Status*
status) {
+ std::lock_guard<std::mutex> lock1(*lock);
+ if (*is_done) {
+ return;
+ }
+ *is_done = true;
brpc::ClosureGuard cb_closure_guard(done);
response->set_label(state->import_label());
response->set_txn_id(state->wal_id());
@@ -2262,11 +2271,19 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
st = Status::Error(ErrorCode::INTERNAL_ERROR,
"_exec_plan_fragment_impl meet unknown
error");
}
- closure_guard.release();
if (!st.ok()) {
LOG(WARNING) << "exec plan fragment failed, load_id=" <<
print_id(load_id)
<< ", errmsg=" << st;
+ std::lock_guard<std::mutex> lock1(*lock);
+ if (*is_done) {
+ closure_guard.release();
+ } else {
+ *is_done = true;
+ st.to_protobuf(response->mutable_status());
+ _exec_env->new_load_stream_mgr()->remove(load_id);
+ }
} else {
+ closure_guard.release();
for (int i = 0; i < request->data().size(); ++i) {
std::unique_ptr<PDataRow> row(new PDataRow());
row->CopyFrom(request->data(i));
diff --git a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
index 749da71117b..c7f30dfea36 100644
--- a/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
+++ b/regression-test/suites/insert_p0/group_commit/test_group_commit_error.groovy
@@ -52,4 +52,28 @@ suite("test_group_commit_error", "nonConcurrent") {
} finally {
GetDebugPoint().clearDebugPointsForAllBEs()
}
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("FragmentMgr._get_query_ctx.failed")
+ sql """ set group_commit = async_mode """
+ sql """ set enable_nereids_planner = false """
+ sql """ insert into ${tableName} values (3, 3) """
+ assertTrue(false)
+ } catch (Exception e) {
+ logger.info("failed: " + e.getMessage())
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("LoadBlockQueue.add_block.failed")
+ sql """ set group_commit = async_mode """
+ sql """ set enable_nereids_planner = false """
+ sql """ insert into ${tableName} values (4, 4) """
+ assertTrue(false)
+ } catch (Exception e) {
+ logger.info("failed: " + e.getMessage())
+ } finally {
+ GetDebugPoint().clearDebugPointsForAllBEs()
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]