This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4bf15b9788 [fix](load) fix race condition problem when insert commitinfo (#20823)
4bf15b9788 is described below
commit 4bf15b978841e2cb37cdf3c440f8f4f5f0706803
Author: zhengyu <[email protected]>
AuthorDate: Thu Jun 15 09:53:32 2023 +0800
[fix](load) fix race condition problem when insert commitinfo (#20823)
Signed-off-by: freemandealer <[email protected]>
---
be/src/io/fs/multi_table_pipe.cpp | 15 ++++++++-------
be/src/io/fs/multi_table_pipe.h | 9 +++++----
2 files changed, 13 insertions(+), 11 deletions(-)
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 773188118d..22884735fd 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -99,7 +99,6 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
auto iter = _planned_pipes.find(table);
if (iter != _planned_pipes.end()) {
pipe = iter->second;
- LOG(INFO) << "dispatch for planned pipe: " << pipe.get();
RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
"append failed in planned kafka pipe");
} else {
@@ -111,7 +110,6 @@ Status MultiTablePipe::dispatch(const std::string& table, const char* data, size
} else {
pipe = iter->second;
}
- LOG(INFO) << "dispatch for unplanned pipe: " << pipe.get();
RETURN_NOT_OK_STATUS_WITH_WARN((pipe.get()->*cb)(data, size),
"append failed in unplanned kafka
pipe");
@@ -187,9 +185,9 @@ Status MultiTablePipe::request_and_exec_plans() {
_ctx->multi_table_put_result.params.size());
_unplanned_pipes.clear();
+ _inflight_plan_cnt += _ctx->multi_table_put_result.params.size();
for (auto& plan : _ctx->multi_table_put_result.params) {
// TODO: use pipeline in the future (currently is buggy for load)
- ++_inflight_plan_cnt;
DCHECK_EQ(plan.__isset.table_name, true);
DCHECK(_planned_pipes.find(plan.table_name) != _planned_pipes.end());
putPipe(plan.params.fragment_instance_id,
_planned_pipes[plan.table_name]);
@@ -197,10 +195,12 @@ Status MultiTablePipe::request_and_exec_plans() {
<< " table=" << plan.table_name;
exec_env->fragment_mgr()->exec_plan_fragment(plan,
[this](RuntimeState* state,
Status*
status) {
- --_inflight_plan_cnt;
- _tablet_commit_infos.insert(_tablet_commit_infos.end(),
- state->tablet_commit_infos().begin(),
- state->tablet_commit_infos().end());
+ {
+ std::lock_guard<std::mutex> l(_tablet_commit_infos_lock);
+ _tablet_commit_infos.insert(_tablet_commit_infos.end(),
+
state->tablet_commit_infos().begin(),
+
state->tablet_commit_infos().end());
+ }
_number_total_rows += state->num_rows_load_total();
_number_loaded_rows += state->num_rows_load_success();
_number_filtered_rows += state->num_rows_load_filtered();
@@ -224,6 +224,7 @@ Status MultiTablePipe::request_and_exec_plans() {
_status = *status;
}
+ --_inflight_plan_cnt;
if (_inflight_plan_cnt == 0 && is_consume_finished()) {
_ctx->number_total_rows = _number_total_rows;
_ctx->number_loaded_rows = _number_loaded_rows;
diff --git a/be/src/io/fs/multi_table_pipe.h b/be/src/io/fs/multi_table_pipe.h
index cdf8db052a..b54e39a5a3 100644
--- a/be/src/io/fs/multi_table_pipe.h
+++ b/be/src/io/fs/multi_table_pipe.h
@@ -77,11 +77,12 @@ private:
std::shared_ptr<StreamLoadContext> _ctx;
Status _status; // save the first error status of all executing plan
fragment
#ifndef BE_TEST
+ std::mutex _tablet_commit_infos_lock;
std::vector<TTabletCommitInfo> _tablet_commit_infos; // collect from each
plan fragment
- int64_t _number_total_rows = 0;
- int64_t _number_loaded_rows = 0;
- int64_t _number_filtered_rows = 0;
- int64_t _number_unselected_rows = 0;
+ std::atomic<int64_t> _number_total_rows {0};
+ std::atomic<int64_t> _number_loaded_rows {0};
+ std::atomic<int64_t> _number_filtered_rows {0};
+ std::atomic<int64_t> _number_unselected_rows {0};
#endif
std::mutex _pipe_map_lock;
std::unordered_map<TUniqueId /*instance id*/,
std::shared_ptr<io::StreamLoadPipe>> _pipe_map;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]