This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bb11955709a [improvement](spill) avoid spill if memory is enough (#33075)
bb11955709a is described below
commit bb11955709a4b06f44af31465e5d2330ed77a76d
Author: TengJianPing <[email protected]>
AuthorDate: Mon Apr 1 16:14:10 2024 +0800
[improvement](spill) avoid spill if memory is enough (#33075)
---
.../exec/partitioned_aggregation_sink_operator.cpp | 32 ++++++++---------
.../exec/partitioned_aggregation_sink_operator.h | 3 --
.../partitioned_aggregation_source_operator.cpp | 28 ++++++---------
.../exec/partitioned_aggregation_source_operator.h | 2 --
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 41 ++++++++++------------
be/src/pipeline/exec/spill_sort_sink_operator.h | 3 --
.../pipeline/exec/spill_sort_source_operator.cpp | 28 +++------------
be/src/pipeline/exec/spill_sort_source_operator.h | 3 --
be/src/pipeline/pipeline_x/dependency.h | 2 ++
be/src/vec/common/sort/sorter.cpp | 3 +-
be/src/vec/spill/spill_stream.cpp | 17 ++++++---
11 files changed, 66 insertions(+), 96 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 074565b4027..64996724e15 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -71,12 +71,6 @@ Status PartitionedAggSinkLocalState::close(RuntimeState*
state, Status exec_stat
return Status::OK();
}
dec_running_big_mem_op_num(state);
- {
- std::unique_lock<std::mutex> lk(_spill_lock);
- if (_is_spilling) {
- _spill_cv.wait(lk);
- }
- }
return Base::close(state, exec_status);
}
@@ -168,13 +162,17 @@ Status
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
auto* runtime_state = local_state._runtime_state.get();
RETURN_IF_ERROR(_agg_sink_operator->sink(runtime_state, in_block, false));
if (eos) {
- LOG(INFO) << "agg node " << id() << " sink eos";
- if (revocable_mem_size(state) > 0) {
- RETURN_IF_ERROR(revoke_memory(state));
- } else {
- for (auto& partition :
local_state._shared_state->spill_partitions) {
- RETURN_IF_ERROR(partition->finish_current_spilling(eos));
+ if (local_state._shared_state->is_spilled) {
+ if (revocable_mem_size(state) > 0) {
+ RETURN_IF_ERROR(revoke_memory(state));
+ } else {
+ for (auto& partition :
local_state._shared_state->spill_partitions) {
+ RETURN_IF_ERROR(partition->finish_current_spilling(eos));
+ }
+ local_state._dependency->set_ready_to_read();
+ local_state._finish_dependency->set_ready();
}
+ } else {
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
@@ -232,8 +230,10 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
LOG(INFO) << "agg node " << Base::_parent->id() << " revoke_memory"
<< ", eos: " << _eos;
RETURN_IF_ERROR(Base::_shared_state->sink_status);
- DCHECK(!_is_spilling);
- _is_spilling = true;
+ if (!_shared_state->is_spilled) {
+ _shared_state->is_spilled = true;
+ profile()->add_info_string("Spilled", "true");
+ }
// TODO: spill thread may set_ready before the task::execute thread put
the task to blocked state
if (!_eos) {
@@ -243,7 +243,6 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
Status status;
Defer defer {[&]() {
if (!status.ok()) {
- _is_spilling = false;
if (!_eos) {
Base::_dependency->Dependency::set_ready();
}
@@ -272,15 +271,12 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
<< ", eos: " << _eos;
}
{
- std::unique_lock<std::mutex> lk(_spill_lock);
- _is_spilling = false;
if (_eos) {
Base::_dependency->set_ready_to_read();
_finish_dependency->set_ready();
} else {
Base::_dependency->Dependency::set_ready();
}
- _spill_cv.notify_one();
}
}};
auto* runtime_state = _runtime_state.get();
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 5e617386812..542046556ec 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -272,9 +272,6 @@ public:
bool _eos = false;
std::shared_ptr<Dependency> _finish_dependency;
- bool _is_spilling = false;
- std::mutex _spill_lock;
- std::condition_variable _spill_cv;
/// Resources in shared state will be released when the operator is closed,
/// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index b6620458c06..c328598ac44 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -89,12 +89,6 @@ Status PartitionedAggLocalState::close(RuntimeState* state) {
return Status::OK();
}
dec_running_big_mem_op_num(state);
- {
- std::unique_lock<std::mutex> lk(_merge_spill_lock);
- if (_is_merging) {
- _merge_spill_cv.wait(lk);
- }
- }
return Base::close(state);
}
PartitionedAggSourceOperatorX::PartitionedAggSourceOperatorX(ObjectPool* pool,
@@ -133,13 +127,16 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._status);
-
RETURN_IF_ERROR(local_state.initiate_merge_spill_partition_agg_data(state));
+ if (local_state._shared_state->is_spilled) {
+
RETURN_IF_ERROR(local_state.initiate_merge_spill_partition_agg_data(state));
- /// When `_is_merging` is true means we are reading spilled data and
merging the data into hash table.
- if (local_state._is_merging) {
- return Status::OK();
+ /// When `_is_merging` is true means we are reading spilled data and
merging the data into hash table.
+ if (local_state._is_merging) {
+ return Status::OK();
+ }
}
+ // not spilled in sink or current partition still has data
auto* runtime_state = local_state._runtime_state.get();
RETURN_IF_ERROR(_agg_source_operator->get_block(runtime_state, block,
eos));
if (local_state._runtime_state) {
@@ -148,7 +145,8 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
local_state.update_profile(source_local_state->profile());
}
if (*eos) {
- if (!local_state._shared_state->spill_partitions.empty()) {
+ if (local_state._shared_state->is_spilled &&
+ !local_state._shared_state->spill_partitions.empty()) {
*eos = false;
}
}
@@ -220,12 +218,8 @@ Status
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
}
Base::_shared_state->in_mem_shared_state->aggregate_data_container
->init_once();
- {
- std::unique_lock<std::mutex>
lk(_merge_spill_lock);
- _is_merging = false;
- _dependency->Dependency::set_ready();
- _merge_spill_cv.notify_one();
- }
+ _is_merging = false;
+ _dependency->Dependency::set_ready();
}};
bool has_agg_data = false;
auto& parent = Base::_parent->template cast<Parent>();
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index ac63402f227..eff1e7179c8 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -60,8 +60,6 @@ protected:
std::future<Status> _spill_merge_future;
bool _current_partition_eos = true;
bool _is_merging = false;
- std::mutex _merge_spill_lock;
- std::condition_variable _merge_spill_cv;
/// Resources in shared state will be released when the operator is closed,
/// but there may be asynchronous spilling tasks at this time, which can
lead to conflicts.
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 0ddc6daa79c..9c3ae278ff6 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -74,12 +74,6 @@ Status SpillSortSinkLocalState::open(RuntimeState* state) {
return Status::OK();
}
Status SpillSortSinkLocalState::close(RuntimeState* state, Status
execsink_status) {
- {
- std::unique_lock<std::mutex> lk(_spill_lock);
- if (_is_spilling) {
- _spill_cv.wait(lk);
- }
- }
auto& parent = Base::_parent->template cast<Parent>();
if (parent._enable_spill) {
dec_running_big_mem_op_num(state);
@@ -179,9 +173,16 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
local_state._shared_state->in_mem_shared_state->sorter->data_size());
if (eos) {
if (_enable_spill) {
- if (revocable_mem_size(state) > 0) {
- RETURN_IF_ERROR(revoke_memory(state));
+ if (local_state._shared_state->is_spilled) {
+ if (revocable_mem_size(state) > 0) {
+ RETURN_IF_ERROR(revoke_memory(state));
+ } else {
+ local_state._dependency->set_ready_to_read();
+ local_state._finish_dependency->set_ready();
+ }
} else {
+ RETURN_IF_ERROR(
+
local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read());
local_state._dependency->set_ready_to_read();
local_state._finish_dependency->set_ready();
}
@@ -194,8 +195,10 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
return Status::OK();
}
Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
- DCHECK(!_is_spilling);
- _is_spilling = true;
+ if (!_shared_state->is_spilled) {
+ _shared_state->is_spilled = true;
+ profile()->add_info_string("Spilled", "true");
+ }
LOG(INFO) << "sort node " << Base::_parent->id() << " revoke_memory"
<< ", eos: " << _eos;
@@ -251,17 +254,12 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
_shared_state->clear();
}
- {
- std::unique_lock<std::mutex> lk(_spill_lock);
- _spilling_stream.reset();
- _is_spilling = false;
- if (_eos) {
- _dependency->set_ready_to_read();
- _finish_dependency->set_ready();
- } else {
- _dependency->Dependency::set_ready();
- }
- _spill_cv.notify_one();
+ _spilling_stream.reset();
+ if (_eos) {
+ _dependency->set_ready_to_read();
+ _finish_dependency->set_ready();
+ } else {
+ _dependency->Dependency::set_ready();
}
}};
@@ -296,7 +294,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState*
state) {
return Status::OK();
});
if (!status.ok()) {
- _is_spilling = false;
_spilling_stream->end_spill(status);
if (!_eos) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index ae5a3bcb8c7..d66215411aa 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -62,11 +62,8 @@ private:
RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;
bool _eos = false;
- bool _is_spilling = false;
vectorized::SpillStreamSPtr _spilling_stream;
std::shared_ptr<Dependency> _finish_dependency;
- std::mutex _spill_lock;
- std::condition_variable _spill_cv;
};
class SpillSortSinkOperatorX final : public
DataSinkOperatorX<SpillSortSinkLocalState> {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 56c20c853be..6ae30a482f7 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -58,20 +58,10 @@ Status SpillSortLocalState::close(RuntimeState* state) {
if (_closed) {
return Status::OK();
}
- {
- std::unique_lock<std::mutex> lk(_merge_spill_lock);
- if (_is_merging) {
- _merge_spill_cv.wait(lk);
- }
- }
if (Base::_shared_state->enable_spill) {
dec_running_big_mem_op_num(state);
}
RETURN_IF_ERROR(Base::close(state));
- for (auto& stream : _current_merging_streams) {
-
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
- }
- _current_merging_streams.clear();
return Status::OK();
}
int SpillSortLocalState::_calc_spill_blocks_to_merge() const {
@@ -81,14 +71,11 @@ int SpillSortLocalState::_calc_spill_blocks_to_merge()
const {
Status SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState*
state) {
auto& parent = Base::_parent->template cast<Parent>();
LOG(INFO) << "sort node " << _parent->node_id() << " merge spill data";
- DCHECK(!_is_merging);
- _is_merging = true;
_dependency->Dependency::block();
Status status;
Defer defer {[&]() {
if (!status.ok()) {
- _is_merging = false;
_dependency->Dependency::set_ready();
}
}};
@@ -111,12 +98,7 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
} else {
LOG(INFO) << "sort node " << _parent->node_id() << " merge
spill data finish";
}
- {
- std::unique_lock<std::mutex> lk(_merge_spill_lock);
- _is_merging = false;
- _dependency->Dependency::set_ready();
- _merge_spill_cv.notify_one();
- }
+ _dependency->Dependency::set_ready();
}};
vectorized::Block merge_sorted_block;
vectorized::SpillStreamSPtr tmp_stream;
@@ -258,15 +240,15 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::Bloc
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(local_state._status);
- if (!local_state.Base::_shared_state->enable_spill) {
- RETURN_IF_ERROR(
-
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos));
- } else {
+ if (local_state.Base::_shared_state->enable_spill &&
local_state._shared_state->is_spilled) {
if (!local_state._merger) {
return local_state.initiate_merge_sort_spill_streams(state);
} else {
RETURN_IF_ERROR(local_state._merger->get_next(block, eos));
}
+ } else {
+ RETURN_IF_ERROR(
+
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos));
}
local_state.reached_limit(block, eos);
return Status::OK();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h
b/be/src/pipeline/exec/spill_sort_source_operator.h
index 8132dd5a56c..a20eb57889b 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -65,9 +65,6 @@ protected:
int64_t _external_sort_bytes_threshold = 134217728; // 128M
std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
- bool _is_merging = false;
- std::mutex _merge_spill_lock;
- std::condition_variable _merge_spill_cv;
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
// counters for spill merge sort
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 622c175edcb..3ea096a81d6 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -441,6 +441,7 @@ struct PartitionedAggSharedState : public BasicSharedState,
size_t partition_count;
size_t max_partition_index;
Status sink_status;
+ bool is_spilled = false;
std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
size_t get_partition_index(size_t hash_value) const {
@@ -514,6 +515,7 @@ struct SpillSortSharedState : public BasicSharedState,
SortSharedState* in_mem_shared_state = nullptr;
bool enable_spill = false;
+ bool is_spilled = false;
Status sink_status;
std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
diff --git a/be/src/vec/common/sort/sorter.cpp
b/be/src/vec/common/sort/sorter.cpp
index 53fc2011232..db3cca8bf09 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -63,6 +63,7 @@ void MergeSorterState::reset() {
cursors_.swap(empty_cursors);
std::vector<Block> empty_blocks(0);
sorted_blocks_.swap(empty_blocks);
+ unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
in_mem_sorted_bocks_size_ = 0;
}
Status MergeSorterState::add_sorted_block(Block& block) {
@@ -70,8 +71,8 @@ Status MergeSorterState::add_sorted_block(Block& block) {
if (0 == rows) {
return Status::OK();
}
- sorted_blocks_.emplace_back(std::move(block));
in_mem_sorted_bocks_size_ += block.bytes();
+ sorted_blocks_.emplace_back(std::move(block));
num_rows_ += rows;
return Status::OK();
}
diff --git a/be/src/vec/spill/spill_stream.cpp
b/be/src/vec/spill/spill_stream.cpp
index d08b63df40b..f245f8fa309 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -68,8 +68,14 @@ void SpillStream::close() {
read_promise_.reset();
}
- (void)writer_->close();
- (void)reader_->close();
+ if (writer_) {
+ (void)writer_->close();
+ writer_.reset();
+ }
+ if (reader_) {
+ (void)reader_->close();
+ reader_.reset();
+ }
}
const std::string& SpillStream::get_spill_root_dir() const {
@@ -100,13 +106,16 @@ Status SpillStream::spill_block(const Block& block, bool
eof) {
size_t written_bytes = 0;
RETURN_IF_ERROR(writer_->write(block, written_bytes));
if (eof) {
- return writer_->close();
+ RETURN_IF_ERROR(writer_->close());
+ writer_.reset();
}
return Status::OK();
}
Status SpillStream::spill_eof() {
- return writer_->close();
+ RETURN_IF_ERROR(writer_->close());
+ writer_.reset();
+ return Status::OK();
}
Status SpillStream::read_next_block_sync(Block* block, bool* eos) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]