This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 85ae773996f [fix](spill) incorrect revocable mem size of hash join (#34379)
85ae773996f is described below
commit 85ae773996f893df65145152156d9fbf21ccc94f
Author: Jerry Hu <[email protected]>
AuthorDate: Sun May 5 23:20:27 2024 +0800
[fix](spill) incorrect revocable mem size of hash join (#34379)
---
.../exec/partitioned_hash_join_probe_operator.cpp | 8 +-
.../exec/partitioned_hash_join_sink_operator.cpp | 160 +++++++++------------
2 files changed, 71 insertions(+), 97 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 24333e6ca1f..62339bb8513 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -155,6 +155,7 @@ Status
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState*
state,
uint32_t
partition_index) {
+ _shared_state_holder = _shared_state->shared_from_this();
auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
auto& mutable_block = partitioned_build_blocks[partition_index];
if (!mutable_block ||
@@ -177,7 +178,6 @@ Status
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
auto execution_context = state->get_task_execution_context();
- _shared_state_holder = _shared_state->shared_from_this();
MonotonicStopWatch submit_timer;
submit_timer.start();
return spill_io_pool->submit_func(
@@ -217,6 +217,7 @@ Status
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState*
state,
uint32_t
partition_index) {
+ _shared_state_holder = _shared_state->shared_from_this();
auto& spilling_stream = _probe_spilling_streams[partition_index];
if (!spilling_stream) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
@@ -241,7 +242,6 @@ Status
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
if (!blocks.empty()) {
auto execution_context = state->get_task_execution_context();
- _shared_state_holder = _shared_state->shared_from_this();
MonotonicStopWatch submit_timer;
submit_timer.start();
return spill_io_pool->submit_func(
@@ -313,6 +313,7 @@ Status
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState*
state,
uint32_t partition_index,
bool& has_data) {
+ _shared_state_holder = _shared_state->shared_from_this();
auto& spilled_stream = _shared_state->spilled_streams[partition_index];
has_data = false;
if (!spilled_stream) {
@@ -327,7 +328,6 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
}
auto execution_context = state->get_task_execution_context();
- _shared_state_holder = _shared_state->shared_from_this();
MonotonicStopWatch submit_timer;
submit_timer.start();
@@ -404,6 +404,7 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState*
state,
uint32_t partition_index,
bool& has_data) {
+ _shared_state_holder = _shared_state->shared_from_this();
auto& spilled_stream = _probe_spilling_streams[partition_index];
has_data = false;
if (!spilled_stream) {
@@ -414,7 +415,6 @@ Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
/// TODO: maybe recovery more blocks each time.
auto execution_context = state->get_task_execution_context();
- _shared_state_holder = _shared_state->shared_from_this();
MonotonicStopWatch submit_timer;
submit_timer.start();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index b2f2e9581f2..4f6bfe5f8c8 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -73,15 +73,12 @@ Status
PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec
size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState*
state) const {
/// If no need to spill, all rows were sunk into the
`_inner_sink_operator` without partitioned.
if (!_shared_state->need_to_spill) {
- if (_shared_state->inner_shared_state &&
_shared_state->inner_shared_state->build_block) {
- return
_shared_state->inner_shared_state->build_block->allocated_bytes();
- } else if (_shared_state->inner_runtime_state) {
- auto inner_sink_state =
_shared_state->inner_runtime_state->get_sink_local_state();
-
- if (inner_sink_state) {
- auto& build_block =
reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
- ->_build_side_mutable_block;
- return build_block.allocated_bytes();
+ if (_shared_state->inner_shared_state) {
+ auto inner_sink_state_ =
_shared_state->inner_runtime_state->get_sink_local_state();
+ if (inner_sink_state_) {
+ auto inner_sink_state =
+
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+ return inner_sink_state->_build_side_mem_used;
}
}
return 0;
@@ -105,47 +102,30 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
_shared_state->inner_shared_state->hash_table_variants.reset();
auto row_desc = p._child_x->row_desc();
- auto build_block =
std::move(_shared_state->inner_shared_state->build_block);
- if (!build_block) {
- build_block = vectorized::Block::create_shared();
- auto inner_sink_state =
_shared_state->inner_runtime_state->get_sink_local_state();
- if (inner_sink_state) {
- auto& mutable_block =
reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
- ->_build_side_mutable_block;
- *build_block = mutable_block.to_block();
- LOG(INFO) << "hash join sink will revoke build mutable block: "
- << build_block->allocated_bytes();
- }
+ std::vector<vectorized::Block> build_blocks;
+ auto inner_sink_state_ =
_shared_state->inner_runtime_state->get_sink_local_state();
+ if (inner_sink_state_) {
+ auto inner_sink_state =
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+ build_blocks = std::move(inner_sink_state->_build_blocks);
}
- /// Here need to skip the first row in build block.
- /// The first row in build block is generated by
`HashJoinBuildSinkOperatorX::sink`.
- if (build_block->rows() <= 1) {
+ if (build_blocks.empty()) {
+ LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id()
+ << ", task: " << state->task_id();
return Status::OK();
}
- if (build_block->columns() > row_desc.num_slots()) {
- build_block->erase(row_desc.num_slots());
- }
-
- {
- /// TODO: DO NOT execute build exprs twice(when partition and building
hash table)
- SCOPED_TIMER(_partition_timer);
- RETURN_IF_ERROR(
- _partitioner->do_partitioning(state, build_block.get(),
_mem_tracker.get()));
- }
-
auto execution_context = state->get_task_execution_context();
_dependency->block();
auto query_id = state->query_id();
auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
- auto spill_func = [execution_context, build_block, state, query_id,
mem_tracker,
- this]() mutable {
+ auto spill_func = [execution_context, build_blocks =
std::move(build_blocks), state, query_id,
+ mem_tracker, this]() mutable {
SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
Defer defer {[&]() {
// need to reset build_block here, or else build_block will be
destructed
// after SCOPED_ATTACH_TASK_WITH_ID and will trigger
memory_orphan_check failure
- build_block.reset();
+ build_blocks.clear();
}};
auto execution_context_lock = execution_context.lock();
@@ -155,77 +135,71 @@ Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
}
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
- SCOPED_TIMER(_partition_shuffle_timer);
- auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();
-
auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
- std::vector<uint32_t> partition_indices;
+ std::vector<std::vector<uint32_t>>
partitions_indexes(p._partition_count);
+
const auto reserved_size = 4096;
- partition_indices.reserve(reserved_size);
-
- auto flush_rows = [&partition_indices, &build_block, &state, this](
- std::unique_ptr<vectorized::MutableBlock>&
partition_block,
- vectorized::SpillStreamSPtr&
spilling_stream) {
- auto* begin = &(partition_indices[0]);
- const auto count = partition_indices.size();
- if (!partition_block) {
- partition_block =
-
vectorized::MutableBlock::create_unique(build_block->clone_empty());
- }
- partition_block->add_rows(build_block.get(), begin, begin + count);
- partition_indices.clear();
-
- if (partition_block->allocated_bytes() >=
- vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
- auto block = partition_block->to_block();
- partition_block =
-
vectorized::MutableBlock::create_unique(build_block->clone_empty());
- auto status = spilling_stream->spill_block(state, block,
false);
-
- if (!status.ok()) {
- std::unique_lock<std::mutex> lock(_spill_lock);
- _spill_status = status;
- _spill_status_ok = false;
- _dependency->set_ready();
- return false;
- }
+ std::for_each(partitions_indexes.begin(), partitions_indexes.end(),
+ [](std::vector<uint32_t>& indices) {
indices.reserve(reserved_size); });
+
+ auto flush_rows = [&state,
this](std::unique_ptr<vectorized::MutableBlock>& partition_block,
+ vectorized::SpillStreamSPtr&
spilling_stream) {
+ auto block = partition_block->to_block();
+ auto status = spilling_stream->spill_block(state, block, false);
+
+ if (!status.ok()) {
+ std::unique_lock<std::mutex> lock(_spill_lock);
+ _spill_status = status;
+ _spill_status_ok = false;
+ _dependency->set_ready();
+ return false;
}
return true;
};
- for (uint32_t i = 0; i != p._partition_count; ++i) {
- vectorized::SpillStreamSPtr& spilling_stream =
_shared_state->spilled_streams[i];
- DCHECK(spilling_stream != nullptr);
+ for (size_t block_idx = 0; block_idx != build_blocks.size();
++block_idx) {
+ auto& build_block = build_blocks[block_idx];
+ const auto is_last_block = block_idx == build_blocks.size() - 1;
+ if (UNLIKELY(build_block.empty())) {
+ continue;
+ }
+ {
+ SCOPED_TIMER(_partition_timer);
+ (void)_partitioner->do_partitioning(state, &build_block,
_mem_tracker.get());
+ }
- if (UNLIKELY(state->is_cancelled())) {
- break;
+ auto* channel_ids =
_partitioner->get_channel_ids().get<uint32_t>();
+ for (size_t i = 0; i != build_block.rows(); ++i) {
+ partitions_indexes[channel_ids[i]].emplace_back(i);
}
- const auto rows = build_block->rows();
- for (size_t idx = 1; idx != rows; ++idx) {
- if (channel_ids[idx] == i) {
- partition_indices.emplace_back(idx);
- } else {
- continue;
+ for (uint32_t partition_idx = 0; partition_idx !=
p._partition_count; ++partition_idx) {
+ auto* begin = partitions_indexes[partition_idx].data();
+ auto* end = begin + partitions_indexes[partition_idx].size();
+ auto& partition_block = partitioned_blocks[partition_idx];
+ vectorized::SpillStreamSPtr& spilling_stream =
+ _shared_state->spilled_streams[partition_idx];
+ if (UNLIKELY(!partition_block)) {
+ partition_block =
+
vectorized::MutableBlock::create_unique(build_block.clone_empty());
}
- const auto count = partition_indices.size();
- if (UNLIKELY(count >= reserved_size)) {
- if (!flush_rows(partitioned_blocks[i], spilling_stream)) {
- break;
- }
+ {
+ SCOPED_TIMER(_partition_shuffle_timer);
+ partition_block->add_rows(&build_block, begin, end);
+ partitions_indexes[partition_idx].clear();
+ }
+
+ build_block.clear();
- if (UNLIKELY(state->is_cancelled())) {
- LOG(INFO) << "query was canceled.";
- partition_indices.clear();
- break;
+ if (partition_block->rows() >= reserved_size || is_last_block)
{
+ if (!flush_rows(partition_block, spilling_stream)) {
+ return;
}
+ partition_block =
+
vectorized::MutableBlock::create_unique(build_block.clone_empty());
}
}
-
- if (!partition_indices.empty()) {
- flush_rows(partitioned_blocks[i], spilling_stream);
- }
}
_dependency->set_ready();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]