This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bf15b9788 [fix](load) fix race condition problem when insert commitinfo (#20823)
4bf15b9788 is described below

commit 4bf15b978841e2cb37cdf3c440f8f4f5f0706803
Author: zhengyu <[email protected]>
AuthorDate: Thu Jun 15 09:53:32 2023 +0800

    [fix](load) fix race condition problem when insert commitinfo (#20823)
    
    Signed-off-by: freemandealer <[email protected]>
---
 be/src/io/fs/multi_table_pipe.cpp | 15 ++++++++-------
 be/src/io/fs/multi_table_pipe.h   |  9 +++++----
 2 files changed, 13 insertions(+), 11 deletions(-)

diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 773188118d..22884735fd 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -99,7 +99,6 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
     auto iter = _planned_pipes.find(table);
     if (iter != _planned_pipes.end()) {
         pipe = iter->second;
-        LOG(INFO) << "dispatch for planned pipe: " << pipe.get();
         RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
                                        "append failed in planned kafka pipe");
     } else {
@@ -111,7 +110,6 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
         } else {
             pipe = iter->second;
         }
-        LOG(INFO) << "dispatch for unplanned pipe: " << pipe.get();
         RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
                                        "append failed in unplanned kafka 
pipe");
 
@@ -187,9 +185,9 @@ Status MultiTablePipe::request_and_exec_plans() {
                              _ctx->multi_table_put_result.params.size());
     _unplanned_pipes.clear();
 
+    _inflight_plan_cnt += _ctx->multi_table_put_result.params.size();
     for (auto& plan : _ctx->multi_table_put_result.params) {
         // TODO: use pipeline in the future (currently is buggy for load)
-        ++_inflight_plan_cnt;
         DCHECK_EQ(plan.__isset.table_name, true);
         DCHECK(_planned_pipes.find(plan.table_name) != _planned_pipes.end());
         putPipe(plan.params.fragment_instance_id, 
_planned_pipes[plan.table_name]);
@@ -197,10 +195,12 @@ Status MultiTablePipe::request_and_exec_plans() {
                   << " table=" << plan.table_name;
         exec_env->fragment_mgr()->exec_plan_fragment(plan, 
[this](RuntimeState* state,
                                                                   Status* 
status) {
-            --_inflight_plan_cnt;
-            _tablet_commit_infos.insert(_tablet_commit_infos.end(),
-                                        state->tablet_commit_infos().begin(),
-                                        state->tablet_commit_infos().end());
+            {
+                std::lock_guard<std::mutex> l(_tablet_commit_infos_lock);
+                _tablet_commit_infos.insert(_tablet_commit_infos.end(),
+                                            
state->tablet_commit_infos().begin(),
+                                            
state->tablet_commit_infos().end());
+            }
             _number_total_rows += state->num_rows_load_total();
             _number_loaded_rows += state->num_rows_load_success();
             _number_filtered_rows += state->num_rows_load_filtered();
@@ -224,6 +224,7 @@ Status MultiTablePipe::request_and_exec_plans() {
                 _status = *status;
             }
 
+            --_inflight_plan_cnt;
             if (_inflight_plan_cnt == 0 && is_consume_finished()) {
                 _ctx->number_total_rows = _number_total_rows;
                 _ctx->number_loaded_rows = _number_loaded_rows;
diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h
index cdf8db052a..b54e39a5a3 100644
--- a/be/src/io/fs/multi_table_pipe.h
+++ b/be/src/io/fs/multi_table_pipe.h
@@ -77,11 +77,12 @@ private:
     std::shared_ptr<StreamLoadContext> _ctx;
     Status _status; // save the first error status of all executing plan 
fragment
 #ifndef BE_TEST
+    std::mutex _tablet_commit_infos_lock;
     std::vector<TTabletCommitInfo> _tablet_commit_infos; // collect from each 
plan fragment
-    int64_t _number_total_rows = 0;
-    int64_t _number_loaded_rows = 0;
-    int64_t _number_filtered_rows = 0;
-    int64_t _number_unselected_rows = 0;
+    std::atomic<int64_t> _number_total_rows {0};
+    std::atomic<int64_t> _number_loaded_rows {0};
+    std::atomic<int64_t> _number_filtered_rows {0};
+    std::atomic<int64_t> _number_unselected_rows {0};
 #endif
     std::mutex _pipe_map_lock;
     std::unordered_map<TUniqueId /*instance id*/, 
std::shared_ptr<io::StreamLoadPipe>> _pipe_map;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to