This is an automated email from the ASF dual-hosted git repository.
mrhhsg pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 351ba4aeb2c [opt](spill) handle oom exception in spill tasks (#35025) (#35171)
351ba4aeb2c is described below
commit 351ba4aeb2c9a1859db6442758ea0d1446191c35
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Jul 15 10:33:33 2024 +0800
[opt](spill) handle oom exception in spill tasks (#35025) (#35171)
---
.../pipeline/exec/partition_sort_sink_operator.cpp | 73 +++--
.../exec/partitioned_aggregation_sink_operator.cpp | 17 +-
.../partitioned_aggregation_source_operator.cpp | 145 ++++-----
.../exec/partitioned_hash_join_probe_operator.cpp | 341 +++++++--------------
.../exec/partitioned_hash_join_probe_operator.h | 5 +-
.../exec/partitioned_hash_join_sink_operator.cpp | 60 ++--
be/src/pipeline/exec/spill_sort_sink_operator.cpp | 143 ++++-----
.../pipeline/exec/spill_sort_source_operator.cpp | 35 ++-
.../exec/streaming_aggregation_operator.cpp | 130 ++++----
be/src/vec/spill/spill_stream.cpp | 8 +-
be/src/vec/spill/spill_stream.h | 4 +
11 files changed, 458 insertions(+), 503 deletions(-)
diff --git a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
index abe2fde555e..f820914b33e 100644
--- a/be/src/pipeline/exec/partition_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/partition_sort_sink_operator.cpp
@@ -180,42 +180,49 @@ Status
PartitionSortSinkOperatorX::_emplace_into_hash_table(
const vectorized::ColumnRawPtrs& key_columns, const vectorized::Block*
input_block,
PartitionSortSinkLocalState& local_state, bool eos) {
return std::visit(
- [&](auto&& agg_method) -> Status {
- SCOPED_TIMER(local_state._build_timer);
- using HashMethodType = std::decay_t<decltype(agg_method)>;
- using AggState = typename HashMethodType::State;
+ vectorized::Overload {
+ [&](std::monostate& arg) -> Status {
+ return Status::InternalError("Unit hash table");
+ },
+ [&](auto& agg_method) -> Status {
+ SCOPED_TIMER(local_state._build_timer);
+ using HashMethodType =
std::decay_t<decltype(agg_method)>;
+ using AggState = typename HashMethodType::State;
- AggState state(key_columns);
- size_t num_rows = input_block->rows();
- agg_method.init_serialized_keys(key_columns, num_rows);
+ AggState state(key_columns);
+ size_t num_rows = input_block->rows();
+ agg_method.init_serialized_keys(key_columns, num_rows);
- auto creator = [&](const auto& ctor, auto& key, auto& origin) {
- HashMethodType::try_presis_key(key, origin,
*local_state._agg_arena_pool);
- auto* aggregate_data = _pool->add(new
vectorized::PartitionBlocks(
- local_state._partition_sort_info,
local_state._value_places.empty()));
- local_state._value_places.push_back(aggregate_data);
- ctor(key, aggregate_data);
- local_state._num_partition++;
- };
- auto creator_for_null_key = [&](auto& mapped) {
- mapped = _pool->add(new vectorized::PartitionBlocks(
- local_state._partition_sort_info,
local_state._value_places.empty()));
- local_state._value_places.push_back(mapped);
- local_state._num_partition++;
- };
+ auto creator = [&](const auto& ctor, auto& key, auto&
origin) {
+ HashMethodType::try_presis_key(key, origin,
+
*local_state._agg_arena_pool);
+ auto* aggregate_data = _pool->add(new
vectorized::PartitionBlocks(
+ local_state._partition_sort_info,
+ local_state._value_places.empty()));
+
local_state._value_places.push_back(aggregate_data);
+ ctor(key, aggregate_data);
+ local_state._num_partition++;
+ };
+ auto creator_for_null_key = [&](auto& mapped) {
+ mapped = _pool->add(new
vectorized::PartitionBlocks(
+ local_state._partition_sort_info,
+ local_state._value_places.empty()));
+ local_state._value_places.push_back(mapped);
+ local_state._num_partition++;
+ };
- SCOPED_TIMER(local_state._emplace_key_timer);
- for (size_t row = 0; row < num_rows; ++row) {
- auto& mapped =
- agg_method.lazy_emplace(state, row, creator,
creator_for_null_key);
- mapped->add_row_idx(row);
- }
- for (auto* place : local_state._value_places) {
- SCOPED_TIMER(local_state._selector_block_timer);
-
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
- }
- return Status::OK();
- },
+ SCOPED_TIMER(local_state._emplace_key_timer);
+ for (size_t row = 0; row < num_rows; ++row) {
+ auto& mapped = agg_method.lazy_emplace(state, row,
creator,
+
creator_for_null_key);
+ mapped->add_row_idx(row);
+ }
+ for (auto* place : local_state._value_places) {
+ SCOPED_TIMER(local_state._selector_block_timer);
+
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
+ }
+ return Status::OK();
+ }},
local_state._partitioned_data->method_variant);
}
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index b610e1b9ed3..92cd341de19 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -298,12 +298,17 @@ Status
PartitionedAggSinkLocalState::revoke_memory(RuntimeState* state) {
}};
auto* runtime_state = _runtime_state.get();
auto* agg_data =
parent._agg_sink_operator->get_agg_data(runtime_state);
- Base::_shared_state->sink_status = std::visit(
- [&](auto&& agg_method) -> Status {
- auto& hash_table = *agg_method.hash_table;
- return _spill_hash_table(state, agg_method,
hash_table, _eos);
- },
- agg_data->method_variant);
+ Base::_shared_state->sink_status =
+ std::visit(vectorized::Overload {
+ [&](std::monostate& arg) -> Status {
+ return
Status::InternalError("Unit hash table");
+ },
+ [&](auto& agg_method) -> Status {
+ auto& hash_table =
*agg_method.hash_table;
+
RETURN_IF_CATCH_EXCEPTION(return _spill_hash_table(
+ state, agg_method,
hash_table, _eos));
+ }},
+ agg_data->method_variant);
RETURN_IF_ERROR(Base::_shared_state->sink_status);
Base::_shared_state->sink_status =
parent._agg_sink_operator->reset_hash_table(runtime_state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 43c805b9557..6d871451bfd 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -214,80 +214,83 @@ Status
PartitionedAggLocalState::initiate_merge_spill_partition_agg_data(Runtime
MonotonicStopWatch submit_timer;
submit_timer.start();
+ auto spill_func = [this, state, query_id, execution_context, submit_timer]
{
+ _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ Defer defer {[&]() {
+ if (!_status.ok() || state->is_cancelled()) {
+ if (!_status.ok()) {
+ LOG(WARNING) << "query " << print_id(query_id) << " agg
node "
+ << _parent->node_id()
+ << " merge spilled agg data error: " <<
_status;
+ }
+ _shared_state->close();
+ } else if (_shared_state->spill_partitions.empty()) {
+ VLOG_DEBUG << "query " << print_id(query_id) << " agg node "
<< _parent->node_id()
+ << " merge spilled agg data finish";
+ }
+
Base::_shared_state->in_mem_shared_state->aggregate_data_container->init_once();
+ _is_merging = false;
+ _dependency->Dependency::set_ready();
+ }};
+ bool has_agg_data = false;
+ auto& parent = Base::_parent->template cast<Parent>();
+ while (!state->is_cancelled() && !has_agg_data &&
+ !_shared_state->spill_partitions.empty()) {
+ for (auto& stream :
_shared_state->spill_partitions[0]->spill_streams_) {
+ stream->set_read_counters(Base::_spill_read_data_time,
+ Base::_spill_deserialize_time,
Base::_spill_read_bytes,
+ Base::_spill_read_wait_io_timer);
+ vectorized::Block block;
+ bool eos = false;
+ while (!eos && !state->is_cancelled()) {
+ {
+ SCOPED_TIMER(Base::_spill_recover_time);
+ _status = stream->read_next_block_sync(&block, &eos);
+ }
+ RETURN_IF_ERROR(_status);
+
+ if (!block.empty()) {
+ has_agg_data = true;
+ _status = parent._agg_source_operator
+
->merge_with_serialized_key_helper<false>(
+ _runtime_state.get(),
&block);
+ RETURN_IF_ERROR(_status);
+ }
+ }
+
(void)ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(stream);
+ }
+ _shared_state->spill_partitions.pop_front();
+ }
+ if (_shared_state->spill_partitions.empty()) {
+ _shared_state->close();
+ }
+ return _status;
+ };
+
+ auto exception_catch_func = [spill_func, query_id, mem_tracker,
shared_state_holder,
+ execution_context, this]() {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
+ LOG(INFO) << "query " << print_id(query_id)
+ << " execution_context released, maybe query was
cancelled.";
+ return;
+ }
+
+ auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func();
}); }();
+
+ if (!status.ok()) {
+ _status = status;
+ }
+ };
RETURN_IF_ERROR(
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
- [this, state, query_id, mem_tracker, shared_state_holder,
execution_context,
- submit_timer] {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext>
execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query " << print_id(query_id)
- << " execution_context released, maybe
query was cancelled.";
- // FIXME: return status is meaningless?
- return Status::Cancelled("Cancelled");
- }
-
-
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- Defer defer {[&]() {
- if (!_status.ok() || state->is_cancelled()) {
- if (!_status.ok()) {
- LOG(WARNING) << "query " <<
print_id(query_id) << " agg node "
- << _parent->node_id()
- << " merge spilled agg data
error: " << _status;
- }
- _shared_state->close();
- } else if
(_shared_state->spill_partitions.empty()) {
- VLOG_DEBUG << "query " << print_id(query_id)
<< " agg node "
- << _parent->node_id()
- << " merge spilled agg data finish";
- }
-
Base::_shared_state->in_mem_shared_state->aggregate_data_container
- ->init_once();
- _is_merging = false;
- _dependency->Dependency::set_ready();
- }};
- bool has_agg_data = false;
- auto& parent = Base::_parent->template cast<Parent>();
- while (!state->is_cancelled() && !has_agg_data &&
- !_shared_state->spill_partitions.empty()) {
- for (auto& stream :
-
_shared_state->spill_partitions[0]->spill_streams_) {
- stream->set_read_counters(
- Base::_spill_read_data_time,
Base::_spill_deserialize_time,
- Base::_spill_read_bytes,
Base::_spill_read_wait_io_timer);
- vectorized::Block block;
- bool eos = false;
- while (!eos && !state->is_cancelled()) {
- {
-
SCOPED_TIMER(Base::_spill_recover_time);
- _status =
stream->read_next_block_sync(&block, &eos);
- }
- RETURN_IF_ERROR(_status);
-
- if (!block.empty()) {
- has_agg_data = true;
- _status = parent._agg_source_operator
-
->merge_with_serialized_key_helper<false>(
-
_runtime_state.get(), &block);
- RETURN_IF_ERROR(_status);
- }
- }
- (void)ExecEnv::GetInstance()
- ->spill_stream_mgr()
- ->delete_spill_stream(stream);
- }
- _shared_state->spill_partitions.pop_front();
- }
- if (_shared_state->spill_partitions.empty()) {
- _shared_state->close();
- }
- return _status;
- }));
+ exception_catch_func));
return Status::OK();
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 628be711b41..fc006764452 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -153,28 +153,7 @@ Status
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
return Status::OK();
}
-Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState*
state,
- uint32_t
partition_index) {
- auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
- auto& mutable_block = partitioned_build_blocks[partition_index];
- if (!mutable_block ||
- mutable_block->allocated_bytes() <
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
- --_spilling_task_count;
- return Status::OK();
- }
-
- auto& build_spilling_stream =
_shared_state->spilled_streams[partition_index];
- if (!build_spilling_stream) {
-
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
- state, build_spilling_stream, print_id(state->query_id()),
"hash_build_sink",
- _parent->id(), std::numeric_limits<int32_t>::max(),
- std::numeric_limits<size_t>::max(), _runtime_profile.get()));
- RETURN_IF_ERROR(build_spilling_stream->prepare_spill());
-
build_spilling_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
- _spill_data_size,
_spill_write_disk_timer,
- _spill_write_wait_io_timer);
- }
-
+Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState*
state) {
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
auto execution_context = state->get_task_execution_context();
/// Resources in shared state will be released when the operator is closed,
@@ -182,14 +161,56 @@ Status
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
/// So, we need hold the pointer of shared state.
std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
_shared_state->shared_from_this();
+
auto query_id = state->query_id();
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
MonotonicStopWatch submit_timer;
submit_timer.start();
- return spill_io_pool->submit_func([query_id, mem_tracker,
shared_state_holder,
- execution_context, state,
&build_spilling_stream,
- &mutable_block, submit_timer, this] {
+ auto spill_func = [query_id, state, submit_timer, this] {
+ _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ SCOPED_TIMER(_spill_probe_timer);
+
+ auto& p = _parent->cast<PartitionedHashJoinProbeOperatorX>();
+ for (uint32_t partition_index = 0; partition_index !=
p._partition_count;
+ ++partition_index) {
+ auto& blocks = _probe_blocks[partition_index];
+ auto& partitioned_block = _partitioned_blocks[partition_index];
+ if (partitioned_block && partitioned_block->allocated_bytes() >=
+
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+ blocks.emplace_back(partitioned_block->to_block());
+ partitioned_block.reset();
+ }
+
+ auto& spilling_stream = _probe_spilling_streams[partition_index];
+ if (!spilling_stream) {
+
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+ state, spilling_stream, print_id(state->query_id()),
"hash_probe",
+ _parent->id(), std::numeric_limits<int32_t>::max(),
+ std::numeric_limits<size_t>::max(),
_runtime_profile.get()));
+ RETURN_IF_ERROR(spilling_stream->prepare_spill());
+ spilling_stream->set_write_counters(
+ _spill_serialize_block_timer, _spill_block_count,
_spill_data_size,
+ _spill_write_disk_timer, _spill_write_wait_io_timer);
+ }
+
+ COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
+ while (!blocks.empty() && !state->is_cancelled()) {
+ auto block = std::move(blocks.back());
+ blocks.pop_back();
+ RETURN_IF_ERROR(spilling_stream->spill_block(state, block,
false));
+ COUNTER_UPDATE(_spill_probe_rows, block.rows());
+ }
+ }
+ VLOG_DEBUG << "query: " << print_id(query_id)
+ << " hash probe revoke done, node: " << p.node_id()
+ << ", task: " << state->task_id();
+ _dependency->set_ready();
+ return Status::OK();
+ };
+
+ auto exception_catch_func = [query_id, mem_tracker, shared_state_holder,
execution_context,
+ spill_func, this]() {
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
std::shared_ptr<TaskExecutionContext> execution_context_lock;
auto shared_state_sptr = shared_state_holder.lock();
@@ -201,116 +222,18 @@ Status
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
<< " execution_context released, maybe query was
cancelled.";
return;
}
- _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- SCOPED_TIMER(_spill_build_timer);
- if (_spill_status_ok) {
- auto build_block = mutable_block->to_block();
- DCHECK_EQ(mutable_block->rows(), 0);
- auto st = build_spilling_stream->spill_block(state, build_block,
false);
- if (!st.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- _spill_status_ok = false;
- _spill_status = std::move(st);
- } else {
- COUNTER_UPDATE(_spill_build_rows, build_block.rows());
- COUNTER_UPDATE(_spill_build_blocks, 1);
- }
- }
-
- std::unique_lock<std::mutex> lock(_spill_lock);
- if (_spilling_task_count.fetch_sub(1) == 1) {
- LOG(INFO) << "hash probe " << _parent->id()
- << " revoke memory spill_build_block finish";
- _dependency->set_ready();
- }
- });
-}
-Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState*
state,
- uint32_t
partition_index) {
- auto& spilling_stream = _probe_spilling_streams[partition_index];
- if (!spilling_stream) {
-
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
- state, spilling_stream, print_id(state->query_id()),
"hash_probe", _parent->id(),
- std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(),
- _runtime_profile.get()));
- RETURN_IF_ERROR(spilling_stream->prepare_spill());
- spilling_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
- _spill_data_size,
_spill_write_disk_timer,
- _spill_write_wait_io_timer);
- }
+ auto status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func();
}); }();
- auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
-
- auto& blocks = _probe_blocks[partition_index];
- auto& partitioned_block = _partitioned_blocks[partition_index];
- if (partitioned_block && partitioned_block->allocated_bytes() >=
-
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
- blocks.emplace_back(partitioned_block->to_block());
- partitioned_block.reset();
- }
-
- if (!blocks.empty()) {
- auto execution_context = state->get_task_execution_context();
- /// Resources in shared state will be released when the operator is
closed,
- /// but there may be asynchronous spilling tasks at this time, which
can lead to conflicts.
- /// So, we need hold the pointer of shared state.
- std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
- _shared_state->shared_from_this();
-
- auto query_id = state->query_id();
- auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
-
- MonotonicStopWatch submit_timer;
- submit_timer.start();
- return spill_io_pool->submit_func([query_id, mem_tracker,
shared_state_holder,
- execution_context, state, &blocks,
spilling_stream,
- submit_timer, this] {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query: " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return;
- }
- _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- SCOPED_TIMER(_spill_probe_timer);
- COUNTER_UPDATE(_spill_probe_blocks, blocks.size());
- while (!blocks.empty() && !state->is_cancelled()) {
- auto block = std::move(blocks.back());
- blocks.pop_back();
- if (_spill_status_ok) {
- auto st = spilling_stream->spill_block(state, block,
false);
- if (!st.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- _spill_status_ok = false;
- _spill_status = std::move(st);
- break;
- }
- COUNTER_UPDATE(_spill_probe_rows, block.rows());
- } else {
- break;
- }
- }
-
- std::unique_lock<std::mutex> lock(_spill_lock);
- if (_spilling_task_count.fetch_sub(1) == 1) {
- LOG(INFO) << "hash probe " << _parent->id()
- << " revoke memory spill_probe_blocks finish";
- _dependency->set_ready();
- }
- });
- } else {
- std::unique_lock<std::mutex> lock(_spill_lock);
- if (_spilling_task_count.fetch_sub(1) == 1) {
- _dependency->set_ready();
+ if (!status.ok()) {
+ _spill_status_ok = false;
+ _spill_status = std::move(status);
}
- }
- return Status::OK();
+ _dependency->set_ready();
+ };
+
+ _dependency->block();
+ return spill_io_pool->submit_func(exception_catch_func);
}
Status PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t
partition_index) {
@@ -361,16 +284,10 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
MonotonicStopWatch submit_timer;
submit_timer.start();
- auto read_func = [this, query_id, mem_tracker, state, spilled_stream =
spilled_stream,
- &mutable_block, shared_state_holder, execution_context,
submit_timer,
- partition_index] {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto read_func = [this, query_id, state, spilled_stream = spilled_stream,
&mutable_block,
+ shared_state_holder, submit_timer, partition_index] {
auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock ||
state->is_cancelled()) {
+ if (!shared_state_sptr || state->is_cancelled()) {
LOG(INFO) << "query: " << print_id(query_id)
<< " execution_context released, maybe query was
cancelled.";
return;
@@ -378,15 +295,12 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_recovery_build_timer);
- Defer defer([this] { --_spilling_task_count; });
- DCHECK_EQ(_spill_status_ok.load(), true);
bool eos = false;
while (!eos) {
vectorized::Block block;
auto st = spilled_stream->read_next_block_sync(&block, &eos);
if (!st.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status_ok = false;
_spill_status = std::move(st);
break;
@@ -409,7 +323,6 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
DCHECK_EQ(mutable_block->columns(), block.columns());
st = mutable_block->merge(std::move(block));
if (!st.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status_ok = false;
_spill_status = std::move(st);
break;
@@ -425,16 +338,36 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
_dependency->set_ready();
};
+ auto exception_catch_func = [read_func, query_id, mem_tracker,
shared_state_holder,
+ execution_context, state, this]() {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock ||
state->is_cancelled()) {
+ LOG(INFO) << "query: " << print_id(query_id)
+ << " execution_context released, maybe query was
cancelled.";
+ return;
+ }
+
+ auto status = [&]() {
+ RETURN_IF_CATCH_EXCEPTION(read_func());
+ return Status::OK();
+ }();
+
+ if (!status.ok()) {
+ _spill_status_ok = false;
+ _spill_status = std::move(status);
+ }
+ };
+
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
has_data = true;
_dependency->block();
- ++_spilling_task_count;
- auto st = spill_io_pool->submit_func(read_func);
- if (!st.ok()) {
- --_spilling_task_count;
- }
- return st;
+ return spill_io_pool->submit_func(exception_catch_func);
}
std::string PartitionedHashJoinProbeLocalState::debug_string(int
indentation_level) const {
@@ -468,30 +401,14 @@ Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
MonotonicStopWatch submit_timer;
submit_timer.start();
- auto read_func = [this, query_id, mem_tracker, shared_state_holder,
execution_context,
- &spilled_stream, &blocks, submit_timer] {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query: " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return;
- }
-
+ auto read_func = [this, query_id, &spilled_stream, &blocks, submit_timer] {
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_recovery_probe_timer);
- Defer defer([this] { --_spilling_task_count; });
- DCHECK_EQ(_spill_status_ok.load(), true);
vectorized::Block block;
bool eos = false;
auto st = spilled_stream->read_next_block_sync(&block, &eos);
if (!st.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
_spill_status_ok = false;
_spill_status = std::move(st);
} else {
@@ -510,16 +427,36 @@ Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
_dependency->set_ready();
};
+ auto exception_catch_func = [read_func, mem_tracker, shared_state_holder,
execution_context,
+ query_id, this]() {
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock();
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock();
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
+ LOG(INFO) << "query: " << print_id(query_id)
+ << " execution_context released, maybe query was
cancelled.";
+ return;
+ }
+
+ auto status = [&]() {
+ RETURN_IF_CATCH_EXCEPTION(read_func());
+ return Status::OK();
+ }();
+
+ if (!status.ok()) {
+ _spill_status_ok = false;
+ _spill_status = std::move(status);
+ }
+ };
+
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
DCHECK(spill_io_pool != nullptr);
_dependency->block();
has_data = true;
- ++_spilling_task_count;
- auto st = spill_io_pool->submit_func(read_func);
- if (!st.ok()) {
- --_spilling_task_count;
- }
- return st;
+ return spill_io_pool->submit_func(exception_catch_func);
}
PartitionedHashJoinProbeOperatorX::PartitionedHashJoinProbeOperatorX(ObjectPool*
pool,
@@ -701,15 +638,6 @@ Status
PartitionedHashJoinProbeOperatorX::pull(doris::RuntimeState* state,
return local_state._spill_status;
}
- if (_should_revoke_memory(state)) {
- bool wait_for_io = false;
- RETURN_IF_ERROR((const_cast<PartitionedHashJoinProbeOperatorX*>(this))
- ->_revoke_memory(state, wait_for_io));
- if (wait_for_io) {
- return Status::OK();
- }
- }
-
const auto partition_index = local_state._partition_cursor;
auto& probe_blocks = local_state._probe_blocks[partition_index];
if (local_state._need_to_setup_internal_operators) {
@@ -792,20 +720,8 @@ bool
PartitionedHashJoinProbeOperatorX::need_data_from_children(RuntimeState* st
size_t PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState*
state) const {
auto& local_state = get_local_state(state);
size_t mem_size = 0;
- uint32_t spilling_start = local_state._child_eos ?
local_state._partition_cursor + 1 : 0;
- DCHECK_GE(spilling_start, local_state._partition_cursor);
-
- auto& partitioned_build_blocks =
local_state._shared_state->partitioned_build_blocks;
auto& probe_blocks = local_state._probe_blocks;
- for (uint32_t i = spilling_start; i < _partition_count; ++i) {
- auto& build_block = partitioned_build_blocks[i];
- if (build_block) {
- auto block_bytes = build_block->allocated_bytes();
- if (block_bytes >=
vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
- mem_size += build_block->allocated_bytes();
- }
- }
-
+ for (uint32_t i = 0; i < _partition_count; ++i) {
for (auto& block : probe_blocks[i]) {
mem_size += block.allocated_bytes();
}
@@ -821,33 +737,12 @@ size_t
PartitionedHashJoinProbeOperatorX::revocable_mem_size(RuntimeState* state
return mem_size;
}
-Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state,
bool& wait_for_io) {
+Status PartitionedHashJoinProbeOperatorX::_revoke_memory(RuntimeState* state) {
auto& local_state = get_local_state(state);
- wait_for_io = false;
- uint32_t spilling_start = local_state._child_eos ?
local_state._partition_cursor + 1 : 0;
- DCHECK_GE(spilling_start, local_state._partition_cursor);
-
- if (_partition_count > spilling_start) {
- local_state._spilling_task_count = (_partition_count - spilling_start)
* 2;
- } else {
- return Status::OK();
- }
-
VLOG_DEBUG << "query: " << print_id(state->query_id()) << ", hash probe
node: " << id()
- << ", task: " << state->task_id()
- << ", revoke memory, spill task count: " <<
local_state._spilling_task_count;
- for (uint32_t i = spilling_start; i < _partition_count; ++i) {
- RETURN_IF_ERROR(local_state.spill_build_block(state, i));
- RETURN_IF_ERROR(local_state.spill_probe_blocks(state, i));
- }
+ << ", task: " << state->task_id();
- if (local_state._spilling_task_count > 0) {
- std::unique_lock<std::mutex> lock(local_state._spill_lock);
- if (local_state._spilling_task_count > 0) {
- local_state._dependency->block();
- wait_for_io = true;
- }
- }
+ RETURN_IF_ERROR(local_state.spill_probe_blocks(state));
return Status::OK();
}
@@ -893,11 +788,7 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
#endif
if (need_more_input_data(state)) {
if (need_to_spill && _should_revoke_memory(state)) {
- bool wait_for_io = false;
- RETURN_IF_ERROR(_revoke_memory(state, wait_for_io));
- if (wait_for_io) {
- return Status::OK();
- }
+ return _revoke_memory(state);
}
RETURN_IF_ERROR(_child_x->get_block_after_projects(state,
local_state._child_block.get(),
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 6be6c5a865b..db20efda67e 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -48,8 +48,7 @@ public:
Status open(RuntimeState* state) override;
Status close(RuntimeState* state) override;
- Status spill_build_block(RuntimeState* state, uint32_t partition_index);
- Status spill_probe_blocks(RuntimeState* state, uint32_t partition_index);
+ Status spill_probe_blocks(RuntimeState* state);
Status recovery_build_blocks_from_disk(RuntimeState* state, uint32_t
partition_index,
bool& has_data);
@@ -185,7 +184,7 @@ public:
}
private:
- Status _revoke_memory(RuntimeState* state, bool& wait_for_io);
+ Status _revoke_memory(RuntimeState* state);
friend class PartitionedHashJoinProbeLocalState;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index d253a519b0c..45ca975a88c 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -122,30 +122,15 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
/// So, we need hold the pointer of shared state.
std::weak_ptr<PartitionedHashJoinSharedState> shared_state_holder =
_shared_state->shared_from_this();
-
- _dependency->block();
auto query_id = state->query_id();
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
- auto spill_func = [shared_state_holder, execution_context,
- build_blocks = std::move(build_blocks), state,
query_id, mem_tracker,
- num_slots, this]() mutable {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
+ auto spill_func = [build_blocks = std::move(build_blocks), state,
num_slots, this]() mutable {
Defer defer {[&]() {
// need to reset build_block here, or else build_block will be
destructed
// after SCOPED_ATTACH_TASK_WITH_ID and will trigger
memory_orphan_check failure
build_blocks.clear();
}};
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock ||
state->is_cancelled()) {
- LOG(INFO) << "execution_context released, maybe query was
canceled.";
- return;
- }
-
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
std::vector<std::vector<uint32_t>>
partitions_indexes(p._partition_count);
@@ -228,8 +213,36 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
_dependency->set_ready();
};
+
+ auto exception_catch_func = [spill_func, shared_state_holder,
execution_context, state, // NOTE(review): spill_func is captured by copy, which also copies the build_blocks it holds - consider moving it in
+ query_id, mem_tracker, this]() mutable { // wrapper handed to the spill IO thread pool in place of spill_func
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); // presumably binds this pool task to the query's memory tracker - confirm
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock(); // promote the weak reference; empty if the shared state is gone
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock(); // only promoted when the shared state is still alive
+ }
+ if (!shared_state_sptr || !execution_context_lock ||
state->is_cancelled()) {
+ LOG(INFO) << "execution_context released, maybe query was
canceled.";
+ return; // NOTE(review): leaves _spill_status untouched and never calls set_ready() although _dependency was blocked before submission - confirm who unblocks it
+ }
+
+ auto status = [&]() { // immediately-invoked so the guard macro can return a Status out of this inner scope
+ RETURN_IF_CATCH_EXCEPTION(spill_func()); // presumably returns an error Status when spill_func throws - confirm against the macro
+ return Status::OK();
+ }();
+
+ if (!status.ok()) { // failure path only
+ std::unique_lock<std::mutex> lock(_spill_lock); // the status fields are written under _spill_lock
+ _spill_status = status;
+ _spill_status_ok = false;
+ _dependency->set_ready(); // unblock only after the failure has been recorded
+ }
+ };
auto* thread_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
- return thread_pool->submit_func(spill_func);
+
+ _dependency->block();
+ return thread_pool->submit_func(exception_catch_func);
}
Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
@@ -288,7 +301,18 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
}
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_spill_build_timer);
- _spill_to_disk(i, spilling_stream);
+
+ auto status = [&]() { // immediately-invoked so an exception from _spill_to_disk can be turned into a Status
+ RETURN_IF_CATCH_EXCEPTION(_spill_to_disk(i, spilling_stream));
+ return Status::OK();
+ }();
+ // Test the captured status with ok(); OK() is the static factory used above to build a success Status.
+ if (!status.ok()) {
+ std::unique_lock<std::mutex> lock(_spill_lock);
+ _spill_status = std::move(status); // record the failure first ...
+ _spill_status_ok = false;
+ _dependency->set_ready(); // ... then unblock, same order as the handler in _revoke_unpartitioned_block
+ }
});
if (!st.ok()) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index c6a943c59b5..004283841ec 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -238,78 +238,83 @@ Status
SpillSortSinkLocalState::revoke_memory(RuntimeState* state) {
MonotonicStopWatch submit_timer;
submit_timer.start();
- status =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
- [this, state, query_id, mem_tracker, shared_state_holder, &parent,
execution_context,
- submit_timer] {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return Status::OK();
+ auto spill_func = [this, state, query_id, &parent, submit_timer] {
+ _spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
+ Defer defer {[&]() {
+ if (!_shared_state->sink_status.ok() || state->is_cancelled()) {
+ if (!_shared_state->sink_status.ok()) {
+ LOG(WARNING) << "query " << print_id(query_id) << " sort
node " << _parent->id()
+ << " revoke memory error: " <<
_shared_state->sink_status;
}
+ _shared_state->close();
+ } else {
+ VLOG_DEBUG << "query " << print_id(query_id) << " sort node "
<< _parent->id()
+ << " revoke memory finish";
+ }
-
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
- Defer defer {[&]() {
- if (!_shared_state->sink_status.ok() ||
state->is_cancelled()) {
- if (!_shared_state->sink_status.ok()) {
- LOG(WARNING) << "query " << print_id(query_id) <<
" sort node "
- << _parent->id()
- << " revoke memory error: " <<
_shared_state->sink_status;
- }
- _shared_state->close();
- } else {
- VLOG_DEBUG << "query " << print_id(query_id) << " sort
node "
- << _parent->id() << " revoke memory finish";
- }
-
- if (!_shared_state->sink_status.ok()) {
- _shared_state->close();
- }
-
- _spilling_stream.reset();
- if (_eos) {
- _dependency->set_ready_to_read();
- _finish_dependency->set_ready();
- } else {
- _dependency->Dependency::set_ready();
- }
- }};
-
- _shared_state->sink_status =
-
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
- RETURN_IF_ERROR(_shared_state->sink_status);
-
- auto* sink_local_state =
_runtime_state->get_sink_local_state();
- update_profile(sink_local_state->profile());
-
- bool eos = false;
- vectorized::Block block;
- while (!eos && !state->is_cancelled()) {
- {
- SCOPED_TIMER(_spill_merge_sort_timer);
- _shared_state->sink_status =
-
parent._sort_sink_operator->merge_sort_read_for_spill(
- _runtime_state.get(), &block,
-
_shared_state->spill_block_batch_row_count, &eos);
- }
- RETURN_IF_ERROR(_shared_state->sink_status);
- {
- SCOPED_TIMER(Base::_spill_timer);
- _shared_state->sink_status =
- _spilling_stream->spill_block(state, block,
eos);
- }
- RETURN_IF_ERROR(_shared_state->sink_status);
- block.clear_column_data();
- }
- parent._sort_sink_operator->reset(_runtime_state.get());
+ if (!_shared_state->sink_status.ok()) {
+ _shared_state->close();
+ }
+
+ _spilling_stream.reset();
+ if (_eos) {
+ _dependency->set_ready_to_read();
+ _finish_dependency->set_ready();
+ } else {
+ _dependency->Dependency::set_ready();
+ }
+ }};
+
+ _shared_state->sink_status =
+
parent._sort_sink_operator->prepare_for_spill(_runtime_state.get());
+ RETURN_IF_ERROR(_shared_state->sink_status);
+
+ auto* sink_local_state = _runtime_state->get_sink_local_state();
+ update_profile(sink_local_state->profile());
+
+ bool eos = false;
+ vectorized::Block block;
+ while (!eos && !state->is_cancelled()) {
+ {
+ SCOPED_TIMER(_spill_merge_sort_timer);
+ _shared_state->sink_status =
parent._sort_sink_operator->merge_sort_read_for_spill(
+ _runtime_state.get(), &block,
_shared_state->spill_block_batch_row_count,
+ &eos);
+ }
+ RETURN_IF_ERROR(_shared_state->sink_status);
+ {
+ SCOPED_TIMER(Base::_spill_timer);
+ _shared_state->sink_status =
_spilling_stream->spill_block(state, block, eos);
+ }
+ RETURN_IF_ERROR(_shared_state->sink_status);
+ block.clear_column_data();
+ }
+ parent._sort_sink_operator->reset(_runtime_state.get());
- return Status::OK();
- });
+ return Status::OK();
+ };
+
+ auto exception_catch_func = [this, query_id, mem_tracker,
shared_state_holder,
+ execution_context, spill_func]() { // wrapper submitted to the spill IO thread pool; spill_func itself is only called from here
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); // presumably binds this pool task to the query's memory tracker - confirm
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock(); // empty when the shared state has already been released
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock(); // only attempted while the shared state is alive
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
+ LOG(INFO) << "query " << print_id(query_id)
+ << " execution_context released, maybe query was
cancelled.";
+ return; // spill_func is skipped, so sink_status is not written on this path
+ }
+
+ _shared_state->sink_status = [&]() { // stores either spill_func's own Status or the one produced by the guard macro
+ RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); // NOTE(review): if spill_func throws, its Defer presumably runs during unwinding and reads sink_status / signals the dependency before this assignment lands - confirm
+ }();
+ };
+
+ status =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
+ exception_catch_func);
if (!status.ok()) {
if (!_eos) {
Base::_dependency->Dependency::set_ready();
diff --git a/be/src/pipeline/exec/spill_sort_source_operator.cpp
b/be/src/pipeline/exec/spill_sort_source_operator.cpp
index fe6b4ee3efc..18a3d4310fd 100644
--- a/be/src/pipeline/exec/spill_sort_source_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_source_operator.cpp
@@ -98,20 +98,7 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
MonotonicStopWatch submit_timer;
submit_timer.start();
- auto spill_func = [this, state, query_id, mem_tracker, &parent,
shared_state_holder,
- execution_context, submit_timer] {
- SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
- std::shared_ptr<TaskExecutionContext> execution_context_lock;
- auto shared_state_sptr = shared_state_holder.lock();
- if (shared_state_sptr) {
- execution_context_lock = execution_context.lock();
- }
- if (!shared_state_sptr || !execution_context_lock) {
- LOG(INFO) << "query " << print_id(query_id)
- << " execution_context released, maybe query was
cancelled.";
- return Status::OK();
- }
-
+ auto spill_func = [this, state, query_id, &parent, submit_timer] {
_spill_wait_in_queue_timer->update(submit_timer.elapsed_time());
SCOPED_TIMER(_spill_merge_sort_timer);
Defer defer {[&]() {
@@ -185,8 +172,26 @@ Status
SpillSortLocalState::initiate_merge_sort_spill_streams(RuntimeState* stat
}
return Status::OK();
};
+
+ auto exception_catch_func = [this, query_id, mem_tracker,
shared_state_holder,
+ execution_context, spill_func]() { // wrapper submitted to the spill IO thread pool instead of spill_func
+ SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id); // presumably binds this pool task to the query's memory tracker - confirm
+ std::shared_ptr<TaskExecutionContext> execution_context_lock;
+ auto shared_state_sptr = shared_state_holder.lock(); // empty when the shared state has already been released
+ if (shared_state_sptr) {
+ execution_context_lock = execution_context.lock(); // only attempted while the shared state is alive
+ }
+ if (!shared_state_sptr || !execution_context_lock) {
+ LOG(INFO) << "query " << print_id(query_id)
+ << " execution_context released, maybe query was
cancelled.";
+ return; // spill_func is skipped and _status keeps its previous value
+ }
+
+ _status = [&]() { RETURN_IF_CATCH_EXCEPTION({ return spill_func(); }); // _status receives spill_func's result, or presumably an error Status if it throws - confirm against the macro
}();
+ };
+
return
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->submit_func(
- spill_func);
+ exception_catch_func);
}
Status SpillSortLocalState::_create_intermediate_merger(
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 83952411f46..d7589f59f9f 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -672,68 +672,76 @@ Status
StreamingAggLocalState::_pre_agg_with_serialized_key(doris::vectorized::B
const bool used_too_much_memory =
spill_streaming_agg_mem_limit > 0 && _memory_usage() >
spill_streaming_agg_mem_limit;
RETURN_IF_ERROR(std::visit(
- [&](auto&& agg_method) -> Status {
- auto& hash_tbl = *agg_method.hash_table;
- /// If too much memory is used during the pre-aggregation
stage,
- /// it is better to output the data directly without
performing further aggregation.
- // do not try to do agg, just init and serialize directly
return the out_block
- if (used_too_much_memory ||
(hash_tbl.add_elem_size_overflow(rows) &&
-
!_should_expand_preagg_hash_tables())) {
- SCOPED_TIMER(_streaming_agg_timer);
- ret_flag = true;
-
- // will serialize value data to string column.
- // non-nullable column(id in `_make_nullable_keys`)
- // will be converted to nullable.
- bool mem_reuse = p._make_nullable_keys.empty() &&
out_block->mem_reuse();
-
- std::vector<vectorized::DataTypePtr> data_types;
- vectorized::MutableColumns value_columns;
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- auto data_type =
-
_aggregate_evaluators[i]->function()->get_serialized_type();
- if (mem_reuse) {
- value_columns.emplace_back(
- std::move(*out_block->get_by_position(i +
key_size).column)
- .mutate());
- } else {
- // slot type of value it should always be string
type
- value_columns.emplace_back(_aggregate_evaluators[i]
- ->function()
-
->create_serialize_column());
+ vectorized::Overload {
+ [&](std::monostate& arg) -> Status {
+ return Status::InternalError("Uninited hash table");
+ },
+ [&](auto& agg_method) -> Status {
+ auto& hash_tbl = *agg_method.hash_table;
+ /// If too much memory is used during the
pre-aggregation stage,
+ /// it is better to output the data directly without
performing further aggregation.
+ // do not try to do agg, just init and serialize
directly return the out_block
+ if (used_too_much_memory ||
(hash_tbl.add_elem_size_overflow(rows) &&
+
!_should_expand_preagg_hash_tables())) {
+ SCOPED_TIMER(_streaming_agg_timer);
+ ret_flag = true;
+
+ // will serialize value data to string column.
+ // non-nullable column(id in `_make_nullable_keys`)
+ // will be converted to nullable.
+ bool mem_reuse =
+ p._make_nullable_keys.empty() &&
out_block->mem_reuse();
+
+ std::vector<vectorized::DataTypePtr> data_types;
+ vectorized::MutableColumns value_columns;
+ for (int i = 0; i < _aggregate_evaluators.size();
++i) {
+ auto data_type =
+
_aggregate_evaluators[i]->function()->get_serialized_type();
+ if (mem_reuse) {
+ value_columns.emplace_back(
+
std::move(*out_block->get_by_position(i + key_size)
+ .column)
+ .mutate());
+ } else {
+ // slot type of value it should always be
string type
+
value_columns.emplace_back(_aggregate_evaluators[i]
+
->function()
+
->create_serialize_column());
+ }
+ data_types.emplace_back(data_type);
+ }
+
+ for (int i = 0; i != _aggregate_evaluators.size();
++i) {
+ SCOPED_TIMER(_serialize_data_timer);
+ RETURN_IF_ERROR(
+
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
+ in_block, value_columns[i],
rows,
+ _agg_arena_pool.get()));
+ }
+
+ if (!mem_reuse) {
+ vectorized::ColumnsWithTypeAndName
columns_with_schema;
+ for (int i = 0; i < key_size; ++i) {
+ columns_with_schema.emplace_back(
+
key_columns[i]->clone_resized(rows),
+
_probe_expr_ctxs[i]->root()->data_type(),
+
_probe_expr_ctxs[i]->root()->expr_name());
+ }
+ for (int i = 0; i < value_columns.size(); ++i)
{
+
columns_with_schema.emplace_back(std::move(value_columns[i]),
+
data_types[i], "");
+ }
+
out_block->swap(vectorized::Block(columns_with_schema));
+ } else {
+ for (int i = 0; i < key_size; ++i) {
+
std::move(*out_block->get_by_position(i).column)
+ .mutate()
+
->insert_range_from(*key_columns[i], 0, rows);
+ }
+ }
}
- data_types.emplace_back(data_type);
- }
-
- for (int i = 0; i != _aggregate_evaluators.size(); ++i) {
- SCOPED_TIMER(_serialize_data_timer);
-
RETURN_IF_ERROR(_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
- in_block, value_columns[i], rows,
_agg_arena_pool.get()));
- }
-
- if (!mem_reuse) {
- vectorized::ColumnsWithTypeAndName columns_with_schema;
- for (int i = 0; i < key_size; ++i) {
- columns_with_schema.emplace_back(
- key_columns[i]->clone_resized(rows),
- _probe_expr_ctxs[i]->root()->data_type(),
- _probe_expr_ctxs[i]->root()->expr_name());
- }
- for (int i = 0; i < value_columns.size(); ++i) {
-
columns_with_schema.emplace_back(std::move(value_columns[i]),
- data_types[i],
"");
- }
-
out_block->swap(vectorized::Block(columns_with_schema));
- } else {
- for (int i = 0; i < key_size; ++i) {
- std::move(*out_block->get_by_position(i).column)
- .mutate()
- ->insert_range_from(*key_columns[i], 0,
rows);
- }
- }
- }
- return Status::OK();
- },
+ return Status::OK();
+ }},
_agg_data->method_variant));
if (!ret_flag) {
diff --git a/be/src/vec/spill/spill_stream.cpp
b/be/src/vec/spill/spill_stream.cpp
index b9c27a9d6ae..0ac9f95563e 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -42,13 +42,17 @@ SpillStream::SpillStream(RuntimeState* state, int64_t
stream_id, SpillDataDir* d
spill_dir_(std::move(spill_dir)),
batch_rows_(batch_rows),
batch_bytes_(batch_bytes),
+ query_id_(state->query_id()),
profile_(profile) {}
SpillStream::~SpillStream() {
bool exists = false;
auto status = io::global_local_filesystem()->exists(spill_dir_, &exists);
if (status.ok() && exists) {
- auto gc_dir = fmt::format("{}/{}/{}", get_data_dir()->path(),
SPILL_GC_DIR_PREFIX,
+ auto query_dir = fmt::format("{}/{}/{}", get_data_dir()->path(),
SPILL_GC_DIR_PREFIX,
+ print_id(query_id_));
+ (void)io::global_local_filesystem()->create_directory(query_dir);
+ auto gc_dir = fmt::format("{}/{}", query_dir,
std::filesystem::path(spill_dir_).filename().string());
(void)io::global_local_filesystem()->rename(spill_dir_, gc_dir);
}
@@ -62,7 +66,7 @@ Status SpillStream::prepare() {
}
const TUniqueId& SpillStream::query_id() const {
- return state_->query_id();
+ return query_id_; // member initialized from state->query_id() in the constructor; state_ is no longer dereferenced here
}
const std::string& SpillStream::get_spill_root_dir() const {
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index cadfa6fb6d4..8751b406608 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -40,6 +40,8 @@ public:
std::string spill_dir, size_t batch_rows, size_t batch_bytes,
RuntimeProfile* profile);
+ SpillStream() = delete;
+
~SpillStream();
int64_t id() const { return stream_id_; }
@@ -99,6 +101,8 @@ private:
SpillWriterUPtr writer_;
SpillReaderUPtr reader_;
+ TUniqueId query_id_;
+
RuntimeProfile* profile_ = nullptr;
RuntimeProfile::Counter* write_wait_io_timer_ = nullptr;
RuntimeProfile::Counter* read_wait_io_timer_ = nullptr;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]