This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 85ae773996f [fix](spill) incorrect revocable mem size of hash join (#34379)
85ae773996f is described below

commit 85ae773996f893df65145152156d9fbf21ccc94f
Author: Jerry Hu <[email protected]>
AuthorDate: Sun May 5 23:20:27 2024 +0800

    [fix](spill) incorrect revocable mem size of hash join (#34379)
---
 .../exec/partitioned_hash_join_probe_operator.cpp  |   8 +-
 .../exec/partitioned_hash_join_sink_operator.cpp   | 160 +++++++++------------
 2 files changed, 71 insertions(+), 97 deletions(-)

diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 24333e6ca1f..62339bb8513 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -155,6 +155,7 @@ Status 
PartitionedHashJoinProbeLocalState::close(RuntimeState* state) {
 
 Status PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* 
state,
                                                              uint32_t 
partition_index) {
+    _shared_state_holder = _shared_state->shared_from_this();
     auto& partitioned_build_blocks = _shared_state->partitioned_build_blocks;
     auto& mutable_block = partitioned_build_blocks[partition_index];
     if (!mutable_block ||
@@ -177,7 +178,6 @@ Status 
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
 
     auto* spill_io_pool = 
ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
     auto execution_context = state->get_task_execution_context();
-    _shared_state_holder = _shared_state->shared_from_this();
     MonotonicStopWatch submit_timer;
     submit_timer.start();
     return spill_io_pool->submit_func(
@@ -217,6 +217,7 @@ Status 
PartitionedHashJoinProbeLocalState::spill_build_block(RuntimeState* state
 
 Status PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* 
state,
                                                               uint32_t 
partition_index) {
+    _shared_state_holder = _shared_state->shared_from_this();
     auto& spilling_stream = _probe_spilling_streams[partition_index];
     if (!spilling_stream) {
         
RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
@@ -241,7 +242,6 @@ Status 
PartitionedHashJoinProbeLocalState::spill_probe_blocks(RuntimeState* stat
 
     if (!blocks.empty()) {
         auto execution_context = state->get_task_execution_context();
-        _shared_state_holder = _shared_state->shared_from_this();
         MonotonicStopWatch submit_timer;
         submit_timer.start();
         return spill_io_pool->submit_func(
@@ -313,6 +313,7 @@ Status 
PartitionedHashJoinProbeLocalState::finish_spilling(uint32_t partition_in
 Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(RuntimeState*
 state,
                                                                            
uint32_t partition_index,
                                                                            
bool& has_data) {
+    _shared_state_holder = _shared_state->shared_from_this();
     auto& spilled_stream = _shared_state->spilled_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
@@ -327,7 +328,6 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     }
 
     auto execution_context = state->get_task_execution_context();
-    _shared_state_holder = _shared_state->shared_from_this();
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
@@ -404,6 +404,7 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
 Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState*
 state,
                                                                            
uint32_t partition_index,
                                                                            
bool& has_data) {
+    _shared_state_holder = _shared_state->shared_from_this();
     auto& spilled_stream = _probe_spilling_streams[partition_index];
     has_data = false;
     if (!spilled_stream) {
@@ -414,7 +415,6 @@ Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(Runti
 
     /// TODO: maybe recovery more blocks each time.
     auto execution_context = state->get_task_execution_context();
-    _shared_state_holder = _shared_state->shared_from_this();
 
     MonotonicStopWatch submit_timer;
     submit_timer.start();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index b2f2e9581f2..4f6bfe5f8c8 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -73,15 +73,12 @@ Status 
PartitionedHashJoinSinkLocalState::close(RuntimeState* state, Status exec
 size_t PartitionedHashJoinSinkLocalState::revocable_mem_size(RuntimeState* 
state) const {
     /// If no need to spill, all rows were sunk into the 
`_inner_sink_operator` without partitioned.
     if (!_shared_state->need_to_spill) {
-        if (_shared_state->inner_shared_state && 
_shared_state->inner_shared_state->build_block) {
-            return 
_shared_state->inner_shared_state->build_block->allocated_bytes();
-        } else if (_shared_state->inner_runtime_state) {
-            auto inner_sink_state = 
_shared_state->inner_runtime_state->get_sink_local_state();
-
-            if (inner_sink_state) {
-                auto& build_block = 
reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
-                                            ->_build_side_mutable_block;
-                return build_block.allocated_bytes();
+        if (_shared_state->inner_shared_state) {
+            auto inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
+            if (inner_sink_state_) {
+                auto inner_sink_state =
+                        
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+                return inner_sink_state->_build_side_mem_used;
             }
         }
         return 0;
@@ -105,47 +102,30 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
     auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
     _shared_state->inner_shared_state->hash_table_variants.reset();
     auto row_desc = p._child_x->row_desc();
-    auto build_block = 
std::move(_shared_state->inner_shared_state->build_block);
-    if (!build_block) {
-        build_block = vectorized::Block::create_shared();
-        auto inner_sink_state = 
_shared_state->inner_runtime_state->get_sink_local_state();
-        if (inner_sink_state) {
-            auto& mutable_block = 
reinterpret_cast<HashJoinBuildSinkLocalState*>(inner_sink_state)
-                                          ->_build_side_mutable_block;
-            *build_block = mutable_block.to_block();
-            LOG(INFO) << "hash join sink will revoke build mutable block: "
-                      << build_block->allocated_bytes();
-        }
+    std::vector<vectorized::Block> build_blocks;
+    auto inner_sink_state_ = 
_shared_state->inner_runtime_state->get_sink_local_state();
+    if (inner_sink_state_) {
+        auto inner_sink_state = 
assert_cast<HashJoinBuildSinkLocalState*>(inner_sink_state_);
+        build_blocks = std::move(inner_sink_state->_build_blocks);
     }
 
-    /// Here need to skip the first row in build block.
-    /// The first row in build block is generated by 
`HashJoinBuildSinkOperatorX::sink`.
-    if (build_block->rows() <= 1) {
+    if (build_blocks.empty()) {
+        LOG(WARNING) << "has no data to revoke, node: " << _parent->node_id()
+                     << ", task: " << state->task_id();
         return Status::OK();
     }
 
-    if (build_block->columns() > row_desc.num_slots()) {
-        build_block->erase(row_desc.num_slots());
-    }
-
-    {
-        /// TODO: DO NOT execute build exprs twice(when partition and building 
hash table)
-        SCOPED_TIMER(_partition_timer);
-        RETURN_IF_ERROR(
-                _partitioner->do_partitioning(state, build_block.get(), 
_mem_tracker.get()));
-    }
-
     auto execution_context = state->get_task_execution_context();
     _dependency->block();
     auto query_id = state->query_id();
     auto mem_tracker = state->get_query_ctx()->query_mem_tracker;
-    auto spill_func = [execution_context, build_block, state, query_id, 
mem_tracker,
-                       this]() mutable {
+    auto spill_func = [execution_context, build_blocks = 
std::move(build_blocks), state, query_id,
+                       mem_tracker, this]() mutable {
         SCOPED_ATTACH_TASK_WITH_ID(mem_tracker, query_id);
         Defer defer {[&]() {
             // need to reset build_block here, or else build_block will be 
destructed
             // after SCOPED_ATTACH_TASK_WITH_ID and will trigger 
memory_orphan_check failure
-            build_block.reset();
+            build_blocks.clear();
         }};
 
         auto execution_context_lock = execution_context.lock();
@@ -155,77 +135,71 @@ Status 
PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(RuntimeSta
         }
 
         auto& p = _parent->cast<PartitionedHashJoinSinkOperatorX>();
-        SCOPED_TIMER(_partition_shuffle_timer);
-        auto* channel_ids = _partitioner->get_channel_ids().get<uint32_t>();
-
         auto& partitioned_blocks = _shared_state->partitioned_build_blocks;
-        std::vector<uint32_t> partition_indices;
+        std::vector<std::vector<uint32_t>> 
partitions_indexes(p._partition_count);
+
         const auto reserved_size = 4096;
-        partition_indices.reserve(reserved_size);
-
-        auto flush_rows = [&partition_indices, &build_block, &state, this](
-                                  std::unique_ptr<vectorized::MutableBlock>& 
partition_block,
-                                  vectorized::SpillStreamSPtr& 
spilling_stream) {
-            auto* begin = &(partition_indices[0]);
-            const auto count = partition_indices.size();
-            if (!partition_block) {
-                partition_block =
-                        
vectorized::MutableBlock::create_unique(build_block->clone_empty());
-            }
-            partition_block->add_rows(build_block.get(), begin, begin + count);
-            partition_indices.clear();
-
-            if (partition_block->allocated_bytes() >=
-                vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM) {
-                auto block = partition_block->to_block();
-                partition_block =
-                        
vectorized::MutableBlock::create_unique(build_block->clone_empty());
-                auto status = spilling_stream->spill_block(state, block, 
false);
-
-                if (!status.ok()) {
-                    std::unique_lock<std::mutex> lock(_spill_lock);
-                    _spill_status = status;
-                    _spill_status_ok = false;
-                    _dependency->set_ready();
-                    return false;
-                }
+        std::for_each(partitions_indexes.begin(), partitions_indexes.end(),
+                      [](std::vector<uint32_t>& indices) { 
indices.reserve(reserved_size); });
+
+        auto flush_rows = [&state, 
this](std::unique_ptr<vectorized::MutableBlock>& partition_block,
+                                         vectorized::SpillStreamSPtr& 
spilling_stream) {
+            auto block = partition_block->to_block();
+            auto status = spilling_stream->spill_block(state, block, false);
+
+            if (!status.ok()) {
+                std::unique_lock<std::mutex> lock(_spill_lock);
+                _spill_status = status;
+                _spill_status_ok = false;
+                _dependency->set_ready();
+                return false;
             }
             return true;
         };
 
-        for (uint32_t i = 0; i != p._partition_count; ++i) {
-            vectorized::SpillStreamSPtr& spilling_stream = 
_shared_state->spilled_streams[i];
-            DCHECK(spilling_stream != nullptr);
+        for (size_t block_idx = 0; block_idx != build_blocks.size(); 
++block_idx) {
+            auto& build_block = build_blocks[block_idx];
+            const auto is_last_block = block_idx == build_blocks.size() - 1;
+            if (UNLIKELY(build_block.empty())) {
+                continue;
+            }
+            {
+                SCOPED_TIMER(_partition_timer);
+                (void)_partitioner->do_partitioning(state, &build_block, 
_mem_tracker.get());
+            }
 
-            if (UNLIKELY(state->is_cancelled())) {
-                break;
+            auto* channel_ids = 
_partitioner->get_channel_ids().get<uint32_t>();
+            for (size_t i = 0; i != build_block.rows(); ++i) {
+                partitions_indexes[channel_ids[i]].emplace_back(i);
             }
 
-            const auto rows = build_block->rows();
-            for (size_t idx = 1; idx != rows; ++idx) {
-                if (channel_ids[idx] == i) {
-                    partition_indices.emplace_back(idx);
-                } else {
-                    continue;
+            for (uint32_t partition_idx = 0; partition_idx != 
p._partition_count; ++partition_idx) {
+                auto* begin = partitions_indexes[partition_idx].data();
+                auto* end = begin + partitions_indexes[partition_idx].size();
+                auto& partition_block = partitioned_blocks[partition_idx];
+                vectorized::SpillStreamSPtr& spilling_stream =
+                        _shared_state->spilled_streams[partition_idx];
+                if (UNLIKELY(!partition_block)) {
+                    partition_block =
+                            
vectorized::MutableBlock::create_unique(build_block.clone_empty());
                 }
 
-                const auto count = partition_indices.size();
-                if (UNLIKELY(count >= reserved_size)) {
-                    if (!flush_rows(partitioned_blocks[i], spilling_stream)) {
-                        break;
-                    }
+                {
+                    SCOPED_TIMER(_partition_shuffle_timer);
+                    partition_block->add_rows(&build_block, begin, end);
+                    partitions_indexes[partition_idx].clear();
+                }
+
+                build_block.clear();
 
-                    if (UNLIKELY(state->is_cancelled())) {
-                        LOG(INFO) << "query was canceled.";
-                        partition_indices.clear();
-                        break;
+                if (partition_block->rows() >= reserved_size || is_last_block) 
{
+                    if (!flush_rows(partition_block, spilling_stream)) {
+                        return;
                     }
+                    partition_block =
+                            
vectorized::MutableBlock::create_unique(build_block.clone_empty());
                 }
             }
-
-            if (!partition_indices.empty()) {
-                flush_rows(partitioned_blocks[i], spilling_stream);
-            }
         }
 
         _dependency->set_ready();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to