This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new acd59d27ccf [pipeline](load) Not blocking in execution threads (#36291)
acd59d27ccf is described below
commit acd59d27ccffb02cc12d9bd02ab766e29bbad267
Author: Gabriel <[email protected]>
AuthorDate: Tue Jun 18 18:54:35 2024 +0800
[pipeline](load) Not blocking in execution threads (#36291)
In group-commit loading tasks, the sink waits on a condition variable in
execution threads before finishing. Blocking an execution thread like this is
harmful to the pipeline engine.
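The fix replaces that blocking wait with a pipeline Dependency that the scheduler polls: the sink registers a finish dependency on the load block queue, and the queue flips it to ready once the group commit completes, so no execution thread is parked. Below is a minimal, self-contained sketch of the pattern; the Dependency and LoadQueue types here are illustrative stand-ins, not the actual Doris classes.

#include <atomic>
#include <iostream>
#include <memory>
#include <mutex>
#include <vector>

// A dependency is just a readiness flag the scheduler can poll; a blocked
// pipeline task stays off the execution threads until the flag flips.
struct Dependency {
    std::atomic<bool> ready {true};
    void block() { ready = false; }
    void set_always_ready() { ready = true; }
};

// The queue collects the finish dependencies of interested sinks and wakes
// them all when the group commit completes, instead of notify_all() on a
// condition variable that execution threads were sleeping on.
struct LoadQueue {
    std::mutex mutex;
    bool process_finish = false;
    std::vector<std::shared_ptr<Dependency>> dependencies;

    void append_dependency(std::shared_ptr<Dependency> dep) {
        std::lock_guard<std::mutex> lock(mutex);
        if (!process_finish) {  // still running: park the task, not a thread
            dep->block();
            dependencies.push_back(dep);
        }
    }

    void finish() {
        std::lock_guard<std::mutex> lock(mutex);
        process_finish = true;
        for (auto& dep : dependencies) {  // every parked task becomes runnable again
            dep->set_always_ready();
        }
    }
};

int main() {
    LoadQueue queue;
    auto finish_dep = std::make_shared<Dependency>();
    queue.append_dependency(finish_dep);     // sink registers instead of cv.wait()
    std::cout << finish_dep->ready << "\n";  // 0: task would be descheduled
    queue.finish();                          // group commit done
    std::cout << finish_dep->ready << "\n";  // 1: scheduler can resume the task
}

In the patch below, LoadBlockQueue::append_dependency() and _finish_group_commit_load() play these two roles.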
---
.../exec/group_commit_block_sink_operator.cpp | 67 ++++++++++++++--------
.../exec/group_commit_block_sink_operator.h | 9 ++-
be/src/runtime/group_commit_mgr.cpp | 14 ++++-
be/src/runtime/group_commit_mgr.h | 21 ++++++-
4 files changed, 83 insertions(+), 28 deletions(-)
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
index 50b314a7dc1..99fd6ef20ab 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.cpp
@@ -64,40 +64,27 @@ Status GroupCommitBlockSinkLocalState::close(RuntimeState* state, Status close_s
SCOPED_TIMER(_close_timer);
RETURN_IF_ERROR(Base::close(state, close_status));
RETURN_IF_ERROR(close_status);
- int64_t total_rows = state->num_rows_load_total();
- int64_t loaded_rows = state->num_rows_load_total();
- state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() +
- state->num_rows_load_filtered());
- state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows -
- loaded_rows);
- auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
- if (!_is_block_appended) {
- // if not meet the max_filter_ratio, we should return error status directly
- int64_t num_selected_rows =
- state->num_rows_load_total() - state->num_rows_load_unselected();
- if (num_selected_rows > 0 &&
- (double)state->num_rows_load_filtered() / num_selected_rows > p._max_filter_ratio) {
- return Status::DataQualityError("too many filtered rows");
- }
- RETURN_IF_ERROR(_add_blocks(state, true));
- }
- if (_load_block_queue) {
- _remove_estimated_wal_bytes();
- _load_block_queue->remove_load_id(p._load_id);
- }
// wait to wal
auto st = Status::OK();
if (_load_block_queue && (_load_block_queue->wait_internal_group_commit_finish ||
_group_commit_mode == TGroupCommitMode::SYNC_MODE)) {
std::unique_lock l(_load_block_queue->mutex);
if (!_load_block_queue->process_finish) {
- _load_block_queue->internal_group_commit_finish_cv.wait(l);
+ return Status::InternalError("_load_block_queue is not finished!");
}
st = _load_block_queue->status;
}
return st;
}
+std::string GroupCommitBlockSinkLocalState::debug_string(int indentation_level) const {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
+ fmt::format_to(debug_string_buffer, ", _load_block_queue: ({})",
+ _load_block_queue ? _load_block_queue->debug_string() : "NULL");
+ return fmt::to_string(debug_string_buffer);
+}
+
Status GroupCommitBlockSinkLocalState::_add_block(RuntimeState* state,
std::shared_ptr<vectorized::Block> block) {
if (block->rows() == 0) {
@@ -200,6 +187,10 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
_estimated_wal_bytes = estimated_wal_bytes;
}
}
+ if (_load_block_queue->wait_internal_group_commit_finish ||
+ _group_commit_mode == TGroupCommitMode::SYNC_MODE) {
+ _load_block_queue->append_dependency(_finish_dependency);
+ }
_state->set_import_label(_load_block_queue->label);
_state->set_wal_id(_load_block_queue->txn_id);
} else {
@@ -233,6 +224,7 @@ Status GroupCommitBlockSinkLocalState::_add_blocks(RuntimeState* state,
}
Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) {
+ RETURN_IF_ERROR(Base::init(t_sink));
DCHECK(t_sink.__isset.olap_table_sink);
auto& table_sink = t_sink.olap_table_sink;
_tuple_desc_id = table_sink.tuple_id;
@@ -274,10 +266,35 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc
SCOPED_CONSUME_MEM_TRACKER(local_state._mem_tracker.get());
Status status = Status::OK();
+ auto wind_up = [&]() -> Status {
+ if (eos) {
+ int64_t total_rows = state->num_rows_load_total();
+ int64_t loaded_rows = state->num_rows_load_total();
+ state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() +
+ state->num_rows_load_filtered());
+ state->update_num_rows_load_filtered(local_state._block_convertor->num_filtered_rows() +
+ total_rows - loaded_rows);
+ if (!local_state._is_block_appended) {
+ // if not meet the max_filter_ratio, we should return error status directly
+ int64_t num_selected_rows =
+ state->num_rows_load_total() - state->num_rows_load_unselected();
+ if (num_selected_rows > 0 &&
+ (double)state->num_rows_load_filtered() / num_selected_rows >
+ _max_filter_ratio) {
+ return Status::DataQualityError("too many filtered rows");
+ }
+ RETURN_IF_ERROR(local_state._add_blocks(state, true));
+ }
+ local_state._remove_estimated_wal_bytes();
+ local_state._load_block_queue->remove_load_id(_load_id);
+ }
+ return Status::OK();
+ };
+
auto rows = input_block->rows();
auto bytes = input_block->bytes();
if (UNLIKELY(rows == 0)) {
- return status;
+ return wind_up();
}
// update incrementally so that FE can get the progress.
@@ -324,7 +341,9 @@ Status GroupCommitBlockSinkOperatorX::sink(RuntimeState* state, vectorized::Bloc
block->swap(res_block.to_block());
}
// add block into block queue
- return local_state._add_block(state, block);
+ RETURN_IF_ERROR(local_state._add_block(state, block));
+
+ return wind_up();
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/group_commit_block_sink_operator.h b/be/src/pipeline/exec/group_commit_block_sink_operator.h
index b65326725b6..f26e65b97da 100644
--- a/be/src/pipeline/exec/group_commit_block_sink_operator.h
+++ b/be/src/pipeline/exec/group_commit_block_sink_operator.h
@@ -34,13 +34,19 @@ class GroupCommitBlockSinkLocalState final : public PipelineXSinkLocalState<Basi
public:
GroupCommitBlockSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
- : Base(parent, state), _filter_bitmap(1024) {}
+ : Base(parent, state), _filter_bitmap(1024) {
+ _finish_dependency =
+ std::make_shared<Dependency>(parent->operator_id(), parent->node_id(),
+ parent->get_name() + "_FINISH_DEPENDENCY", true);
+ }
~GroupCommitBlockSinkLocalState() override;
Status open(RuntimeState* state) override;
Status close(RuntimeState* state, Status exec_status) override;
+ Dependency* finishdependency() override { return _finish_dependency.get(); }
+ std::string debug_string(int indentation_level) const override;
private:
friend class GroupCommitBlockSinkOperatorX;
@@ -66,6 +72,7 @@ private:
TGroupCommitMode::type _group_commit_mode;
Bitmap _filter_bitmap;
int64_t _table_id;
+ std::shared_ptr<Dependency> _finish_dependency;
};
class GroupCommitBlockSinkOperatorX final
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 06cf494c842..6e81e3ae4c3 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -26,6 +26,7 @@
#include "common/compiler_util.h"
#include "common/config.h"
#include "common/status.h"
+#include "pipeline/dependency.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "util/debug_points.h"
@@ -459,8 +460,10 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t db_id, int64_t table_
{
std::unique_lock l2(load_block_queue->mutex);
load_block_queue->process_finish = true;
+ for (auto dep : load_block_queue->dependencies) {
+ dep->set_always_ready();
+ }
}
- load_block_queue->internal_group_commit_finish_cv.notify_all();
}
_load_block_queues.erase(instance_id);
}
@@ -616,6 +619,15 @@ Status LoadBlockQueue::close_wal() {
return Status::OK();
}
+void LoadBlockQueue::append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep) {
+ std::lock_guard<std::mutex> lock(mutex);
+ // If not finished, dependencies should be blocked.
+ if (!process_finish) {
+ finish_dep->block();
+ dependencies.push_back(finish_dep);
+ }
+}
+
bool LoadBlockQueue::has_enough_wal_disk_space(size_t estimated_wal_bytes) {
DBUG_EXECUTE_IF("LoadBlockQueue.has_enough_wal_disk_space.low_space", { return false; });
auto* wal_mgr = ExecEnv::GetInstance()->wal_mgr();
diff --git a/be/src/runtime/group_commit_mgr.h b/be/src/runtime/group_commit_mgr.h
index c41f6abd6fe..65f9f09670c 100644
--- a/be/src/runtime/group_commit_mgr.h
+++ b/be/src/runtime/group_commit_mgr.h
@@ -41,6 +41,10 @@ class ExecEnv;
class TUniqueId;
class RuntimeState;
+namespace pipeline {
+class Dependency;
+}
+
struct BlockData {
BlockData(const std::shared_ptr<vectorized::Block>& block)
: block(block), block_bytes(block->bytes()) {};
@@ -79,6 +83,19 @@ public:
int be_exe_version);
Status close_wal();
bool has_enough_wal_disk_space(size_t estimated_wal_bytes);
+ void append_dependency(std::shared_ptr<pipeline::Dependency> finish_dep);
+
+ std::string debug_string() const {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer,
+ "load_instance_id={}, label={}, txn_id={}, "
+ "wait_internal_group_commit_finish={},
data_size_condition={}, "
+ "group_commit_load_count={}, process_finish={}",
+ load_instance_id.to_string(), label, txn_id,
+ wait_internal_group_commit_finish, data_size_condition,
+ group_commit_load_count, process_finish.load());
+ return fmt::to_string(debug_string_buffer);
+ }
UniqueId load_instance_id;
std::string label;
@@ -92,9 +109,9 @@ public:
// the execute status of this internal group commit
std::mutex mutex;
- std::condition_variable internal_group_commit_finish_cv;
- bool process_finish = false;
+ std::atomic<bool> process_finish = false;
Status status = Status::OK();
+ std::vector<std::shared_ptr<pipeline::Dependency>> dependencies;
private:
void _cancel_without_lock(const Status& st);