This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 89857d3780b [cherry-pick](branch-2.1) Pick "Use async group commit rpc call (#36499)" (#37380)
89857d3780b is described below
commit 89857d3780b9a4c952dbac2532458a5fd8edddc6
Author: abmdocrt <[email protected]>
AuthorDate: Sun Jul 7 18:28:19 2024 +0800
[cherry-pick](branch-2.1) Pick "Use async group commit rpc call (#36499)" (#37380)
## Proposed changes
Issue Number: close #xxx
<!--Describe your changes.-->
Pick #36499
---
be/src/service/internal_service.cpp | 19 +++++--------------
1 file changed, 5 insertions(+), 14 deletions(-)
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 466eaa03cb5..c6bedc630e8 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2222,23 +2222,19 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
ctx->pipe = pipe;
Status st = _exec_env->new_load_stream_mgr()->put(load_id, ctx);
if (st.ok()) {
- std::mutex mutex;
- std::condition_variable cv;
- bool handled = false;
try {
st = _exec_plan_fragment_impl(
request->exec_plan_fragment_request().request(),
request->exec_plan_fragment_request().version(),
request->exec_plan_fragment_request().compact(),
- [&](RuntimeState* state, Status* status) {
+ [&, response, done, load_id](RuntimeState* state, Status* status) {
+ brpc::ClosureGuard cb_closure_guard(done);
response->set_label(state->import_label());
response->set_txn_id(state->wal_id());
response->set_loaded_rows(state->num_rows_load_success());
response->set_filtered_rows(state->num_rows_load_filtered());
- st = *status;
- std::unique_lock l(mutex);
- handled = true;
- cv.notify_one();
+ status->to_protobuf(response->mutable_status());
+ _exec_env->new_load_stream_mgr()->remove(load_id);
});
} catch (const Exception& e) {
st = e.to_status();
@@ -2249,6 +2245,7 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
if (!st.ok()) {
LOG(WARNING) << "exec plan fragment failed, errmsg=" << st;
} else {
+ closure_guard.release();
for (int i = 0; i < request->data().size(); ++i) {
std::unique_ptr<PDataRow> row(new PDataRow());
row->CopyFrom(request->data(i));
@@ -2259,15 +2256,9 @@ void PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
}
if (st.ok()) {
static_cast<void>(pipe->finish());
- std::unique_lock l(mutex);
- if (!handled) {
- cv.wait(l);
- }
}
}
}
- st.to_protobuf(response->mutable_status());
- _exec_env->new_load_stream_mgr()->remove(load_id);
});
if (!ret) {
_exec_env->new_load_stream_mgr()->remove(load_id);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]