This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/spill_and_reserve by this push:
new ed6843e06e8 refactor SpillRunnable (#41991)
ed6843e06e8 is described below
commit ed6843e06e8aa7b3896334cfa9c40381c4de1e85
Author: TengJianPing <[email protected]>
AuthorDate: Thu Oct 17 14:08:23 2024 +0800
refactor SpillRunnable (#41991)
## Proposed changes
Issue Number: close #xxx
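<!--Describe your changes.-->
Summary of the changes visible in this diff:
- Spill call sites now hand the submit timer, the spill dependency, the spill context and a `_spilling_task_count` counter to the `SpillRunnable` constructor, and the task lambdas return a `Status` instead of recording errors and calling `set_ready()` / `on_task_finished()` themselves.
- Spill errors are read from a shared `AtomicStatus _spill_status` added to `BasicSpillSharedState`; the per-operator `sink_status`, `_status`, `_spill_status`, `_spill_status_ok` and `_spill_lock` members are removed.
- The counter member `_spill_write_file_data_size` is renamed to `_spill_write_file_total_size` (the profile counter name "SpillWriteFileTotalSize" is unchanged).
- Explicit `set_write_counters(...)` calls after spill-stream registration are removed.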
---
be/src/pipeline/dependency.cpp | 1 -
be/src/pipeline/dependency.h | 8 +-
be/src/pipeline/exec/operator.h | 16 +-
.../exec/partitioned_aggregation_sink_operator.cpp | 62 ++++---
.../partitioned_aggregation_source_operator.cpp | 70 ++++----
.../exec/partitioned_aggregation_source_operator.h | 1 -
.../exec/partitioned_hash_join_probe_operator.cpp | 168 +++++++------------
.../exec/partitioned_hash_join_probe_operator.h | 6 -
.../exec/partitioned_hash_join_sink_operator.cpp | 178 +++++++--------------
.../exec/partitioned_hash_join_sink_operator.h | 13 +-
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 78 ++++-----
.../pipeline/exec/spill_sort_source_operator.cpp | 81 +++++-----
be/src/pipeline/exec/spill_sort_source_operator.h | 1 -
be/src/pipeline/exec/spill_utils.h | 129 ++++++++++++---
be/src/vec/spill/spill_stream.cpp | 1 +
be/src/vec/spill/spill_stream.h | 4 +-
be/src/vec/spill/spill_writer.cpp | 11 +-
be/src/vec/spill/spill_writer.h | 6 +-
18 files changed, 384 insertions(+), 450 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index cdde0c6330c..6294903fc1e 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -352,7 +352,6 @@ Status AggSpillPartition::get_spill_stream(RuntimeState*
state, int node_id,
std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), profile));
spill_streams_.emplace_back(spilling_stream_);
spill_stream = spilling_stream_;
- spill_stream->set_write_counters(profile);
return Status::OK();
}
void AggSpillPartition::close() {
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 20251aed294..1a89ca4f02c 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -429,16 +429,18 @@ private:
struct BasicSpillSharedState {
virtual ~BasicSpillSharedState() = default;
+ AtomicStatus _spill_status;
+
// These two counters are shared to spill source operators as the initial
value
// of 'SpillWriteFileCurrentSize' and 'SpillWriteFileCurrentCount'.
// Total bytes of spill data written to disk file(after serialized)
- RuntimeProfile::Counter* _spill_write_file_data_size = nullptr;
+ RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
RuntimeProfile::Counter* _spill_file_total_count = nullptr;
void setup_shared_profile(RuntimeProfile* sink_profile) {
_spill_file_total_count =
ADD_COUNTER_WITH_LEVEL(sink_profile,
"SpillWriteFileTotalCount", TUnit::UNIT, 1);
- _spill_write_file_data_size =
+ _spill_write_file_total_size =
ADD_COUNTER_WITH_LEVEL(sink_profile,
"SpillWriteFileTotalSize", TUnit::BYTES, 1);
}
@@ -466,7 +468,6 @@ struct PartitionedAggSharedState : public BasicSharedState,
size_t partition_count_bits;
size_t partition_count;
size_t max_partition_index;
- Status sink_status;
bool is_spilled = false;
std::atomic_bool is_closed = false;
std::deque<std::shared_ptr<AggSpillPartition>> spill_partitions;
@@ -547,7 +548,6 @@ struct SpillSortSharedState : public BasicSharedState,
bool enable_spill = false;
bool is_spilled = false;
std::atomic_bool is_closed = false;
- Status sink_status;
std::shared_ptr<BasicSharedState> in_mem_shared_state_sptr;
std::deque<vectorized::SpillStreamSPtr> sorted_streams;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index da0cc008f06..b93dc925a1b 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -312,7 +312,7 @@ public:
ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockCount", TUnit::UNIT, 1);
_spill_write_block_data_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteBlockDataSize", TUnit::BYTES, 1);
- _spill_write_file_data_size =
+ _spill_write_file_total_size =
ADD_COUNTER_WITH_LEVEL(Base::profile(),
"SpillWriteFileTotalSize", TUnit::BYTES, 1);
_spill_write_rows_count =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows",
TUnit::UNIT, 1);
@@ -359,14 +359,16 @@ public:
if (_copy_shared_spill_profile) {
_copy_shared_spill_profile = false;
const auto* spill_shared_state = (const
BasicSpillSharedState*)Base::_shared_state;
- COUNTER_SET(_spill_file_current_size,
-
spill_shared_state->_spill_write_file_data_size->value());
- COUNTER_SET(_spill_file_current_count,
- spill_shared_state->_spill_file_total_count->value());
+ COUNTER_UPDATE(_spill_file_current_size,
+
spill_shared_state->_spill_write_file_total_size->value());
+ COUNTER_UPDATE(_spill_file_current_count,
+
spill_shared_state->_spill_file_total_count->value());
Base::_shared_state->update_spill_stream_profiles(Base::profile());
}
}
+ std::atomic_int _spilling_task_count {0};
+
// Total time of spill, including spill task scheduling time,
// serialize block time, write disk file time,
// and read disk file time, deserialize block time etc.
@@ -390,7 +392,7 @@ public:
// Total bytes of spill data in Block format(in memory format)
RuntimeProfile::Counter* _spill_write_block_data_size = nullptr;
// Total bytes of spill data written to disk file(after serialized)
- RuntimeProfile::Counter* _spill_write_file_data_size = nullptr;
+ RuntimeProfile::Counter* _spill_write_file_total_size = nullptr;
RuntimeProfile::Counter* _spill_write_rows_count = nullptr;
RuntimeProfile::Counter* _spill_file_total_count = nullptr;
RuntimeProfile::Counter* _spill_file_current_count = nullptr;
@@ -749,6 +751,8 @@ public:
COUNTER_SET(_spill_min_rows_of_partition, min_rows);
}
+ std::atomic_int _spilling_task_count {0};
+
std::vector<int64_t> _rows_in_partitions;
// Total time of spill, including spill task scheduling time,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index f5c09459f85..cd4eba2abfe 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -178,7 +178,9 @@ Status
PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized:
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
- RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
+ if (!local_state._shared_state->_spill_status.ok()) {
+ return local_state._shared_state->_spill_status.status();
+ }
local_state._eos = eos;
auto* runtime_state = local_state._runtime_state.get();
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::sink", {
@@ -219,7 +221,7 @@ Status PartitionedAggSinkOperatorX::revoke_memory(
size_t PartitionedAggSinkOperatorX::revocable_mem_size(RuntimeState* state)
const {
auto& local_state = get_local_state(state);
- if (!local_state.Base::_shared_state->sink_status.ok()) {
+ if (!local_state._shared_state->_spill_status.ok()) {
return UINT64_MAX;
}
auto* runtime_state = local_state._runtime_state.get();
@@ -270,7 +272,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(
<< Base::_parent->node_id()
<< " revoke_memory, size: " <<
_parent->revocable_mem_size(state)
<< ", eos: " << _eos;
- RETURN_IF_ERROR(Base::_shared_state->sink_status);
+ if (!_shared_state->_spill_status.ok()) {
+ return _shared_state->_spill_status.status();
+ }
if (!_shared_state->is_spilled) {
_shared_state->is_spilled = true;
profile()->add_info_string("Spilled", "true");
@@ -292,8 +296,6 @@ Status PartitionedAggSinkLocalState::revoke_memory(
auto query_id = state->query_id();
- MonotonicStopWatch submit_timer;
- submit_timer.start();
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_submit_func",
{
status = Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_agg_sink revoke_memory submit_func
failed");
@@ -301,32 +303,28 @@ Status PartitionedAggSinkLocalState::revoke_memory(
});
state->get_query_ctx()->increase_revoking_tasks_count();
- auto spill_runnable = std::make_shared<SpillRunnable>(
- state, _profile, true, _shared_state->shared_from_this(),
- [this, &parent, state, query_id, size_to_revoke, spill_context,
submit_timer] {
- auto submit_elapsed_time = submit_timer.elapsed_time();
- _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
- exec_time_counter()->update(submit_elapsed_time);
- _spill_total_timer->update(submit_elapsed_time);
-
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_spill_total_timer);
- SCOPED_TIMER(_spill_write_timer);
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+ _spilling_task_count = 1;
+ auto spill_runnable = std::make_shared<SpillRunnable>(
+ state, spill_context, _spilling_task_count, _profile, submit_timer,
+ _shared_state->shared_from_this(), Base::_spill_dependency, true,
true,
+ [this, &parent, state, query_id, size_to_revoke, spill_context] {
+ Status status;
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_sink::revoke_memory_cancel", {
- auto st = Status::InternalError(
+ status = Status::InternalError(
"fault_inject partitioned_agg_sink "
"revoke_memory canceled");
-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st);
- return st;
+
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, status);
+ return status;
});
Defer defer {[&]() {
- if (!_shared_state->sink_status.ok() ||
state->is_cancelled()) {
- if (!_shared_state->sink_status.ok()) {
- LOG(WARNING)
- << "query " << print_id(query_id) << " agg
node "
- << Base::_parent->node_id()
- << " revoke_memory error: " <<
Base::_shared_state->sink_status;
+ if (!status.ok() || state->is_cancelled()) {
+ if (!status.ok()) {
+ LOG(WARNING) << "query " << print_id(query_id) <<
" agg node "
+ << Base::_parent->node_id()
+ << " revoke_memory error: " << status;
}
_shared_state->close();
} else {
@@ -340,15 +338,10 @@ Status PartitionedAggSinkLocalState::revoke_memory(
_finish_dependency->set_ready();
}
state->get_query_ctx()->decrease_revoking_tasks_count();
- Base::_spill_dependency->Dependency::set_ready();
-
- if (spill_context) {
- spill_context->on_task_finished();
- }
}};
auto* runtime_state = _runtime_state.get();
auto* agg_data =
parent._agg_sink_operator->get_agg_data(runtime_state);
- Base::_shared_state->sink_status = std::visit(
+ status = std::visit(
vectorized::Overload {
[&](std::monostate& arg) -> Status {
return Status::InternalError("Unit hash
table");
@@ -359,10 +352,9 @@ Status PartitionedAggSinkLocalState::revoke_memory(
state, agg_method, hash_table,
size_to_revoke, _eos));
}},
agg_data->method_variant);
- RETURN_IF_ERROR(Base::_shared_state->sink_status);
- Base::_shared_state->sink_status =
-
parent._agg_sink_operator->reset_hash_table(runtime_state);
- return Base::_shared_state->sink_status;
+ RETURN_IF_ERROR(status);
+ status =
parent._agg_sink_operator->reset_hash_table(runtime_state);
+ return status;
});
return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 1b49c0d3768..b20df4df598 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -138,21 +138,24 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
bool* eos) {
auto& local_state = get_local_state(state);
local_state.copy_shared_spill_profile();
+ Status status;
Defer defer {[&]() {
- if (!local_state._status.ok() || *eos) {
+ if (!status.ok() || *eos) {
local_state._shared_state->close();
}
}};
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
- RETURN_IF_ERROR(local_state._status);
+ status = local_state._shared_state->_spill_status.status();
+ RETURN_IF_ERROR(status);
if (local_state._shared_state->is_spilled &&
local_state._need_to_merge_data_for_current_partition) {
if (local_state._blocks.empty() &&
!local_state._current_partition_eos) {
bool has_recovering_data = false;
- RETURN_IF_ERROR(local_state.recover_blocks_from_disk(state,
has_recovering_data));
+ status = local_state.recover_blocks_from_disk(state,
has_recovering_data);
+ RETURN_IF_ERROR(status);
*eos = !has_recovering_data;
return Status::OK();
} else if (!local_state._blocks.empty()) {
@@ -161,8 +164,9 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
auto block = std::move(local_state._blocks.front());
merged_rows += block.rows();
local_state._blocks.erase(local_state._blocks.begin());
-
RETURN_IF_ERROR(_agg_source_operator->merge_with_serialized_key_helper<false>(
- local_state._runtime_state.get(), &block));
+ status =
_agg_source_operator->merge_with_serialized_key_helper<false>(
+ local_state._runtime_state.get(), &block);
+ RETURN_IF_ERROR(status);
}
local_state._estimate_memory_usage +=
_agg_source_operator->get_estimated_memory_size_for_merging(
@@ -179,8 +183,8 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
// not spilled in sink or current partition still has data
auto* runtime_state = local_state._runtime_state.get();
local_state._shared_state->in_mem_shared_state->aggregate_data_container->init_once();
- local_state._status = _agg_source_operator->get_block(runtime_state,
block, eos);
- RETURN_IF_ERROR(local_state._status);
+ status = _agg_source_operator->get_block(runtime_state, block, eos);
+ RETURN_IF_ERROR(status);
if (local_state._runtime_state) {
auto* source_local_state =
local_state._runtime_state->get_local_state(_agg_source_operator->operator_id());
@@ -191,7 +195,8 @@ Status
PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized:
!local_state._shared_state->spill_partitions.empty()) {
local_state._current_partition_eos = false;
local_state._need_to_merge_data_for_current_partition = true;
-
RETURN_IF_ERROR(local_state._shared_state->in_mem_shared_state->reset_hash_table());
+ status =
local_state._shared_state->in_mem_shared_state->reset_hash_table();
+ RETURN_IF_ERROR(status);
*eos = false;
}
}
@@ -236,15 +241,15 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
has_data = true;
auto spill_func = [this, state, query_id] {
+ Status status;
Defer defer {[&]() {
- if (!_status.ok() || state->is_cancelled()) {
- if (!_status.ok()) {
+ if (!status.ok() || state->is_cancelled()) {
+ if (!status.ok()) {
LOG(WARNING) << "query " << print_id(query_id) << " agg
node "
- << _parent->node_id() << " recover agg data
error: " << _status;
+ << _parent->node_id() << " recover agg data
error: " << status;
}
_shared_state->close();
}
- _spill_dependency->Dependency::set_ready();
}};
bool has_agg_data = false;
size_t accumulated_blocks_size = 0;
@@ -258,15 +263,15 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
{
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::recover_spill_data",
{
- _status =
Status::Error<INTERNAL_ERROR>(
+ status =
Status::Error<INTERNAL_ERROR>(
"fault_inject
partitioned_agg_source "
"recover_spill_data
failed");
});
- if (_status.ok()) {
- _status = stream->read_next_block_sync(&block,
&eos);
+ if (status.ok()) {
+ status = stream->read_next_block_sync(&block,
&eos);
}
}
- RETURN_IF_ERROR(_status);
+ RETURN_IF_ERROR(status);
if (!block.empty()) {
has_agg_data = true;
@@ -288,34 +293,20 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
}
}
}
- return _status;
+ return status;
};
- MonotonicStopWatch submit_timer;
- submit_timer.start();
- auto exception_catch_func = [spill_func, query_id, submit_timer, this]() {
- auto submit_elapsed_time = submit_timer.elapsed_time();
- _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
- exec_time_counter()->update(submit_elapsed_time);
- _spill_total_timer->update(submit_elapsed_time);
-
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_spill_total_timer);
- SCOPED_TIMER(_spill_recover_time);
-
+ auto exception_catch_func = [spill_func, query_id]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::merge_spill_data_cancel",
{
auto st = Status::InternalError(
"fault_inject partitioned_agg_source "
"merge spill data canceled");
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id, st);
- return;
+ return st;
});
auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func();
}); }();
-
- if (!status.ok()) {
- _status = status;
- }
+ return status;
};
DBUG_EXECUTE_IF("fault_inject::partitioned_agg_source::submit_func", {
@@ -323,9 +314,14 @@ Status
PartitionedAggLocalState::recover_blocks_from_disk(RuntimeState* state, b
"fault_inject partitioned_agg_source submit_func failed");
});
_spill_dependency->block();
+
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+ _spilling_task_count = 1;
return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
- std::make_shared<SpillRunnable>(state, _runtime_profile.get(),
false,
- _shared_state->shared_from_this(),
- exception_catch_func));
+ std::make_shared<SpillRunnable>(state, nullptr,
_spilling_task_count,
+ _runtime_profile.get(),
submit_timer,
+ _shared_state->shared_from_this(),
_spill_dependency,
+ false, false,
exception_catch_func));
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index 53e683ada20..783e7ed3760 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -53,7 +53,6 @@ protected:
std::unique_ptr<RuntimeState> _runtime_state;
bool _opened = false;
- Status _status;
std::unique_ptr<std::promise<Status>> _spill_merge_promise;
std::future<Status> _spill_merge_future;
bool _current_partition_eos = true;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 6df2a7ca4ab..f1ca6f8d7de 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -23,6 +23,7 @@
#include <algorithm>
#include <utility>
+#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "pipeline/pipeline_task.h"
@@ -205,7 +206,6 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
state, spilling_stream, print_id(state->query_id()),
"hash_probe",
_parent->node_id(),
std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(),
_runtime_profile.get()));
- spilling_stream->set_write_counters(_runtime_profile.get());
}
auto merged_block =
vectorized::MutableBlock::create_unique(blocks[0].clone_empty());
@@ -243,36 +243,17 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
return Status::OK();
};
- MonotonicStopWatch submit_timer;
- submit_timer.start();
-
- auto exception_catch_func = [query_id, spill_func, spill_context,
submit_timer, this]() {
- auto submit_elapsed_time = submit_timer.elapsed_time();
- _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
- exec_time_counter()->update(submit_elapsed_time);
- _spill_total_timer->update(submit_elapsed_time);
-
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_spill_total_timer);
- SCOPED_TIMER(_spill_write_timer);
-
+ auto exception_catch_func = [query_id, spill_func, spill_context]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::spill_probe_blocks_cancel",
{
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
- query_id, Status::InternalError("fault_inject
partitioned_hash_join_probe "
- "spill_probe_blocks
canceled"));
- return;
+ auto status = Status::InternalError(
+ "fault_inject partitioned_hash_join_probe "
+ "spill_probe_blocks canceled");
+ ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id,
status);
+ return status;
});
auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func();
}); }();
-
- if (!status.ok()) {
- _spill_status_ok = false;
- _spill_status = std::move(status);
- }
- _spill_dependency->set_ready();
- if (spill_context) {
- spill_context->on_non_sink_task_finished();
- }
+ return status;
};
if (spill_context) {
@@ -284,9 +265,14 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(
"fault_inject partitioned_hash_join_probe spill_probe_blocks
submit_func failed");
});
- auto spill_runnable = std::make_shared<SpillRunnable>(state,
_runtime_profile.get(), true,
-
_shared_state->shared_from_this(),
-
exception_catch_func);
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
+ _spilling_task_count = 1;
+ auto spill_runnable = std::make_shared<SpillRunnable>(
+ state, spill_context, _spilling_task_count,
_runtime_profile.get(), submit_timer,
+ _shared_state->shared_from_this(), _spill_dependency, false, true,
+ exception_catch_func);
return spill_io_pool->submit(std::move(spill_runnable));
}
@@ -314,39 +300,26 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
}
spilled_stream->set_read_counters(profile());
- std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
- _shared_state->shared_from_this();
-
auto query_id = state->query_id();
- auto read_func = [this, query_id, state, spilled_stream = spilled_stream,
shared_state_holder,
- partition_index] {
- auto shared_state_sptr = shared_state_holder.lock();
- if (!shared_state_sptr || state->is_cancelled()) {
- LOG(INFO) << "query: " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return;
- }
-
+ auto read_func = [this, query_id, state, spilled_stream = spilled_stream,
partition_index] {
SCOPED_TIMER(_recovery_build_timer);
bool eos = false;
VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: "
<< _parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " <<
partition_index
<< ", recoverying build data";
+ Status status;
while (!eos) {
vectorized::Block block;
- Status st;
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks",
{
- st = Status::Error<INTERNAL_ERROR>(
+ status = Status::Error<INTERNAL_ERROR>(
"fault_inject partitioned_hash_join_probe
recover_build_blocks failed");
});
- if (st.ok()) {
- st = spilled_stream->read_next_block_sync(&block, &eos);
+ if (status.ok()) {
+ status = spilled_stream->read_next_block_sync(&block, &eos);
}
- if (!st.ok()) {
- _spill_status_ok = false;
- _spill_status = std::move(st);
+ if (!status.ok()) {
break;
}
COUNTER_UPDATE(_recovery_build_rows, block.rows());
@@ -365,10 +338,8 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
_recovered_build_block =
vectorized::MutableBlock::create_unique(std::move(block));
} else {
DCHECK_EQ(_recovered_build_block->columns(), block.columns());
- st = _recovered_build_block->merge(std::move(block));
- if (!st.ok()) {
- _spill_status_ok = false;
- _spill_status = std::move(st);
+ status = _recovered_build_block->merge(std::move(block));
+ if (!status.ok()) {
break;
}
}
@@ -381,43 +352,29 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
if (eos) {
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
- shared_state_sptr->spilled_streams[partition_index].reset();
+ _shared_state->spilled_streams[partition_index].reset();
VLOG_DEBUG << "query: " << print_id(state->query_id())
<< ", node: " << _parent->node_id() << ", task id: " <<
state->task_id()
<< ", partition: " << partition_index;
}
+ return status;
};
- MonotonicStopWatch submit_timer;
- submit_timer.start();
-
- auto exception_catch_func = [read_func, query_id, submit_timer, this]() {
- auto submit_elapsed_time = submit_timer.elapsed_time();
- _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
- exec_time_counter()->update(submit_elapsed_time);
- _spill_total_timer->update(submit_elapsed_time);
-
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_spill_total_timer);
- SCOPED_TIMER(_spill_recover_time);
-
+ auto exception_catch_func = [read_func, query_id]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_build_blocks_cancel",
{
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
- query_id, Status::InternalError("fault_inject
partitioned_hash_join_probe "
- "recover_build_blocks
canceled"));
- return;
+ auto status = Status::InternalError(
+ "fault_inject partitioned_hash_join_probe "
+ "recover_build_blocks canceled");
+ ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id,
status);
+ return status;
});
auto status = [&]() {
- RETURN_IF_CATCH_EXCEPTION(read_func());
+ RETURN_IF_ERROR_OR_CATCH_EXCEPTION(read_func());
return Status::OK();
}();
- if (!status.ok()) {
- _spill_status_ok = false;
- _spill_status = std::move(status);
- }
- _spill_dependency->set_ready();
+ return status;
};
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
@@ -440,9 +397,15 @@ Status
PartitionedHashJoinProbeLocalState::recover_build_blocks_from_disk(Runtim
"fault_inject partitioned_hash_join_probe "
"recovery_build_blocks submit_func failed");
});
- auto spill_runnable = std::make_shared<SpillRunnable>(state,
_runtime_profile.get(), false,
-
_shared_state->shared_from_this(),
-
exception_catch_func);
+
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+
+ _spilling_task_count = 1;
+ auto spill_runnable = std::make_shared<SpillRunnable>(
+ state, nullptr, _spilling_task_count, _runtime_profile.get(),
submit_timer,
+ _shared_state->shared_from_this(), _spill_dependency, false, false,
+ exception_catch_func);
VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", node: " <<
_parent->node_id()
<< ", task id: " << state->task_id() << ", partition: " <<
partition_index
<< " recover_build_blocks_from_disk submit func";
@@ -500,8 +463,7 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
while (!eos && !_state->is_cancelled() && st.ok()) {
st = spilled_stream->read_next_block_sync(&block, &eos);
if (!st.ok()) {
- _spill_status_ok = false;
- _spill_status = std::move(st);
+ break;
} else {
COUNTER_UPDATE(_recovery_probe_rows, block.rows());
COUNTER_UPDATE(_recovery_probe_blocks, 1);
@@ -519,37 +481,24 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilled_stream);
spilled_stream.reset();
}
+ return st;
};
- MonotonicStopWatch submit_timer;
- submit_timer.start();
- auto exception_catch_func = [read_func, query_id, submit_timer, this]() {
- auto submit_elapsed_time = submit_timer.elapsed_time();
- _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
- exec_time_counter()->update(submit_elapsed_time);
- _spill_total_timer->update(submit_elapsed_time);
-
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_spill_total_timer);
- SCOPED_TIMER(_spill_recover_time);
-
+ auto exception_catch_func = [read_func, query_id]() {
DBUG_EXECUTE_IF("fault_inject::partitioned_hash_join_probe::recover_probe_blocks_cancel",
{
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
- query_id, Status::InternalError("fault_inject
partitioned_hash_join_probe "
- "recover_probe_blocks
canceled"));
- return;
+ auto status = Status::InternalError(
+ "fault_inject partitioned_hash_join_probe "
+ "recover_probe_blocks canceled");
+ ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id,
status);
+ return status;
});
auto status = [&]() {
- RETURN_IF_CATCH_EXCEPTION(read_func());
+ RETURN_IF_ERROR_OR_CATCH_EXCEPTION(read_func());
return Status::OK();
}();
- if (!status.ok()) {
- _spill_status_ok = false;
- _spill_status = std::move(status);
- }
- _spill_dependency->set_ready();
+ return status;
};
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
@@ -562,8 +511,12 @@ Status
PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
"fault_inject partitioned_hash_join_probe "
"recovery_probe_blocks submit_func failed");
});
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+ _spilling_task_count = 1;
return spill_io_pool->submit(std::make_shared<SpillRunnable>(
- state, _runtime_profile.get(), false,
_shared_state->shared_from_this(),
+ state, nullptr, _spilling_task_count, _runtime_profile.get(),
submit_timer,
+ _shared_state->shared_from_this(), _spill_dependency, false, false,
exception_catch_func));
}
@@ -759,9 +712,8 @@ Status
PartitionedHashJoinProbeOperatorX::_setup_internal_operators(
Status PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
vectorized::Block*
output_block, bool* eos) const {
auto& local_state = get_local_state(state);
- if (!local_state._spill_status_ok) {
- DCHECK_NE(local_state._spill_status.code(), 0);
- return local_state._spill_status;
+ if (!local_state._shared_state->_spill_status.ok()) {
+ return local_state._shared_state->_spill_status.status();
}
const auto partition_index = local_state._partition_cursor;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 6ecbdd01e49..0ef54f36c2f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -74,12 +74,6 @@ private:
std::unique_ptr<vectorized::Block> _child_block;
bool _child_eos {false};
- std::mutex _spill_lock;
- Status _spill_status;
-
- std::atomic<int> _spilling_task_count {0};
- std::atomic<bool> _spill_status_ok {true};
-
std::vector<std::unique_ptr<vectorized::MutableBlock>> _partitioned_blocks;
std::unique_ptr<vectorized::MutableBlock> _recovered_build_block;
std::map<uint32_t, std::vector<vectorized::Block>> _probe_blocks;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index 8c9990da1ef..d3d010e0d7c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -73,7 +73,6 @@ Status PartitionedHashJoinSinkLocalState::open(RuntimeState*
state) {
state, spilling_stream, print_id(state->query_id()),
fmt::format("hash_build_sink_{}", i), _parent->node_id(),
std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), _profile));
- spilling_stream->set_write_counters(_profile);
}
return p._partitioner->clone(state, _partitioner);
}
@@ -203,21 +202,6 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
std::for_each(partitions_indexes.begin(), partitions_indexes.end(),
[](std::vector<uint32_t>& indices) {
indices.reserve(reserved_size); });
- auto flush_rows = [&state,
this](std::unique_ptr<vectorized::MutableBlock>& partition_block,
- vectorized::SpillStreamSPtr&
spilling_stream) {
- auto block = partition_block->to_block();
- auto status = spilling_stream->spill_block(state, block, false);
-
- if (!status.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- _spill_status = status;
- _spill_status_ok = false;
- _spill_dependency->set_ready();
- return false;
- }
- return true;
- };
-
size_t total_rows = build_block.rows();
size_t offset = 1;
while (offset < total_rows) {
@@ -261,22 +245,14 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
int64_t old_mem = partition_block->allocated_bytes();
{
SCOPED_TIMER(_partition_shuffle_timer);
- Status st = partition_block->add_rows(&sub_block, begin,
end);
- if (!st.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- _spill_status = st;
- _spill_status_ok = false;
- _spill_dependency->set_ready();
- return;
- }
+ RETURN_IF_ERROR(partition_block->add_rows(&sub_block,
begin, end));
partitions_indexes[partition_idx].clear();
}
int64_t new_mem = partition_block->allocated_bytes();
if (partition_block->rows() >= reserved_size || is_last_block)
{
- if (!flush_rows(partition_block, spilling_stream)) {
- return;
- }
+ auto block = partition_block->to_block();
+ RETURN_IF_ERROR(spilling_stream->spill_block(state, block,
false));
partition_block =
vectorized::MutableBlock::create_unique(build_block.clone_empty());
COUNTER_UPDATE(_memory_used_counter, -new_mem);
@@ -287,40 +263,20 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
}
}
- _spill_dependency->set_ready();
+ return Status::OK();
};
- MonotonicStopWatch submit_timer;
- submit_timer.start();
- auto exception_catch_func = [spill_func, spill_context, submit_timer,
this]() mutable {
- auto submit_elapsed_time = submit_timer.elapsed_time();
- _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
- exec_time_counter()->update(submit_elapsed_time);
- _spill_total_timer->update(submit_elapsed_time);
-
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_spill_total_timer);
- SCOPED_TIMER(_spill_write_timer);
-
- auto status = [&]() {
- RETURN_IF_CATCH_EXCEPTION(spill_func());
- return Status::OK();
- }();
-
- if (!status.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- _spill_status = status;
- _spill_status_ok = false;
- _spill_dependency->set_ready();
- }
-
- if (spill_context) {
- spill_context->on_task_finished();
- }
+ auto exception_catch_func = [spill_func]() mutable {
+ auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func());
}();
+ return status;
};
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+ _spilling_task_count = 1;
auto spill_runnable = std::make_shared<SpillRunnable>(
- state, _profile, true, _shared_state->shared_from_this(),
exception_catch_func);
+ state, spill_context, _spilling_task_count, _profile, submit_timer,
+ _shared_state->shared_from_this(), _spill_dependency, true, true,
exception_catch_func);
auto* thread_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
@@ -340,7 +296,7 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", task: " <<
state->task_id()
<< " hash join sink " << _parent->node_id() << " revoke_memory"
<< ", eos: " << _child_eos;
- DCHECK_EQ(_spilling_streams_count, 0);
+ DCHECK_EQ(_spilling_task_count, 0);
CHECK_EQ(_spill_dependency->is_blocked_by(nullptr), nullptr);
if (!_shared_state->need_to_spill) {
@@ -349,20 +305,40 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
return _revoke_unpartitioned_block(state, spill_context);
}
- _spilling_streams_count = _shared_state->partitioned_build_blocks.size();
+ _spilling_task_count = _shared_state->partitioned_build_blocks.size();
auto query_id = state->query_id();
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
DCHECK(spill_io_pool != nullptr);
+ auto spill_fin_cb = [this, state, query_id, spill_context]() {
+ Status status;
+ if (_child_eos) {
+ VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) <<
", hash join sink "
+ << _parent->node_id() << " set_ready_to_read"
+ << ", task id: " << state->task_id();
+ std::for_each(_shared_state->partitioned_build_blocks.begin(),
+ _shared_state->partitioned_build_blocks.end(),
[&](auto& block) {
+ if (block) {
+ COUNTER_UPDATE(_in_mem_rows_counter,
block->rows());
+ }
+ });
+ auto st = _finish_spilling();
+ if (status.ok()) {
+ status = st;
+ }
+ _dependency->set_ready_to_read();
+ }
+ return status;
+ };
for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size();
++i) {
vectorized::SpillStreamSPtr& spilling_stream =
_shared_state->spilled_streams[i];
auto& mutable_block = _shared_state->partitioned_build_blocks[i];
if (!mutable_block ||
mutable_block->allocated_bytes() <
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
- --_spilling_streams_count;
+ --_spilling_task_count;
continue;
}
@@ -380,53 +356,40 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
// so that when a stream finished, it should desc -1
state->get_query_ctx()->increase_revoking_tasks_count();
auto spill_runnable = std::make_shared<SpillRunnable>(
- state, _profile, true, _shared_state->shared_from_this(),
- [this, query_id, spilling_stream, i, submit_timer,
spill_context] {
- auto submit_elapsed_time = submit_timer.elapsed_time();
-
_spill_write_wait_in_queue_timer->update(submit_elapsed_time);
- exec_time_counter()->update(submit_elapsed_time);
- _spill_total_timer->update(submit_elapsed_time);
-
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_spill_total_timer);
- SCOPED_TIMER(_spill_write_timer);
-
+ state, spill_context, _spilling_task_count, _profile,
submit_timer,
+ _shared_state->shared_from_this(), _spill_dependency, true,
true,
+ [this, query_id, spilling_stream, i, spill_context] {
DBUG_EXECUTE_IF(
"fault_inject::partitioned_hash_join_sink::revoke_memory_cancel", {
-
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
- query_id, Status::InternalError(
- "fault_inject
partitioned_hash_join_sink "
- "revoke_memory
canceled"));
- return;
+ auto status = Status::InternalError(
+ "fault_inject
partitioned_hash_join_sink "
+ "revoke_memory canceled");
+
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id,
+
status);
+ return status;
});
SCOPED_TIMER(_spill_build_timer);
auto status = [&]() {
RETURN_IF_CATCH_EXCEPTION(
- _spill_to_disk(i, spilling_stream,
spill_context));
- return Status::OK();
+ return _spill_to_disk(i, spilling_stream,
spill_context));
}();
_state->get_query_ctx()->decrease_revoking_tasks_count();
- if (!status.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- _spill_dependency->set_ready();
- _spill_status_ok = false;
- _spill_status = std::move(status);
- }
- });
+ return status;
+ },
+ spill_fin_cb);
if (st.ok()) {
st = spill_io_pool->submit(std::move(spill_runnable));
}
if (!st.ok()) {
- --_spilling_streams_count;
+ --_spilling_task_count;
return st;
}
}
- std::unique_lock<std::mutex> lock(_spill_lock);
- if (_spilling_streams_count > 0) {
+ if (_spilling_task_count > 0) {
_spill_dependency->block();
} else if (_child_eos) {
VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join
sink "
@@ -501,50 +464,24 @@ Status
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
return Status::OK();
}
-void PartitionedHashJoinSinkLocalState::_spill_to_disk(
+Status PartitionedHashJoinSinkLocalState::_spill_to_disk(
uint32_t partition_index, const vectorized::SpillStreamSPtr&
spilling_stream,
const std::shared_ptr<SpillContext>& spill_context) {
auto& partitioned_block =
_shared_state->partitioned_build_blocks[partition_index];
- if (_spill_status_ok) {
+ Status status = _shared_state->_spill_status.status();
+ if (status.ok()) {
auto block = partitioned_block->to_block();
int64_t block_mem_usage = block.allocated_bytes();
Defer defer {[&]() { COUNTER_UPDATE(memory_used_counter(),
-block_mem_usage); }};
partitioned_block =
vectorized::MutableBlock::create_unique(block.clone_empty());
- auto st = spilling_stream->spill_block(state(), block, false);
- if (!st.ok()) {
- _spill_status_ok = false;
- std::lock_guard<std::mutex> l(_spill_status_lock);
- _spill_status = st;
- }
+ status = spilling_stream->spill_block(state(), block, false);
}
VLOG_DEBUG << "query: " << print_id(_state->query_id()) << ", task: " <<
_state->task_id()
<< ", join sink " << _parent->node_id() << " revoke done";
- auto num = _spilling_streams_count.fetch_sub(1);
- DCHECK_GE(_spilling_streams_count, 0);
- if (num == 1) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- _spill_dependency->set_ready();
- if (_child_eos) {
- VLOG_DEBUG << "query:" << print_id(this->state()->query_id()) <<
", hash join sink "
- << _parent->node_id() << " set_ready_to_read"
- << ", task id: " << state()->task_id();
- std::for_each(_shared_state->partitioned_build_blocks.begin(),
- _shared_state->partitioned_build_blocks.end(),
[&](auto& block) {
- if (block) {
- COUNTER_UPDATE(_in_mem_rows_counter,
block->rows());
- }
- });
- _spill_status = _finish_spilling();
- _dependency->set_ready_to_read();
- }
-
- if (spill_context) {
- spill_context->on_task_finished();
- }
- }
+ return status;
}
PartitionedHashJoinSinkOperatorX::PartitionedHashJoinSinkOperatorX(
@@ -634,9 +571,8 @@ Status PartitionedHashJoinSinkOperatorX::sink(RuntimeState*
state, vectorized::B
CHECK_EQ(local_state._spill_dependency->is_blocked_by(nullptr), nullptr);
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
- if (!local_state._spill_status_ok) {
- DCHECK_NE(local_state._spill_status.code(), 0);
- return local_state._spill_status;
+ if (!local_state._shared_state->_spill_status.ok()) {
+ return local_state._shared_state->_spill_status.status();
}
local_state._child_eos = eos;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index aaa6d64adf9..8a844e69963 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -53,9 +53,9 @@ protected:
PartitionedHashJoinSinkLocalState(DataSinkOperatorXBase* parent,
RuntimeState* state)
:
PipelineXSpillSinkLocalState<PartitionedHashJoinSharedState>(parent, state) {}
- void _spill_to_disk(uint32_t partition_index,
- const vectorized::SpillStreamSPtr& spilling_stream,
- const std::shared_ptr<SpillContext>& spill_context);
+ Status _spill_to_disk(uint32_t partition_index,
+ const vectorized::SpillStreamSPtr& spilling_stream,
+ const std::shared_ptr<SpillContext>& spill_context);
Status _partition_block(RuntimeState* state, vectorized::Block* in_block,
size_t begin,
size_t end);
@@ -67,17 +67,10 @@ protected:
friend class PartitionedHashJoinSinkOperatorX;
- std::atomic_int _spilling_streams_count {0};
- std::atomic<bool> _spill_status_ok {true};
- std::mutex _spill_lock;
-
vectorized::Block _pending_block;
bool _child_eos {false};
- Status _spill_status;
- std::mutex _spill_status_lock;
-
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
std::unique_ptr<RuntimeProfile> _internal_runtime_profile;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 5d8355b865d..48a1b2b6ec3 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -144,7 +144,7 @@ Status SpillSortSinkOperatorX::revoke_memory(RuntimeState*
state,
size_t SpillSortSinkOperatorX::revocable_mem_size(RuntimeState* state) const {
auto& local_state = get_local_state(state);
- if (!local_state.Base::_shared_state->sink_status.ok()) {
+ if (!local_state._shared_state->_spill_status.ok()) {
return UINT64_MAX;
}
return
_sort_sink_operator->get_revocable_mem_size(local_state._runtime_state.get());
@@ -155,7 +155,9 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState*
state, vectorized::Bloc
auto& local_state = get_local_state(state);
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
- RETURN_IF_ERROR(local_state.Base::_shared_state->sink_status);
+ if (!local_state._shared_state->_spill_status.ok()) {
+ return local_state._shared_state->_spill_status.status();
+ }
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
if (in_block->rows() > 0) {
local_state._shared_state->update_spill_block_batch_row_count(in_block);
@@ -198,14 +200,15 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
VLOG_DEBUG << "query " << print_id(state->query_id()) << " sort node "
<< Base::_parent->node_id() << " revoke_memory"
<< ", eos: " << _eos;
- RETURN_IF_ERROR(Base::_shared_state->sink_status);
+ if (!_shared_state->_spill_status.ok()) {
+ return _shared_state->_spill_status.status();
+ }
auto status =
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
state, _spilling_stream, print_id(state->query_id()), "sort",
_parent->node_id(),
_shared_state->spill_block_batch_row_count,
SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES, profile());
RETURN_IF_ERROR(status);
- _spilling_stream->set_write_counters(_profile);
_shared_state->sorted_streams.emplace_back(_spilling_stream);
@@ -218,12 +221,12 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
auto query_id = state->query_id();
auto spill_func = [this, state, query_id, &parent] {
+ Status status;
Defer defer {[&]() {
- if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
- if (!_shared_state->sink_status.ok()) {
+ if (!status.ok() || state->is_cancelled()) {
+ if (!status.ok()) {
LOG(WARNING) << "query " << print_id(query_id) << " sort
node "
- << _parent->node_id()
- << " revoke memory error: " <<
_shared_state->sink_status;
+ << _parent->node_id() << " revoke memory
error: " << status;
}
_shared_state->close();
} else {
@@ -231,7 +234,7 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState*
state,
<< " revoke memory finish";
}
- if (!_shared_state->sink_status.ok()) {
+ if (!status.ok()) {
_shared_state->close();
}
@@ -240,14 +243,11 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
if (_eos) {
_dependency->set_ready_to_read();
_finish_dependency->set_ready();
- } else {
- _spill_dependency->Dependency::set_ready();
}
}};
- _shared_state->sink_status =
-
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
- RETURN_IF_ERROR(_shared_state->sink_status);
+ status =
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
+ RETURN_IF_ERROR(status);
auto* sink_local_state = _runtime_state->get_sink_local_state();
update_profile(sink_local_state->profile());
@@ -257,13 +257,13 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
while (!eos && !state->is_cancelled()) {
{
SCOPED_TIMER(_spill_merge_sort_timer);
- _shared_state->sink_status =
parent._sort_sink_operator->merge_sort_read_for_spill(
+ status = parent._sort_sink_operator->merge_sort_read_for_spill(
_runtime_state.get(), &block,
_shared_state->spill_block_batch_row_count,
&eos);
}
- RETURN_IF_ERROR(_shared_state->sink_status);
- _shared_state->sink_status = _spilling_stream->spill_block(state,
block, eos);
- RETURN_IF_ERROR(_shared_state->sink_status);
+ RETURN_IF_ERROR(status);
+ status = _spilling_stream->spill_block(state, block, eos);
+ RETURN_IF_ERROR(status);
block.clear_column_data();
}
parent._sort_sink_operator->reset(_runtime_state.get());
@@ -271,33 +271,18 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
return Status::OK();
};
- MonotonicStopWatch submit_timer;
- submit_timer.start();
-
- auto exception_catch_func = [this, query_id, spill_context, submit_timer,
spill_func]() {
- auto submit_elapsed_time = submit_timer.elapsed_time();
- _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
- exec_time_counter()->update(submit_elapsed_time);
- _spill_total_timer->update(submit_elapsed_time);
-
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_spill_total_timer);
- SCOPED_TIMER(_spill_write_timer);
-
+ auto exception_catch_func = [query_id, spill_context, spill_func]() {
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_cancel",
{
- ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
- query_id, Status::InternalError("fault_inject
spill_sort_sink "
- "revoke_memory canceled"));
- return;
+ auto status = Status::InternalError(
+ "fault_inject spill_sort_sink "
+ "revoke_memory canceled");
+ ExecEnv::GetInstance()->fragment_mgr()->cancel_query(query_id,
status);
+ return status;
});
- _shared_state->sink_status = [&]() {
- RETURN_IF_CATCH_EXCEPTION({ return spill_func(); });
- }();
+ auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func();
}); }();
- if (spill_context) {
- spill_context->on_task_finished();
- }
+ return status;
};
DBUG_EXECUTE_IF("fault_inject::spill_sort_sink::revoke_memory_submit_func", {
@@ -307,10 +292,15 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state,
});
if (status.ok()) {
state->get_query_ctx()->increase_revoking_tasks_count();
+
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+ _spilling_task_count = 1;
status =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
- std::make_shared<SpillRunnable>(state, _profile, true,
-
_shared_state->shared_from_this(),
- exception_catch_func));
+ std::make_shared<SpillRunnable>(
+ state, spill_context, _spilling_task_count, _profile,
submit_timer,
+ _shared_state->shared_from_this(), _spill_dependency,
true, true,
+ exception_catch_func));
}
if (!status.ok()) {
if (!_eos) {
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index 5cc124caaea..2308c49d893 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -81,11 +81,12 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
auto spill_func = [this, state, query_id, &parent] {
SCOPED_TIMER(_spill_merge_sort_timer);
+ Status status;
Defer defer {[&]() {
- if (!_status.ok() || state->is_cancelled()) {
- if (!_status.ok()) {
+ if (!status.ok() || state->is_cancelled()) {
+ if (!status.ok()) {
LOG(WARNING) << "query " << print_id(query_id) << " sort
node "
- << _parent->node_id() << " merge spill data
error: " << _status;
+ << _parent->node_id() << " merge spill data
error: " << status;
}
_shared_state->close();
for (auto& stream : _current_merging_streams) {
@@ -96,7 +97,6 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
VLOG_DEBUG << "query " << print_id(query_id) << " sort node "
<< _parent->node_id()
<< " merge spill data finish";
}
- _spill_dependency->Dependency::set_ready();
}};
vectorized::Block merge_sorted_block;
vectorized::SpillStreamSPtr tmp_stream;
@@ -108,11 +108,11 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
<< ", curren merge max stream count: " <<
max_stream_count;
{
SCOPED_TIMER(Base::_spill_recover_time);
- _status = _create_intermediate_merger(
+ status = _create_intermediate_merger(
max_stream_count,
parent._sort_source_operator->get_sort_description(_runtime_state.get()));
}
- RETURN_IF_ERROR(_status);
+ RETURN_IF_ERROR(status);
// all the remaining streams can be merged in a run
if (_shared_state->sorted_streams.empty()) {
@@ -120,12 +120,11 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
}
{
- _status =
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+ status =
ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
state, tmp_stream, print_id(state->query_id()),
"sort", _parent->node_id(),
_shared_state->spill_block_batch_row_count,
SpillSortSharedState::SORT_BLOCK_SPILL_BATCH_BYTES,
profile());
- RETURN_IF_ERROR(_status);
- tmp_stream->set_write_counters(profile());
+ RETURN_IF_ERROR(status);
_shared_state->sorted_streams.emplace_back(tmp_stream);
@@ -135,24 +134,24 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
{
SCOPED_TIMER(Base::_spill_recover_time);
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::recover_spill_data", {
- _status = Status::Error<INTERNAL_ERROR>(
+ status = Status::Error<INTERNAL_ERROR>(
"fault_inject spill_sort_source "
"recover_spill_data failed");
});
- if (_status.ok()) {
- _status = _merger->get_next(&merge_sorted_block,
&eos);
+ if (status.ok()) {
+ status = _merger->get_next(&merge_sorted_block,
&eos);
}
}
- RETURN_IF_ERROR(_status);
- _status = tmp_stream->spill_block(state,
merge_sorted_block, eos);
- if (_status.ok()) {
+ RETURN_IF_ERROR(status);
+ status = tmp_stream->spill_block(state,
merge_sorted_block, eos);
+ if (status.ok()) {
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::spill_merged_data", {
- _status = Status::Error<INTERNAL_ERROR>(
+ status = Status::Error<INTERNAL_ERROR>(
"fault_inject spill_sort_source "
"spill_merged_data failed");
});
}
- RETURN_IF_ERROR(_status);
+ RETURN_IF_ERROR(status);
}
}
for (auto& stream : _current_merging_streams) {
@@ -163,20 +162,9 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
return Status::OK();
};
- MonotonicStopWatch submit_timer;
- submit_timer.start();
-
- auto exception_catch_func = [this, spill_func, submit_timer]() {
- auto submit_elapsed_time = submit_timer.elapsed_time();
- _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
- exec_time_counter()->update(submit_elapsed_time);
- _spill_total_timer->update(submit_elapsed_time);
-
- SCOPED_TIMER(exec_time_counter());
- SCOPED_TIMER(_spill_total_timer);
- SCOPED_TIMER(_spill_recover_time);
-
- _status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); });
}();
+ auto exception_catch_func = [spill_func]() {
+ auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func();
}); }();
+ return status;
};
DBUG_EXECUTE_IF("fault_inject::spill_sort_source::merge_sort_spill_data_submit_func",
{
@@ -184,10 +172,15 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
"fault_inject spill_sort_source "
"merge_sort_spill_data submit_func failed");
});
+
+ MonotonicStopWatch submit_timer;
+ submit_timer.start();
+ _spilling_task_count = 1;
return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit(
- std::make_shared<SpillRunnable>(state, _runtime_profile.get(),
false,
- _shared_state->shared_from_this(),
- exception_catch_func));
+ std::make_shared<SpillRunnable>(state, nullptr,
_spilling_task_count,
+ _runtime_profile.get(),
submit_timer,
+ _shared_state->shared_from_this(),
_spill_dependency,
+ false, false,
exception_catch_func));
}
Status SpillSortLocalState::_create_intermediate_merger(
@@ -261,8 +254,9 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::Bloc
bool* eos) {
auto& local_state = get_local_state(state);
local_state.copy_shared_spill_profile();
+ Status status;
Defer defer {[&]() {
- if (!local_state._status.ok() || *eos) {
+ if (!status.ok() || *eos) {
local_state._shared_state->close();
for (auto& stream : local_state._current_merging_streams) {
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
@@ -272,20 +266,21 @@ Status SpillSortSourceOperatorX::get_block(RuntimeState*
state, vectorized::Bloc
}};
local_state.inc_running_big_mem_op_num(state);
SCOPED_TIMER(local_state.exec_time_counter());
- RETURN_IF_ERROR(local_state._status);
+ status = local_state._shared_state->_spill_status.status();
+ RETURN_IF_ERROR(status);
if (local_state._shared_state->is_spilled) {
if (!local_state._merger) {
- local_state._status =
local_state.initiate_merge_sort_spill_streams(state);
- return local_state._status;
+ status = local_state.initiate_merge_sort_spill_streams(state);
+ return status;
} else {
- local_state._status = local_state._merger->get_next(block, eos);
- RETURN_IF_ERROR(local_state._status);
+ SCOPED_TIMER(local_state._spill_total_timer);
+ status = local_state._merger->get_next(block, eos);
+ RETURN_IF_ERROR(status);
}
} else {
- local_state._status =
-
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos);
- RETURN_IF_ERROR(local_state._status);
+ status =
_sort_source_operator->get_block(local_state._runtime_state.get(), block, eos);
+ RETURN_IF_ERROR(status);
}
local_state.reached_limit(block, eos);
return Status::OK();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.h
b/be/src/pipeline/exec/spill_sort_source_operator.h
index 7536dd15e92..a7b8e8efde8 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.h
+++ b/be/src/pipeline/exec/spill_sort_source_operator.h
@@ -54,7 +54,6 @@ protected:
std::unique_ptr<RuntimeState> _runtime_state;
bool _opened = false;
- Status _status;
int64_t _external_sort_bytes_threshold = 134217728; // 128M
std::vector<vectorized::SpillStreamSPtr> _current_merging_streams;
diff --git a/be/src/pipeline/exec/spill_utils.h
b/be/src/pipeline/exec/spill_utils.h
index 2ea5cedcdb0..687779badbb 100644
--- a/be/src/pipeline/exec/spill_utils.h
+++ b/be/src/pipeline/exec/spill_utils.h
@@ -79,21 +79,40 @@ private:
class SpillRunnable : public Runnable {
public:
- SpillRunnable(RuntimeState* state, RuntimeProfile* profile, bool is_write,
- const std::shared_ptr<BasicSharedState>& shared_state,
std::function<void()> func)
- : _state(state),
+ SpillRunnable(RuntimeState* state, std::shared_ptr<SpillContext>
spill_context,
+ std::atomic_int& spilling_task_count, RuntimeProfile*
profile,
+ MonotonicStopWatch submit_timer,
+ const std::shared_ptr<BasicSpillSharedState>& shared_state,
+ std::shared_ptr<Dependency> spill_dependency, bool is_sink,
bool is_write,
+ std::function<Status()> spill_exec_func,
+ std::function<Status()> spill_fin_cb = {})
+ : _is_sink(is_sink),
_is_write(is_write),
+ _state(state),
+ _spill_context(std::move(spill_context)),
+ _spilling_task_count(spilling_task_count),
+ _spill_dependency(std::move(spill_dependency)),
+ _submit_timer(submit_timer),
_task_context_holder(state->get_task_execution_context()),
_shared_state_holder(shared_state),
- _func(std::move(func)) {
- write_wait_in_queue_task_count =
profile->get_counter("SpillWriteTaskWaitInQueueCount");
- writing_task_count = profile->get_counter("SpillWriteTaskCount");
- read_wait_in_queue_task_count =
profile->get_counter("SpillReadTaskWaitInQueueCount");
- reading_task_count = profile->get_counter("SpillReadTaskCount");
+ _spill_exec_func(std::move(spill_exec_func)),
+ _spill_fin_cb(std::move(spill_fin_cb)) {
+ _exec_timer = profile->get_counter("ExecTime");
+ _spill_total_timer = profile->get_counter("SpillTotalTime");
+
+ _spill_write_timer = profile->get_counter("SpillWriteTime");
+ _spill_write_wait_in_queue_timer =
profile->get_counter("SpillWriteTaskWaitInQueueTime");
+ _write_wait_in_queue_task_count =
profile->get_counter("SpillWriteTaskWaitInQueueCount");
+ _writing_task_count = profile->get_counter("SpillWriteTaskCount");
+
+ _spill_revover_timer = profile->get_counter("SpillRecoverTime");
+ _spill_read_wait_in_queue_timer =
profile->get_counter("SpillReadTaskWaitInQueueTime");
+ _read_wait_in_queue_task_count =
profile->get_counter("SpillReadTaskWaitInQueueCount");
+ _reading_task_count = profile->get_counter("SpillReadTaskCount");
if (is_write) {
- COUNTER_UPDATE(write_wait_in_queue_task_count, 1);
+ COUNTER_UPDATE(_write_wait_in_queue_task_count, 1);
} else {
- COUNTER_UPDATE(read_wait_in_queue_task_count, 1);
+ COUNTER_UPDATE(_read_wait_in_queue_task_count, 1);
}
}
@@ -106,22 +125,46 @@ public:
if (!task_context_holder) {
return;
}
+
+ auto submit_elapsed_time = _submit_timer.elapsed_time();
+ if (_is_write) {
+ _spill_write_wait_in_queue_timer->update(submit_elapsed_time);
+ } else {
+ _spill_read_wait_in_queue_timer->update(submit_elapsed_time);
+ }
+ _exec_timer->update(submit_elapsed_time);
+ _spill_total_timer->update(submit_elapsed_time);
+
+ SCOPED_TIMER(_exec_timer);
+ SCOPED_TIMER(_spill_total_timer);
+
+ std::shared_ptr<ScopedTimer<MonotonicStopWatch>> write_or_read_timer;
if (_is_write) {
- COUNTER_UPDATE(write_wait_in_queue_task_count, -1);
- COUNTER_UPDATE(writing_task_count, 1);
+ write_or_read_timer =
+
std::make_shared<ScopedTimer<MonotonicStopWatch>>(_spill_write_timer);
+ COUNTER_UPDATE(_write_wait_in_queue_task_count, -1);
+ COUNTER_UPDATE(_writing_task_count, 1);
} else {
- COUNTER_UPDATE(read_wait_in_queue_task_count, -1);
- COUNTER_UPDATE(reading_task_count, 1);
+ write_or_read_timer =
+
std::make_shared<ScopedTimer<MonotonicStopWatch>>(_spill_revover_timer);
+ COUNTER_UPDATE(_read_wait_in_queue_task_count, -1);
+ COUNTER_UPDATE(_reading_task_count, 1);
}
SCOPED_ATTACH_TASK(_state);
Defer defer([&] {
if (_is_write) {
- COUNTER_UPDATE(writing_task_count, -1);
+ COUNTER_UPDATE(_writing_task_count, -1);
} else {
- COUNTER_UPDATE(reading_task_count, -1);
+ COUNTER_UPDATE(_reading_task_count, -1);
+ }
+ {
+ std::function<Status()> tmp;
+ std::swap(tmp, _spill_exec_func);
+ }
+ {
+ std::function<Status()> tmp;
+ std::swap(tmp, _spill_fin_cb);
}
- std::function<void()> tmp;
- std::swap(tmp, _func);
});
auto shared_state_holder = _shared_state_holder.lock();
@@ -132,19 +175,53 @@ public:
if (_state->is_cancelled()) {
return;
}
- _func();
+ shared_state_holder->_spill_status.update(_spill_exec_func());
+
+ auto num = _spilling_task_count.fetch_sub(1);
+ DCHECK_GE(_spilling_task_count, 0);
+
+ if (num == 1) {
+ if (_spill_fin_cb) {
+ shared_state_holder->_spill_status.update(_spill_fin_cb());
+ }
+ if (_spill_context) {
+ if (_is_sink) {
+ _spill_context->on_task_finished();
+ } else {
+ _spill_context->on_non_sink_task_finished();
+ }
+ }
+ _spill_dependency->set_ready();
+ }
}
private:
- RuntimeState* _state;
+ bool _is_sink;
bool _is_write;
- RuntimeProfile::Counter* write_wait_in_queue_task_count = nullptr;
- RuntimeProfile::Counter* writing_task_count = nullptr;
- RuntimeProfile::Counter* read_wait_in_queue_task_count = nullptr;
- RuntimeProfile::Counter* reading_task_count = nullptr;
+ RuntimeState* _state;
+ std::shared_ptr<SpillContext> _spill_context;
+ std::atomic_int& _spilling_task_count;
+ std::shared_ptr<Dependency> _spill_dependency;
+
+ MonotonicStopWatch _submit_timer;
+
+ RuntimeProfile::Counter* _exec_timer = nullptr;
+ RuntimeProfile::Counter* _spill_total_timer;
+
+ RuntimeProfile::Counter* _spill_write_timer;
+ RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr;
+ RuntimeProfile::Counter* _write_wait_in_queue_task_count = nullptr;
+ RuntimeProfile::Counter* _writing_task_count = nullptr;
+
+ RuntimeProfile::Counter* _spill_revover_timer;
+ RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr;
+ RuntimeProfile::Counter* _read_wait_in_queue_task_count = nullptr;
+ RuntimeProfile::Counter* _reading_task_count = nullptr;
+
std::weak_ptr<TaskExecutionContext> _task_context_holder;
- std::weak_ptr<BasicSharedState> _shared_state_holder;
- std::function<void()> _func;
+ std::weak_ptr<BasicSpillSharedState> _shared_state_holder;
+ std::function<Status()> _spill_exec_func;
+ std::function<Status()> _spill_fin_cb;
};
} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/vec/spill/spill_stream.cpp
b/be/src/vec/spill/spill_stream.cpp
index 6f9143b8073..14f200a92e5 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -96,6 +96,7 @@ void SpillStream::gc() {
Status SpillStream::prepare() {
writer_ =
std::make_unique<SpillWriter>(profile_, stream_id_, batch_rows_,
data_dir_, spill_dir_);
+ _set_write_counters(profile_);
reader_ = std::make_unique<SpillReader>(stream_id_,
writer_->get_file_path());
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 5be151be72c..6d1fcf779ed 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -62,8 +62,6 @@ public:
Status read_next_block_sync(Block* block, bool* eos);
- void set_write_counters(RuntimeProfile* profile) {
writer_->set_counters(profile); }
-
void set_read_counters(RuntimeProfile* profile) {
reader_->set_counters(profile); }
void update_shared_profiles(RuntimeProfile* source_op_profile);
@@ -75,6 +73,8 @@ private:
Status prepare();
+ void _set_write_counters(RuntimeProfile* profile) {
writer_->set_counters(profile); }
+
RuntimeState* state_ = nullptr;
int64_t stream_id_;
SpillDataDir* data_dir_ = nullptr;
diff --git a/be/src/vec/spill/spill_writer.cpp
b/be/src/vec/spill/spill_writer.cpp
index 34715787756..bf1ab900156 100644
--- a/be/src/vec/spill/spill_writer.cpp
+++ b/be/src/vec/spill/spill_writer.cpp
@@ -50,8 +50,10 @@ Status SpillWriter::close() {
}
total_written_bytes_ += meta_.size();
- COUNTER_UPDATE(_write_file_data_bytes_counter, meta_.size());
-
+ COUNTER_UPDATE(_write_file_total_size, meta_.size());
+ if (_write_file_current_size) {
+ COUNTER_UPDATE(_write_file_current_size, meta_.size());
+ }
data_dir_->update_spill_data_usage(meta_.size());
RETURN_IF_ERROR(file_writer_->close());
@@ -145,7 +147,10 @@ Status SpillWriter::_write_internal(const Block& block, size_t& written_bytes) {
max_sub_block_size_ = std::max(max_sub_block_size_,
(size_t)buff_size);
meta_.append((const char*)&total_written_bytes_,
sizeof(size_t));
- COUNTER_UPDATE(_write_file_data_bytes_counter, buff_size);
+ COUNTER_UPDATE(_write_file_total_size, buff_size);
+ if (_write_file_current_size) {
+ COUNTER_UPDATE(_write_file_current_size, buff_size);
+ }
COUNTER_UPDATE(_write_block_counter, 1);
total_written_bytes_ += buff_size;
++written_blocks_;
diff --git a/be/src/vec/spill/spill_writer.h b/be/src/vec/spill/spill_writer.h
index a6ea3200b14..372ce21bb17 100644
--- a/be/src/vec/spill/spill_writer.h
+++ b/be/src/vec/spill/spill_writer.h
@@ -60,7 +60,8 @@ public:
_serialize_timer =
profile->get_counter("SpillWriteSerializeBlockTime");
_write_block_counter = profile->get_counter("SpillWriteBlockCount");
_write_block_bytes_counter =
profile->get_counter("SpillWriteBlockDataSize");
- _write_file_data_bytes_counter = profile->get_counter("SpillWriteFileTotalSize");
+ _write_file_total_size = profile->get_counter("SpillWriteFileTotalSize");
+ _write_file_current_size = profile->get_counter("SpillWriteFileCurrentSize");
_write_rows_counter = profile->get_counter("SpillWriteRows");
}
@@ -85,7 +86,8 @@ private:
RuntimeProfile::Counter* _serialize_timer = nullptr;
RuntimeProfile::Counter* _write_block_counter = nullptr;
RuntimeProfile::Counter* _write_block_bytes_counter = nullptr;
- RuntimeProfile::Counter* _write_file_data_bytes_counter = nullptr;
+ RuntimeProfile::Counter* _write_file_total_size = nullptr;
+ RuntimeProfile::Counter* _write_file_current_size = nullptr;
RuntimeProfile::Counter* _write_rows_counter = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::HighWaterMarkCounter* _peak_memory_usage_counter = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]