This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch spill_and_reserve
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/spill_and_reserve by this push:
     new 815134fd33c [feat]CTE Spill (#44014)
815134fd33c is described below

commit 815134fd33c84004ff5d1824aa8ae96b9c8b31e9
Author: Jerry Hu <[email protected]>
AuthorDate: Sun Nov 17 00:26:09 2024 +0800

    [feat]CTE Spill (#44014)
---
 be/src/pipeline/dependency.cpp                     |   6 +-
 be/src/pipeline/dependency.h                       |  10 +-
 .../pipeline/exec/multi_cast_data_stream_sink.cpp  |  30 ++-
 be/src/pipeline/exec/multi_cast_data_stream_sink.h |  14 +-
 .../exec/multi_cast_data_stream_source.cpp         |  14 +-
 .../pipeline/exec/multi_cast_data_stream_source.h  |   9 +-
 be/src/pipeline/exec/multi_cast_data_streamer.cpp  | 275 +++++++++++++++++++--
 be/src/pipeline/exec/multi_cast_data_streamer.h    |  82 +++++-
 .../exec/partitioned_hash_join_sink_operator.cpp   |  16 +-
 .../exec/partitioned_hash_join_sink_operator.h     |   1 +
 be/src/vec/spill/spill_reader.cpp                  |   7 +
 be/src/vec/spill/spill_stream.cpp                  |   8 +
 be/src/vec/spill/spill_stream.h                    |   5 +
 13 files changed, 429 insertions(+), 48 deletions(-)
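
The heart of the change is multi_cast_data_streamer.{h,cpp}, the shared block
queue behind a CTE's producer and its consumers: instead of holding every
block in memory until the slowest consumer catches up, the queue can now
flush itself to a spill stream and let each consumer replay the file from its
own offset. A minimal standalone sketch of that pattern (all names are
illustrative, none of this is the Doris API):

// Sketch: one producer, N consumers, each consumer holding an iterator into
// a shared list. When buffered bytes cross a threshold and at least one
// consumer has fully drained (mirroring the has_reached_end check in
// _trigger_spill_if_need below), the whole list is flushed and each lagging
// consumer records how many blocks it had already consumed, so a spill
// reader can seek past them on replay.
#include <cstddef>
#include <iterator>
#include <list>
#include <mutex>
#include <vector>

struct Block {
    size_t bytes = 0;
};

class MultiCastBuffer {
public:
    explicit MultiCastBuffer(int consumers)
            : pos_(consumers, queue_.end()), replay_offset_(consumers, -1) {}

    void push(Block b) {
        std::lock_guard<std::mutex> l(mu_);
        mem_ += b.bytes;
        queue_.push_back(std::move(b));
        auto last = std::prev(queue_.end());
        for (auto& p : pos_) {
            if (p == queue_.end()) p = last;  // point drained consumers at new data
        }
        maybe_spill();
    }

    bool pull(int idx, Block* out) {
        std::lock_guard<std::mutex> l(mu_);
        if (pos_[idx] == queue_.end()) return false;  // drained, or data is on disk
        *out = *pos_[idx]++;
        return true;
    }

private:
    void maybe_spill() {  // caller holds mu_
        if (mem_ < kSpillThreshold || queue_.size() < 4) return;
        bool someone_drained = false;
        for (auto& p : pos_) someone_drained |= (p == queue_.end());
        if (!someone_drained) return;
        for (size_t i = 0; i < pos_.size(); ++i) {
            // blocks consumer i already took from memory; its spill reader
            // replays only the remainder, starting at this block index
            replay_offset_[i] = std::distance(queue_.begin(), pos_[i]);
            pos_[i] = queue_.end();
        }
        queue_.clear();  // stands in for handing the blocks to a spill writer
        mem_ = 0;
    }

    static constexpr size_t kSpillThreshold = 2 << 20;  // illustrative value
    std::list<Block> queue_;
    std::vector<std::list<Block>::iterator> pos_;
    std::vector<long> replay_offset_;
    std::mutex mu_;
    size_t mem_ = 0;
};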

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 9ad0ff1b57f..a7198a97da4 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -392,9 +392,11 @@ void SpillSortSharedState::close() {
 }
 
 MultiCastSharedState::MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool,
-                                           int cast_sender_count)
+                                           int cast_sender_count, int node_id)
         : multi_cast_data_streamer(std::make_unique<pipeline::MultiCastDataStreamer>(
-                  row_desc, pool, cast_sender_count, true)) {}
+                  row_desc, this, pool, cast_sender_count, node_id, true)) {}
+
+void MultiCastSharedState::update_spill_stream_profiles(RuntimeProfile* source_profile) {}
 
 int AggSharedState::get_slot_column_id(const vectorized::AggFnEvaluator* evaluator) {
     auto ctxs = evaluator->input_exprs_ctxs();
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index c9a16a88614..1d79331096c 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -570,10 +570,14 @@ public:
 
 class MultiCastDataStreamer;
 
-struct MultiCastSharedState : public BasicSharedState {
-public:
-    MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count);
+struct MultiCastSharedState : public BasicSharedState,
+                              public BasicSpillSharedState,
+                              public std::enable_shared_from_this<MultiCastSharedState> {
+    MultiCastSharedState(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count,
+                         int node_id);
     std::unique_ptr<pipeline::MultiCastDataStreamer> multi_cast_data_streamer;
+
+    void update_spill_stream_profiles(RuntimeProfile* source_profile) override;
 };
 
 struct BlockRowPos {
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
index eb72e9601e1..4af54ec221c 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
@@ -23,9 +23,9 @@
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
 std::string MultiCastDataStreamSinkLocalState::name_suffix() {
-    auto& sinks = static_cast<MultiCastDataStreamSinkOperatorX*>(_parent)->sink_node().sinks;
+    const auto& sinks = static_cast<MultiCastDataStreamSinkOperatorX*>(_parent)->sink_node().sinks;
     std::string id_name = " (dst id : ";
-    for (auto& sink : sinks) {
+    for (const auto& sink : sinks) {
         id_name += std::to_string(sink.dest_node_id) + ",";
     }
     id_name += ")";
@@ -34,19 +34,39 @@ std::string MultiCastDataStreamSinkLocalState::name_suffix() {
 
 std::shared_ptr<BasicSharedState> MultiCastDataStreamSinkOperatorX::create_shared_state() const {
     std::shared_ptr<BasicSharedState> ss =
-            std::make_shared<MultiCastSharedState>(_row_desc, _pool, _cast_sender_count);
+            std::make_shared<MultiCastSharedState>(_row_desc, _pool, _cast_sender_count, _node_id);
     ss->id = operator_id();
-    for (auto& dest : dests_id()) {
+    for (const auto& dest : dests_id()) {
         ss->related_op_ids.insert(dest);
     }
     return ss;
 }
 
+std::vector<Dependency*> MultiCastDataStreamSinkLocalState::dependencies() const {
+    auto dependencies = Base::dependencies();
+    dependencies.emplace_back(_shared_state->multi_cast_data_streamer->get_spill_dependency());
+    return dependencies;
+}
+
+Status MultiCastDataStreamSinkLocalState::open(RuntimeState* state) {
+    RETURN_IF_ERROR(Base::open(state));
+    _shared_state->multi_cast_data_streamer->set_sink_profile(profile());
+    _shared_state->setup_shared_profile(profile());
+    _shared_state->multi_cast_data_streamer->set_write_dependency(_dependency);
+    return Status::OK();
+}
+
+std::string MultiCastDataStreamSinkLocalState::debug_string(int indentation_level) const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}, ", Base::debug_string(indentation_level),
+                   _shared_state->multi_cast_data_streamer->debug_string());
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status MultiCastDataStreamSinkOperatorX::sink(RuntimeState* state, vectorized::Block* in_block,
                                               bool eos) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
     if (in_block->rows() > 0 || eos) {
         COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
         auto st = local_state._shared_state->multi_cast_data_streamer->push(state, in_block, eos);
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index 57b5974064b..e0c454d8f10 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -17,22 +17,32 @@
 
 #pragma once
 
+#include <vector>
+
+#include "common/status.h"
 #include "operator.h"
+#include "pipeline/exec/data_queue.h"
 
 namespace doris::pipeline {
 
 class MultiCastDataStreamSinkOperatorX;
 class MultiCastDataStreamSinkLocalState final
-        : public PipelineXSinkLocalState<MultiCastSharedState> {
+        : public PipelineXSpillSinkLocalState<MultiCastSharedState> {
     ENABLE_FACTORY_CREATOR(MultiCastDataStreamSinkLocalState);
     MultiCastDataStreamSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
             : Base(parent, state) {}
     friend class MultiCastDataStreamSinkOperatorX;
     friend class DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
-    using Base = PipelineXSinkLocalState<MultiCastSharedState>;
+    using Base = PipelineXSpillSinkLocalState<MultiCastSharedState>;
     using Parent = MultiCastDataStreamSinkOperatorX;
     std::string name_suffix() override;
 
+    Status open(RuntimeState* state) override;
+
+    std::vector<Dependency*> dependencies() const override;
+
+    std::string debug_string(int indentation_level) const override;
+
 private:
     std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
 };
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
index e45e59d17e2..61adfed7573 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.cpp
@@ -38,6 +38,8 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<Parent>();
+    _shared_state->multi_cast_data_streamer->set_source_profile(p._consumer_id,
+                                                                _runtime_profile.get());
     _shared_state->multi_cast_data_streamer->set_dep_by_sender_idx(p._consumer_id, _dependency);
     _wait_for_rf_timer = ADD_TIMER(_runtime_profile, "WaitForRuntimeFilter");
     _filter_timer = ADD_TIMER(_runtime_profile, "FilterTime");
@@ -50,6 +52,14 @@ Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalState
     return Status::OK();
 }
 
+std::vector<Dependency*> MultiCastDataStreamSourceLocalState::dependencies() const {
+    auto dependencies = Base::dependencies();
+    auto& p = _parent->cast<Parent>();
+    dependencies.emplace_back(
+            _shared_state->multi_cast_data_streamer->get_spill_read_dependency(p._consumer_id));
+    return dependencies;
+}
+
 Status MultiCastDataStreamSourceLocalState::open(RuntimeState* state) {
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
@@ -92,9 +102,9 @@ Status MultiCastDataStreamerSourceOperatorX::get_block(RuntimeState* state,
     {
         SCOPED_TIMER(local_state._get_data_timer);
         RETURN_IF_ERROR(local_state._shared_state->multi_cast_data_streamer->pull(
-                _consumer_id, output_block, eos));
+                state, _consumer_id, output_block, eos));
     }
-    if (!local_state._conjuncts.empty()) {
+    if (!local_state._conjuncts.empty() && !output_block->empty()) {
         SCOPED_TIMER(local_state._filter_timer);
         RETURN_IF_ERROR(vectorized::VExprContext::filter_block(local_state._conjuncts, output_block,
                                                                output_block->columns()));
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_source.h b/be/src/pipeline/exec/multi_cast_data_stream_source.h
index 57410bf8d95..c1af8c5b21c 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_source.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_source.h
@@ -37,11 +37,12 @@ namespace pipeline {
 class MultiCastDataStreamer;
 class MultiCastDataStreamerSourceOperatorX;
 
-class MultiCastDataStreamSourceLocalState final : public PipelineXLocalState<MultiCastSharedState>,
-                                                  public RuntimeFilterConsumer {
+class MultiCastDataStreamSourceLocalState final
+        : public PipelineXSpillLocalState<MultiCastSharedState>,
+          public RuntimeFilterConsumer {
 public:
     ENABLE_FACTORY_CREATOR(MultiCastDataStreamSourceLocalState);
-    using Base = PipelineXLocalState<MultiCastSharedState>;
+    using Base = PipelineXSpillLocalState<MultiCastSharedState>;
     using Parent = MultiCastDataStreamerSourceOperatorX;
     MultiCastDataStreamSourceLocalState(RuntimeState* state, OperatorXBase* parent);
     Status init(RuntimeState* state, LocalStateInfo& info) override;
@@ -62,6 +63,8 @@ public:
         return res;
     }
 
+    std::vector<Dependency*> dependencies() const override;
+
 private:
     friend class MultiCastDataStreamerSourceOperatorX;
     vectorized::VExprContextSPtrs _output_expr_contexts;
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.cpp b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
index 3e629093e23..f1e399a3289 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -17,9 +17,26 @@
 
 #include "multi_cast_data_streamer.h"
 
+#include <fmt/format.h>
+#include <glog/logging.h>
+
+#include <iterator>
+#include <memory>
+#include <vector>
+
+#include "common/config.h"
+#include "common/exception.h"
+#include "common/logging.h"
+#include "common/status.h"
 #include "pipeline/dependency.h"
 #include "pipeline/exec/multi_cast_data_stream_source.h"
+#include "pipeline/exec/spill_utils.h"
+#include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
+#include "util/pretty_printer.h"
+#include "util/uid_util.h"
+#include "vec/core/block.h"
+#include "vec/spill/spill_stream_manager.h"
 
 namespace doris::pipeline {
 #include "common/compile_check_begin.h"
@@ -30,37 +47,115 @@ MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int un_
     block->clear();
 }
 
-Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
+Status MultiCastDataStreamer::pull(RuntimeState* state, int sender_idx, vectorized::Block* block,
+                                   bool* eos) {
     int* un_finish_copy = nullptr;
     int use_count = 0;
+    size_t mem_size = 0;
+    bool spilled = false;
     {
         std::lock_guard l(_mutex);
+
+        if (!_cached_blocks[sender_idx].empty()) {
+            *block = std::move(_cached_blocks[sender_idx].front());
+            _cached_blocks[sender_idx].erase(_cached_blocks[sender_idx].begin());
+            return Status::OK();
+        }
+
+        for (auto it = _spill_readers[sender_idx].begin();
+             it != _spill_readers[sender_idx].end();) {
+            if ((*it)->all_data_read) {
+                it = _spill_readers[sender_idx].erase(it);
+            } else {
+                it++;
+            }
+        }
+
+        if (!_spill_readers[sender_idx].empty()) {
+            auto reader_item = _spill_readers[sender_idx].front();
+            if (!reader_item->stream->ready_for_reading()) {
+                return Status::OK();
+            }
+
+            auto& reader = reader_item->reader;
+            RETURN_IF_ERROR(reader->open());
+            if (reader_item->block_offset != 0) {
+                reader->seek(reader_item->block_offset);
+                reader_item->block_offset = 0;
+            }
+
+            auto spill_func = [this, reader_item, sender_idx]() {
+                vectorized::Block block;
+                bool spill_eos = false;
+                size_t read_size = 0;
+                while (!spill_eos) {
+                    RETURN_IF_ERROR(reader_item->reader->read(&block, &spill_eos));
+                    if (!block.empty()) {
+                        std::lock_guard l(_mutex);
+                        read_size += block.allocated_bytes();
+                        _cached_blocks[sender_idx].emplace_back(std::move(block));
+                        if (_cached_blocks[sender_idx].size() >= 32 ||
+                            read_size > 2 * 1024 * 1024) {
+                            break;
+                        }
+                    }
+                }
+
+                if (spill_eos || !_cached_blocks[sender_idx].empty()) {
+                    reader_item->all_data_read = spill_eos;
+                    _set_ready_for_read(sender_idx);
+                }
+                return Status::OK();
+            };
+
+            auto catch_exception_func = [spill_func = std::move(spill_func)]() {
+                RETURN_IF_CATCH_EXCEPTION(return spill_func(););
+            };
+
+            _spill_read_dependencies[sender_idx]->block();
+            auto spill_runnable = std::make_shared<SpillRecoverRunnable>(
+                    state, _spill_read_dependencies[sender_idx], _source_profiles[sender_idx],
+                    _shared_state->shared_from_this(), catch_exception_func);
+            auto* thread_pool =
+                    ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
+            RETURN_IF_ERROR(thread_pool->submit(std::move(spill_runnable)));
+            return Status::OK();
+        }
+
         auto& pos_to_pull = _sender_pos_to_read[sender_idx];
         const auto end = _multi_cast_blocks.end();
-        DCHECK(pos_to_pull != end);
+        if (pos_to_pull == end) {
+            _block_reading(sender_idx);
+            VLOG_DEBUG << "query: " << print_id(state->query_id())
+                       << ", pos_to_pull end: " << (void*)(_write_dependency);
+            *eos = _eos;
+            return Status::OK();
+        }
 
         *block = *pos_to_pull->_block;
 
-        _cumulative_mem_size -= pos_to_pull->_mem_size;
-
         pos_to_pull->_used_count--;
         use_count = pos_to_pull->_used_count;
+        mem_size = pos_to_pull->_mem_size;
         un_finish_copy = &pos_to_pull->_un_finish_copy;
 
         pos_to_pull++;
 
         if (pos_to_pull == end) {
             _block_reading(sender_idx);
+            *eos = _eos;
+            RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled));
         }
 
-        *eos = _eos and pos_to_pull == end;
-    }
-
-    if (use_count == 0) {
-        // will clear _multi_cast_blocks
-        _wait_copy_block(block, *un_finish_copy);
-    } else {
-        _copy_block(block, *un_finish_copy);
+        if (use_count == 0) {
+            _cumulative_mem_size.fetch_sub(mem_size);
+            _multi_cast_blocks.pop_front();
+            _write_dependency->set_ready();
+            VLOG_DEBUG << "**** query: " << print_id(state->query_id())
+                       << ", set ready: " << (void*)(_write_dependency);
+        } else {
+            _copy_block(block, *un_finish_copy);
+        }
     }
 
     return Status::OK();
@@ -71,13 +166,6 @@ void MultiCastDataStreamer::_copy_block(vectorized::Block* block, int& un_finish
     for (int i = 0; i < block->columns(); ++i) {
         block->get_by_position(i).column = block->get_by_position(i).column->clone_resized(rows);
     }
-
-    std::unique_lock l(_mutex);
-    un_finish_copy--;
-    if (un_finish_copy == 0) {
-        l.unlock();
-        _cv.notify_one();
-    }
 }
 
 void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_finish_copy) {
@@ -86,16 +174,153 @@ void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_f
     _multi_cast_blocks.pop_front();
 }
 
+Status MultiCastDataStreamer::_trigger_spill_if_need(RuntimeState* state, bool* triggered) {
+    vectorized::SpillStreamSPtr spill_stream;
+    *triggered = false;
+    if (_cumulative_mem_size.load() >= config::exchg_node_buffer_size_bytes &&
+        _multi_cast_blocks.size() >= 4) {
+        _write_dependency->block();
+
+        bool has_reached_end = false;
+        std::vector<int64_t> distances(_cast_sender_count);
+        size_t total_count = _multi_cast_blocks.size();
+        for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+            distances[i] = std::distance(_multi_cast_blocks.begin(), _sender_pos_to_read[i]);
+            if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
+                has_reached_end = true;
+                CHECK_EQ(distances[i], total_count);
+            }
+
+            if (!_spill_readers[i].empty()) {
+                CHECK_EQ(distances[i], 0);
+            }
+        }
+
+        if (has_reached_end) {
+            RETURN_IF_ERROR(ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream(
+                    state, spill_stream, print_id(state->query_id()), "MultiCastSender", _node_id,
+                    std::numeric_limits<int32_t>::max(), std::numeric_limits<size_t>::max(),
+                    _sink_profile));
+            for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+                if (distances[i] < total_count) {
+                    auto reader = spill_stream->create_separate_reader();
+                    reader->set_counters(_source_profiles[i]);
+                    auto reader_item = std::make_shared<SpillingReader>(
+                            std::move(reader), spill_stream, distances[i], false);
+                    _spill_readers[i].emplace_back(std::move(reader_item));
+                }
+
+                _block_reading(i);
+            }
+
+            RETURN_IF_ERROR(_submit_spill_task(state, spill_stream));
+            DCHECK_EQ(_multi_cast_blocks.size(), 0);
+
+            for (auto& pos : _sender_pos_to_read) {
+                pos = _multi_cast_blocks.end();
+            }
+            _cumulative_mem_size = 0;
+            *triggered = true;
+        }
+    }
+
+    return Status::OK();
+}
+
+Status MultiCastDataStreamer::_submit_spill_task(RuntimeState* state,
+                                                 vectorized::SpillStreamSPtr spill_stream) {
+    std::vector<vectorized::Block> blocks;
+    for (auto& block : _multi_cast_blocks) {
+        blocks.emplace_back(std::move(*block._block));
+    }
+
+    _multi_cast_blocks.clear();
+
+    auto spill_func = [state, blocks = std::move(blocks),
+                       spill_stream = std::move(spill_stream)]() mutable {
+        const auto blocks_count = blocks.size();
+        while (!blocks.empty() && !state->is_cancelled()) {
+            auto block = std::move(blocks.front());
+            blocks.erase(blocks.begin());
+
+            RETURN_IF_ERROR(spill_stream->spill_block(state, block, false));
+        }
+        VLOG_DEBUG << "query: " << print_id(state->query_id()) << " multi cast write "
+                   << blocks_count << " blocks";
+        return spill_stream->spill_eof();
+    };
+
+    auto exception_catch_func = [spill_func = std::move(spill_func),
+                                 query_id = print_id(state->query_id()), this]() mutable {
+        auto status = [&]() { RETURN_IF_CATCH_EXCEPTION(return spill_func()); }();
+        _write_dependency->set_ready();
+
+        if (!status.ok()) {
+            LOG(WARNING) << "query: " << query_id
+                         << " multi cast write failed: " << status.to_string()
+                         << ", dependency: " << (void*)_spill_dependency.get();
+        } else {
+            for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+                _set_ready_for_read(i);
+            }
+        }
+        return status;
+    };
+
+    auto spill_runnable = std::make_shared<SpillSinkRunnable>(
+            state, nullptr, _spill_dependency, _sink_profile, _shared_state->shared_from_this(),
+            exception_catch_func);
+
+    _spill_dependency->block();
+
+    auto* thread_pool = ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool();
+    return thread_pool->submit(std::move(spill_runnable));
+}
+
 Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
     auto rows = block->rows();
     COUNTER_UPDATE(_process_rows, rows);
 
     const auto block_mem_size = block->allocated_bytes();
-    _cumulative_mem_size += block_mem_size;
-    COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, _peak_mem_usage->value()));
+
+    if (!_shared_state->_spill_status.ok()) {
+        return _shared_state->_spill_status.status();
+    }
 
     {
         std::lock_guard l(_mutex);
+
+        if (_pending_block) {
+            const auto pending_size = _pending_block->allocated_bytes();
+            _cumulative_mem_size += pending_size;
+            _multi_cast_blocks.emplace_back(_pending_block.get(), _cast_sender_count,
+                                            _cast_sender_count - 1, pending_size);
+            _pending_block.reset();
+
+            auto end = std::prev(_multi_cast_blocks.end());
+            for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
+                if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
+                    _sender_pos_to_read[i] = end;
+                    _set_ready_for_read(i);
+                }
+            }
+        }
+
+        _cumulative_mem_size += block_mem_size;
+        COUNTER_SET(_peak_mem_usage,
+                    std::max(_cumulative_mem_size.load(), _peak_mem_usage->value()));
+
+        if (!eos) {
+            bool spilled = false;
+            RETURN_IF_ERROR(_trigger_spill_if_need(state, &spilled));
+            if (spilled) {
+                _pending_block =
+                        vectorized::Block::create_unique(block->get_columns_with_type_and_name());
+                block->clear();
+                return Status::OK();
+            }
+        }
+
         _multi_cast_blocks.emplace_back(block, _cast_sender_count, _cast_sender_count - 1,
                                         block_mem_size);
         // last elem
@@ -106,6 +331,7 @@ Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block
                 _set_ready_for_read(i);
             }
         }
+
         _eos = eos;
     }
 
@@ -135,4 +361,11 @@ void MultiCastDataStreamer::_block_reading(int sender_idx) {
     dep->block();
 }
 
+std::string MultiCastDataStreamer::debug_string() const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "MemSize: {}",
+                   PrettyPrinter::print_bytes(_cumulative_mem_size));
+    return fmt::to_string(debug_string_buffer);
+}
+
 } // namespace doris::pipeline
\ No newline at end of file
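
The new pull() path above recovers spilled data asynchronously: a task on the
spill IO thread pool fills a per-consumer cache from the consumer's reader
until it holds 32 blocks or roughly 2 MB, then marks the read dependency
ready. A standalone sketch of just that batching loop (the Reader stub here
is hypothetical; the real one is vectorized::SpillReader):

// Sketch: batch-recover spilled blocks so one IO task services several
// pull() calls; the 32-block / 2 MB cutoffs match spill_func above.
#include <cstddef>
#include <utility>
#include <vector>

struct Block {
    size_t bytes = 0;
    bool empty() const { return bytes == 0; }
};

struct Reader {  // hypothetical stand-in for vectorized::SpillReader
    std::vector<Block> spilled;  // pretend file contents, in write order
    size_t next = 0;
    void read(Block* out, bool* eos) {
        *out = next < spilled.size() ? spilled[next++] : Block{};
        *eos = next == spilled.size();
    }
};

// Returns true once the reader is exhausted ("all_data_read" in the patch),
// at which point the caller drops the reader and unblocks the consumer.
bool recover_batch(Reader& reader, std::vector<Block>& cache) {
    bool eos = false;
    size_t read_bytes = 0;
    while (!eos) {
        Block b;
        reader.read(&b, &eos);
        if (!b.empty()) {
            read_bytes += b.bytes;
            cache.push_back(std::move(b));
            if (cache.size() >= 32 || read_bytes > 2 * 1024 * 1024) break;
        }
    }
    return eos;
}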
diff --git a/be/src/pipeline/exec/multi_cast_data_streamer.h b/be/src/pipeline/exec/multi_cast_data_streamer.h
index 07e64016363..079acb9c81f 100644
--- a/be/src/pipeline/exec/multi_cast_data_streamer.h
+++ b/be/src/pipeline/exec/multi_cast_data_streamer.h
@@ -17,11 +17,22 @@
 
 #pragma once
 
+#include <atomic>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "pipeline/dependency.h"
+#include "util/runtime_profile.h"
+#include "vec/core/block.h"
 #include "vec/sink/vdata_stream_sender.h"
+#include "vec/spill/spill_stream.h"
 
 namespace doris::pipeline {
 
 class Dependency;
+struct MultiCastSharedState;
+
 struct MultiCastBlock {
     MultiCastBlock(vectorized::Block* block, int used_count, int need_copy, size_t mem_size);
 
@@ -31,30 +42,53 @@ struct MultiCastBlock {
     size_t _mem_size;
 };
 
+struct SpillingReader {
+    vectorized::SpillReaderUPtr reader;
+    vectorized::SpillStreamSPtr stream;
+    int64_t block_offset {0};
+    bool all_data_read {false};
+};
+
 // TDOD: MultiCastDataStreamer same as the data queue, maybe rethink union and refactor the
 // code
 class MultiCastDataStreamer {
 public:
-    MultiCastDataStreamer(const RowDescriptor& row_desc, ObjectPool* pool, int cast_sender_count,
+    MultiCastDataStreamer(const RowDescriptor& row_desc, MultiCastSharedState* shared_state,
+                          ObjectPool* pool, int cast_sender_count, int32_t node_id,
                           bool with_dependencies = false)
             : _row_desc(row_desc),
+              _shared_state(shared_state),
               _profile(pool->add(new RuntimeProfile("MultiCastDataStreamSink"))),
-              _cast_sender_count(cast_sender_count) {
+              _cached_blocks(cast_sender_count),
+              _cast_sender_count(cast_sender_count),
+              _node_id(node_id),
+              _spill_readers(cast_sender_count),
+              _source_profiles(cast_sender_count) {
         _sender_pos_to_read.resize(cast_sender_count, _multi_cast_blocks.end());
         if (with_dependencies) {
             _dependencies.resize(cast_sender_count, nullptr);
         }
 
+        _spill_dependency = Dependency::create_shared(_node_id, _node_id,
+                                                      "MultiCastDataStreamerDependency", true);
+
+        for (int i = 0; i != cast_sender_count; ++i) {
+            _spill_read_dependencies.emplace_back(Dependency::create_shared(
+                    node_id, node_id, "MultiCastReadSpillDependency", true));
+        }
         _peak_mem_usage = ADD_COUNTER(profile(), "PeakMemUsage", TUnit::BYTES);
         _process_rows = ADD_COUNTER(profile(), "ProcessRows", TUnit::UNIT);
     };
 
-    ~MultiCastDataStreamer() = default;
+    ~MultiCastDataStreamer() {
+        for (auto& item : _spill_readers) {
+            DCHECK(item.empty());
+        }
+    }
 
-    Status pull(int sender_idx, vectorized::Block* block, bool* eos);
+    Status pull(RuntimeState* state, int sender_idx, vectorized::Block* block, bool* eos);
 
     Status push(RuntimeState* state, vectorized::Block* block, bool eos);
-
     const RowDescriptor& row_desc() { return _row_desc; }
 
     RuntimeProfile* profile() { return _profile; }
@@ -64,6 +98,22 @@ public:
         _block_reading(sender_idx);
     }
 
+    void set_write_dependency(Dependency* dependency) { _write_dependency = dependency; }
+
+    Dependency* get_spill_dependency() const { return _spill_dependency.get(); }
+
+    Dependency* get_spill_read_dependency(int sender_idx) const {
+        return _spill_read_dependencies[sender_idx].get();
+    }
+
+    void set_sink_profile(RuntimeProfile* profile) { _sink_profile = profile; }
+
+    void set_source_profile(int sender_idx, RuntimeProfile* profile) {
+        _source_profiles[sender_idx] = profile;
+    }
+
+    std::string debug_string() const;
+
 private:
     void _set_ready_for_read(int sender_idx);
     void _block_reading(int sender_idx);
@@ -72,19 +122,37 @@ private:
 
     void _wait_copy_block(vectorized::Block* block, int& un_finish_copy);
 
+    Status _submit_spill_task(RuntimeState* state, vectorized::SpillStreamSPtr spill_stream);
+
+    Status _trigger_spill_if_need(RuntimeState* state, bool* triggered);
+
     const RowDescriptor& _row_desc;
+    MultiCastSharedState* _shared_state;
     RuntimeProfile* _profile = nullptr;
     std::list<MultiCastBlock> _multi_cast_blocks;
+    std::list<MultiCastBlock> _spilling_blocks;
+    std::vector<std::vector<vectorized::Block>> _cached_blocks;
     std::vector<std::list<MultiCastBlock>::iterator> _sender_pos_to_read;
     std::condition_variable _cv;
     std::mutex _mutex;
     bool _eos = false;
     int _cast_sender_count = 0;
-    int64_t _cumulative_mem_size = 0;
-
+    int _node_id;
+    std::atomic_int64_t _cumulative_mem_size = 0;
     RuntimeProfile::Counter* _process_rows = nullptr;
     RuntimeProfile::Counter* _peak_mem_usage = nullptr;
 
+    Dependency* _write_dependency;
     std::vector<Dependency*> _dependencies;
+    std::shared_ptr<Dependency> _spill_dependency;
+
+    vectorized::BlockUPtr _pending_block;
+
+    std::vector<std::vector<std::shared_ptr<SpillingReader>>> _spill_readers;
+
+    std::vector<std::shared_ptr<Dependency>> _spill_read_dependencies;
+
+    RuntimeProfile* _sink_profile;
+    std::vector<RuntimeProfile*> _source_profiles;
 };
 } // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
index a8c2b495365..6f2f7c8bc15 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp
@@ -21,6 +21,7 @@
 
 #include <algorithm>
 #include <memory>
+#include <mutex>
 
 #include "common/logging.h"
 #include "pipeline/exec/operator.h"
@@ -332,9 +333,12 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
         if (spill_context) {
             spill_context->on_task_finished();
         }
+
+        std::lock_guard<std::mutex> lock(_spill_mutex);
         _spill_dependency->set_ready();
         return status;
     };
+
     for (size_t i = 0; i != _shared_state->partitioned_build_blocks.size(); ++i) {
         vectorized::SpillStreamSPtr& spilling_stream = _shared_state->spilled_streams[i];
         auto& mutable_block = _shared_state->partitioned_build_blocks[i];
@@ -390,9 +394,15 @@ Status PartitionedHashJoinSinkLocalState::revoke_memory(
         }
     }
 
-    if (_spilling_task_count > 0) {
-        _spill_dependency->block();
-    } else if (_child_eos) {
+    if (_spilling_task_count.load() > 0) {
+        std::lock_guard<std::mutex> lock(_spill_mutex);
+        if (_spilling_task_count.load() > 0) {
+            _spill_dependency->block();
+            return Status::OK();
+        }
+    }
+
+    if (_child_eos) {
         VLOG_DEBUG << "query:" << print_id(state->query_id()) << ", hash join sink "
                    << _parent->node_id() << " set_ready_to_read"
                    << ", task id: " << state->task_id();
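
The partitioned_hash_join_sink change closes a lost-wakeup window: the last
spill task could previously call set_ready() between the submitter's
check of _spilling_task_count > 0 and its block(), leaving the dependency
blocked forever. Both sides now serialize on _spill_mutex and the submitter
re-checks the counter under the lock. A simplified model of the fix:

// Sketch: double-checked blocking on an in-flight task counter. set_ready()
// and recheck-then-block() both run under the mutex, so the "last task
// finished" transition cannot slip between the check and the block().
#include <atomic>
#include <mutex>

struct Dependency {
    void block() { blocked = true; }
    void set_ready() { blocked = false; }
    bool blocked = false;
};

struct SinkState {
    std::atomic<int> spilling_task_count{0};
    std::mutex spill_mutex;
    Dependency spill_dependency;

    void on_spill_task_done() {  // spill IO thread, once per task
        if (spilling_task_count.fetch_sub(1) == 1) {  // this was the last task
            std::lock_guard<std::mutex> lock(spill_mutex);
            spill_dependency.set_ready();
        }
    }

    void wait_if_tasks_in_flight() {  // pipeline thread, after submission
        if (spilling_task_count.load() > 0) {
            std::lock_guard<std::mutex> lock(spill_mutex);
            if (spilling_task_count.load() > 0) {  // recheck under the lock
                spill_dependency.block();
                return;
            }
        }
        // every task already finished; continue without blocking
    }
};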
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index 58b19004f33..374055a838f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -68,6 +68,7 @@ protected:
 
     friend class PartitionedHashJoinSinkOperatorX;
 
+    std::mutex _spill_mutex;
     std::atomic<bool> _spilling_finished {false};
     std::atomic_int32_t _spilling_task_count {0};
 
diff --git a/be/src/vec/spill/spill_reader.cpp b/be/src/vec/spill/spill_reader.cpp
index c947081fcaf..014b83be23d 100644
--- a/be/src/vec/spill/spill_reader.cpp
+++ b/be/src/vec/spill/spill_reader.cpp
@@ -17,6 +17,8 @@
 
 #include "vec/spill/spill_reader.h"
 
+#include <glog/logging.h>
+
 #include <algorithm>
 
 #include "common/cast_set.h"
@@ -99,6 +101,11 @@ Status SpillReader::open() {
     return Status::OK();
 }
 
+void SpillReader::seek(size_t block_index) {
+    DCHECK_LT(block_index, block_count_);
+    read_block_index_ = block_index;
+}
+
 Status SpillReader::read(Block* block, bool* eos) {
     DCHECK(file_reader_);
     block->clear_column_data();
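
Note that SpillReader::seek() takes a block index, not a byte offset:
MultiCastDataStreamer stores, per consumer, how many blocks were already
consumed from memory (block_offset in SpillingReader) and skips exactly that
many on replay. A toy illustration of the assumed contract:

// Sketch: replaying a spilled stream from a per-consumer block index. A
// consumer that had taken k blocks from memory resumes at index k.
#include <cassert>
#include <cstddef>
#include <vector>

struct BlockFile {            // stands in for the on-disk spill file
    std::vector<int> blocks;  // payloads, in write order
};

struct IndexedReader {        // models the seek()/read() contract
    const BlockFile* file = nullptr;
    size_t read_block_index = 0;

    void seek(size_t block_index) {
        assert(block_index < file->blocks.size());
        read_block_index = block_index;  // skip blocks consumed from memory
    }
    bool read(int* out) {
        if (read_block_index == file->blocks.size()) return false;  // eos
        *out = file->blocks[read_block_index++];
        return true;
    }
};

int main() {
    BlockFile f{{10, 20, 30, 40}};
    IndexedReader r{&f};
    r.seek(2);  // blocks 0 and 1 were already served from memory
    int v;
    while (r.read(&v)) { /* replays 30, then 40 */ }
    return 0;
}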
diff --git a/be/src/vec/spill/spill_stream.cpp b/be/src/vec/spill/spill_stream.cpp
index 3e5b93a21d7..a27916a87a3 100644
--- a/be/src/vec/spill/spill_stream.cpp
+++ b/be/src/vec/spill/spill_stream.cpp
@@ -115,6 +115,10 @@ Status SpillStream::prepare() {
     return writer_->open();
 }
 
+SpillReaderUPtr SpillStream::create_separate_reader() const {
+    return std::make_unique<SpillReader>(stream_id_, writer_->get_file_path());
+}
+
 const TUniqueId& SpillStream::query_id() const {
     return query_id_;
 }
@@ -144,6 +148,10 @@ Status SpillStream::spill_eof() {
     auto status = writer_->close();
     total_written_bytes_ = writer_->get_written_bytes();
     writer_.reset();
+
+    if (status.ok()) {
+        _ready_for_reading = true;
+    }
     return status;
 }
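
The new _ready_for_reading flag is the handoff between the sink-side writer
and the source-side readers: it flips only after spill_eof() has closed the
writer, and the multicast pull() simply returns and retries while it is
false. Roughly (a sketch; the real code uses plain atomic stores):

// Sketch: publish-after-close handoff. With release/acquire ordering, a
// reader that observes ready == true also observes the completed file.
#include <atomic>

class Stream {
public:
    bool spill_eof() {  // writer side, called once after the last block
        bool ok = close_writer();
        if (ok) ready_.store(true, std::memory_order_release);
        return ok;
    }
    // Reader side, polled from pull(); callers just return and get
    // rescheduled while this is still false.
    bool ready_for_reading() const { return ready_.load(std::memory_order_acquire); }

private:
    bool close_writer() { return true; }  // stands in for SpillWriter::close()
    std::atomic<bool> ready_{false};
};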
 
diff --git a/be/src/vec/spill/spill_stream.h b/be/src/vec/spill/spill_stream.h
index 9682130aad0..525abbb7855 100644
--- a/be/src/vec/spill/spill_stream.h
+++ b/be/src/vec/spill/spill_stream.h
@@ -67,8 +67,12 @@ public:
 
     void update_shared_profiles(RuntimeProfile* source_op_profile);
 
+    SpillReaderUPtr create_separate_reader() const;
+
     const TUniqueId& query_id() const;
 
+    bool ready_for_reading() const { return _ready_for_reading; }
+
 private:
     friend class SpillStreamManager;
 
@@ -86,6 +90,7 @@ private:
     size_t batch_bytes_;
     int64_t total_written_bytes_ = 0;
 
+    std::atomic_bool _ready_for_reading = false;
     std::atomic_bool _is_reading = false;
 
     SpillWriterUPtr writer_;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

