This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 89857d3780b [cherry-pick](branch-2.1) Pick "Use async group commit rpc 
call (#36499)" (#37380)
89857d3780b is described below

commit 89857d3780b9a4c952dbac2532458a5fd8edddc6
Author: abmdocrt <[email protected]>
AuthorDate: Sun Jul 7 18:28:19 2024 +0800

    [cherry-pick](branch-2.1) Pick "Use async group commit rpc call (#36499)" 
(#37380)
    
    ## Proposed changes
    
    Issue Number: close #xxx
    
    <!--Describe your changes.-->
    
    Pick #36499
---
 be/src/service/internal_service.cpp | 19 +++++--------------
 1 file changed, 5 insertions(+), 14 deletions(-)

diff --git a/be/src/service/internal_service.cpp 
b/be/src/service/internal_service.cpp
index 466eaa03cb5..c6bedc630e8 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -2222,23 +2222,19 @@ void 
PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
         ctx->pipe = pipe;
         Status st = _exec_env->new_load_stream_mgr()->put(load_id, ctx);
         if (st.ok()) {
-            std::mutex mutex;
-            std::condition_variable cv;
-            bool handled = false;
             try {
                 st = _exec_plan_fragment_impl(
                         request->exec_plan_fragment_request().request(),
                         request->exec_plan_fragment_request().version(),
                         request->exec_plan_fragment_request().compact(),
-                        [&](RuntimeState* state, Status* status) {
+                        [&, response, done, load_id](RuntimeState* state, 
Status* status) {
+                            brpc::ClosureGuard cb_closure_guard(done);
                             response->set_label(state->import_label());
                             response->set_txn_id(state->wal_id());
                             
response->set_loaded_rows(state->num_rows_load_success());
                             
response->set_filtered_rows(state->num_rows_load_filtered());
-                            st = *status;
-                            std::unique_lock l(mutex);
-                            handled = true;
-                            cv.notify_one();
+                            status->to_protobuf(response->mutable_status());
+                            _exec_env->new_load_stream_mgr()->remove(load_id);
                         });
             } catch (const Exception& e) {
                 st = e.to_status();
@@ -2249,6 +2245,7 @@ void 
PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
             if (!st.ok()) {
                 LOG(WARNING) << "exec plan fragment failed, errmsg=" << st;
             } else {
+                closure_guard.release();
                 for (int i = 0; i < request->data().size(); ++i) {
                     std::unique_ptr<PDataRow> row(new PDataRow());
                     row->CopyFrom(request->data(i));
@@ -2259,15 +2256,9 @@ void 
PInternalServiceImpl::group_commit_insert(google::protobuf::RpcController*
                 }
                 if (st.ok()) {
                     static_cast<void>(pipe->finish());
-                    std::unique_lock l(mutex);
-                    if (!handled) {
-                        cv.wait(l);
-                    }
                 }
             }
         }
-        st.to_protobuf(response->mutable_status());
-        _exec_env->new_load_stream_mgr()->remove(load_id);
     });
     if (!ret) {
         _exec_env->new_load_stream_mgr()->remove(load_id);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to