This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 73c336a1ae6 [improvement](spill) avoid occupying too much memory while
spilling the build block during the hash join build phase (#33747)
73c336a1ae6 is described below
commit 73c336a1ae6aaf89b841bd7d4d58ef96e74df7fe
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Apr 17 15:50:29 2024 +0800
[improvement](spill) avoid occupying too much memory while spilling the build block
during the hash join build phase (#33747)
---
.../exec/partitioned_hash_join_probe_operator.cpp | 3 +-
.../exec/partitioned_hash_join_probe_operator.h | 2 +-
.../exec/partitioned_hash_join_sink_operator.cpp | 166 ++++++++++++++++-----
.../exec/partitioned_hash_join_sink_operator.h | 4 +-
4 files changed, 135 insertions(+), 40 deletions(-)
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 35750ee5a55..78dcaf1e6c5 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -535,7 +535,7 @@ Status
PartitionedHashJoinProbeOperatorX::push(RuntimeState* state, vectorized::
}
std::vector<uint32_t> partition_indexes[_partition_count];
- auto* channel_ids =
reinterpret_cast<uint64_t*>(local_state._partitioner->get_channel_ids());
+ auto* channel_ids =
reinterpret_cast<uint32_t*>(local_state._partitioner->get_channel_ids());
for (uint32_t i = 0; i != rows; ++i) {
partition_indexes[channel_ids[i]].emplace_back(i);
}
@@ -862,6 +862,7 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
RETURN_IF_ERROR(
_inner_probe_operator->pull(local_state._runtime_state.get(), block, eos));
if (*eos) {
+ _update_profile_from_internal_states(local_state);
local_state._runtime_state.reset();
}
}
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 96a5cf96e34..5bdc5278ffc 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -33,7 +33,7 @@ class RuntimeState;
namespace pipeline {
-using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
+using PartitionerType =
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;
class PartitionedHashJoinProbeOperatorX;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index a0adf0505f8..c9d61757461 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -45,8 +45,21 @@ Status
PartitionedHashJoinSinkLocalState::init(doris::RuntimeState* state,
Status PartitionedHashJoinSinkLocalState::open(RuntimeState* state) {
RETURN_IF_ERROR(PipelineXSpillSinkLocalState::open(state));
+ auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+ for (uint32_t i = 0; i != p._partition_count; ++i) {
+ auto& spilling_stream = _shared_state->spilled_streams[i];
+
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+ state, spilling_stream, print_id(state->query_id()),
+ fmt::format("hash_build_sink_{}", i), _parent->id(),
+ std::numeric_limits<int32_t>::max(),
std::numeric_limits<size_t>::max(), _profile));
+ RETURN_IF_ERROR(spilling_stream->prepare_spill());
+ spilling_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
+ _spill_data_size,
_spill_write_disk_timer,
+ _spill_write_wait_io_timer);
+ }
return _partitioner->open(state);
}
+
Status PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status
exec_status) {
SCOPED_TIMER(PipelineXSpillSinkLocalState::exec_time_counter());
SCOPED_TIMER(PipelineXSpillSinkLocalState::_close_timer);
@@ -87,39 +100,127 @@ size_t
PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* state
return mem_size;
}
+Status
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeState*
state) {
+ _shared_state->need_to_spill = true;
+ auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+ _shared_state->inner_shared_state->hash_table_variants.reset();
+ auto row_desc = p._child_x->row_desc();
+ auto build_block =
std::move(_shared_state->inner_shared_state->build_block);
+ if (!build_block) {
+ build_block = vectorized::Block::create_shared();
+ auto inner_sink_state =
_shared_state->inner_runtime_state->get_sink_local_state();
+ if (inner_sink_state) {
+ auto& mutable_block =
reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
+ ->_build_side_mutable_block;
+ *build_block = mutable_block.to_block();
+ LOG(INFO) << "hash join sink will revoke build mutable block: "
+ << build_block->allocated_bytes();
+ }
+ }
+
+ /// Here need to skip the first row in build block.
+ /// The first row in build block is generated by
`HashJoinBuildSinkOperatorX::sink`.
+ if (build_block->rows() <= 1) {
+ return Status::OK();
+ }
+
+ if (build_block->columns() > row_desc.num_slots()) {
+ build_block->erase(row_desc.num_slots());
+ }
+
+ {
+ /// TODO: DO NOT execute build exprs twice(when partition and building
hash table)
+ SCOPED_TIMER(_partition_timer);
+ RETURN_IF_ERROR(
+ _partitioner->do_partitioning(state, build_block.get(),
_mem_tracker.get()));
+ }
+
+ auto execution_context = state->get_task_execution_context();
+ _dependency->block();
+ auto spill_func = [execution_context, build_block, state, this]() {
+ auto execution_context_lock = execution_context.lock();
+ if (!execution_context_lock) {
+ LOG(INFO) << "execution_context released, maybe query was
cancelled.";
+ return;
+ }
+ auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
+ SCOPED_TIMER(_partition_shuffle_timer);
+ auto* channel_ids =
reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids());
+
+ auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
+ std::vector<uint32_t> partition_indices;
+ const auto reserved_size = 4096;
+ partition_indices.reserve(reserved_size);
+
+ auto flush_rows = [&partition_indices, &build_block, &state, this](
+ std::unique_ptr<vectorized::MutableBlock>&
partition_block,
+ vectorized::SpillStreamSPtr&
spilling_stream) {
+ auto* begin = &(partition_indices[0]);
+ const auto count = partition_indices.size();
+ if (!partition_block) {
+ partition_block =
+
vectorized::MutableBlock::create_unique(build_block->clone_empty());
+ }
+ partition_block->add_rows(build_block.get(), begin, begin + count);
+ partition_indices.clear();
+
+ if (partition_block->allocated_bytes() >=
+ vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
+ auto block = partition_block->to_block();
+ partition_block =
+
vectorized::MutableBlock::create_unique(build_block->clone_empty());
+ auto status = spilling_stream->spill_block(state, block,
false);
+
+ if (!status.ok()) {
+ std::unique_lock<std::mutex> lock(_spill_lock);
+ _spill_status = status;
+ _spill_status_ok = false;
+ _dependency->set_ready();
+ return false;
+ }
+ }
+ return true;
+ };
+
+ for (uint32_t i = 0; i != p._partition_count; ++i) {
+ vectorized::SpillStreamSPtr& spilling_stream =
_shared_state->spilled_streams[i];
+ DCHECK(spilling_stream != nullptr);
+
+ const auto rows = build_block->rows();
+ for (size_t idx = 1; idx != rows; ++idx) {
+ if (channel_ids[idx] == i) {
+ partition_indices.emplace_back(idx);
+ } else {
+ continue;
+ }
+
+ const auto count = partition_indices.size();
+ if (UNLIKELY(count >= reserved_size)) {
+ if (!flush_rows(partitioned_blocks[i], spilling_stream)) {
+ break;
+ }
+ }
+ }
+
+ if (!partition_indices.empty()) {
+ flush_rows(partitioned_blocks[i], spilling_stream);
+ }
+ }
+
+ _dependency->set_ready();
+ };
+ auto* thread_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
+ return thread_pool->submit_func(spill_func);
+}
+
Status PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
LOG(INFO) << "hash join sink " << _parent->id() << " revoke_memory"
<< ", eos: " << _child_eos;
DCHECK_EQ(_spilling_streams_count, 0);
if (!_shared_state->need_to_spill) {
- profile()->add_info_string("Spilled", "true");
_shared_state->need_to_spill = true;
- auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
- _shared_state->inner_shared_state->hash_table_variants.reset();
- auto row_desc = p._child_x->row_desc();
- auto build_block =
std::move(_shared_state->inner_shared_state->build_block);
- if (!build_block) {
- build_block = vectorized::Block::create_shared();
- auto inner_sink_state =
_shared_state->inner_runtime_state->get_sink_local_state();
- if (inner_sink_state) {
- auto& mutable_block =
-
reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
- ->_build_side_mutable_block;
- *build_block = mutable_block.to_block();
- LOG(INFO) << "hash join sink will revoke build mutable block: "
- << build_block->allocated_bytes();
- }
- }
-
- /// Here need to skip the first row in build block.
- /// The first row in build block is generated by
`HashJoinBuildSinkOperatorX::sink`.
- if (build_block->rows() > 1) {
- if (build_block->columns() > row_desc.num_slots()) {
- build_block->erase(row_desc.num_slots());
- }
- RETURN_IF_ERROR(_partition_block(state, build_block.get(), 1,
build_block->rows()));
- }
+ return _revoke_unpartitioned_block(state);
}
_spilling_streams_count = _shared_state->partitioned_build_blocks.size();
@@ -133,16 +234,7 @@ Status
PartitionedHashJoinSinkLocalState::revoke_memory(RuntimeState* state) {
continue;
}
- if (!spilling_stream) {
-
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
- state, spilling_stream, print_id(state->query_id()),
"hash_build_sink",
- _parent->id(), std::numeric_limits<int32_t>::max(),
- std::numeric_limits<size_t>::max(), _profile));
- RETURN_IF_ERROR(spilling_stream->prepare_spill());
- spilling_stream->set_write_counters(_spill_serialize_block_timer,
_spill_block_count,
- _spill_data_size,
_spill_write_disk_timer,
- _spill_write_wait_io_timer);
- }
+ DCHECK(spilling_stream != nullptr);
auto* spill_io_pool =
ExecEnv::GetInstance()->spill_stream_mgr()->get_async_task_thread_pool();
@@ -201,7 +293,7 @@ Status
PartitionedHashJoinSinkLocalState::_partition_block(RuntimeState* state,
auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
SCOPED_TIMER(_partition_shuffle_timer);
- auto* channel_ids =
reinterpret_cast<uint64_t*>(_partitioner->get_channel_ids());
+ auto* channel_ids =
reinterpret_cast<uint32_t*>(_partitioner->get_channel_ids());
std::vector<uint32_t> partition_indexes[p._partition_count];
DCHECK_LT(begin, end);
for (size_t i = begin; i != end; ++i) {
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 3a03c2fc724..3f29e3093b6 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -34,7 +34,7 @@ class RuntimeState;
namespace pipeline {
-using PartitionerType = vectorized::XXHashPartitioner<LocalExchangeChannelIds>;
+using PartitionerType =
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>;
class PartitionedHashJoinSinkOperatorX;
@@ -60,6 +60,8 @@ protected:
Status _partition_block(RuntimeState* state, vectorized::Block* in_block,
size_t begin,
size_t end);
+ Status _revoke_unpartitioned_block(RuntimeState* state);
+
friend class PartitionedHashJoinSinkOperatorX;
std::atomic_int _spilling_streams_count {0};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]