This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ee45ed206a3 [fix](pipeline) Do not push data in local exchange if eos (#35972)
ee45ed206a3 is described below

commit ee45ed206a3cffcd79671b3ec3976fe95ea8d789
Author: Gabriel <[email protected]>
AuthorDate: Fri Jun 7 10:01:29 2024 +0800

    [fix](pipeline) Do not push data in local exchange if eos (#35972)
---
 be/src/pipeline/dependency.h                       |   4 +-
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |   9 ++
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   1 +
 .../exec/partitioned_hash_join_probe_operator.cpp  |  20 ++++
 .../exec/partitioned_hash_join_probe_operator.h    |   2 +
 .../local_exchange/local_exchange_sink_operator.h  |   4 +-
 .../local_exchange_source_operator.cpp             |   6 +-
 .../local_exchange_source_operator.h               |   6 +-
 be/src/pipeline/local_exchange/local_exchanger.cpp |  55 ++++++----
 be/src/pipeline/local_exchange/local_exchanger.h   | 120 ++++++++++++++-------
 10 files changed, 160 insertions(+), 67 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index e5a019b4fa0..0f9c698a82e 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -835,13 +835,13 @@ struct DataDistribution {
     std::vector<TExpr> partition_exprs;
 };
 
-class Exchanger;
+class ExchangerBase;
 
 struct LocalExchangeSharedState : public BasicSharedState {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
     LocalExchangeSharedState(int num_instances);
-    std::unique_ptr<Exchanger> exchanger {};
+    std::unique_ptr<ExchangerBase> exchanger {};
     std::vector<MemTracker*> mem_trackers;
     std::atomic<size_t> mem_usage = 0;
     std::mutex le_lock;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 002ead551f6..dc2df872bd5 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -363,6 +363,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
     return Status::OK();
 }
 
+std::string HashJoinProbeLocalState::debug_string(int indentation_level) const 
{
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}",
+                   JoinProbeLocalState<HashJoinSharedState, 
HashJoinProbeLocalState>::debug_string(
+                           indentation_level),
+                   _shared_state ? 
std::to_string(_shared_state->short_circuit_for_probe) : "NULL");
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
                                                      const std::vector<int>& 
res_col_ids) {
     if (empty_right_table_shortcut()) {
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 028b0583167..b8bc892ef31 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -77,6 +77,7 @@ public:
         // !Base::_projections.empty() means nereids planner
         return _shared_state->empty_right_table_need_probe_dispose && 
!Base::_projections.empty();
     }
+    std::string debug_string(int indentation_level) const override;
 
 private:
     void _prepare_probe_block();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 17dc04b7828..7ee06f5ab4d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -370,6 +370,16 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     return spill_io_pool->submit_func(exception_catch_func);
 }
 
+std::string PartitionedHashJoinProbeOperatorX::debug_string(RuntimeState* 
state,
+                                                            int 
indentation_level) const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}, in mem join probe: {}",
+                   
JoinProbeOperatorX<PartitionedHashJoinProbeLocalState>::debug_string(
+                           state, indentation_level),
+                   _inner_probe_operator ? 
_inner_probe_operator->debug_string(state, 0) : "NULL");
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status 
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState*
 state,
                                                                            
uint32_t partition_index,
                                                                            
bool& has_data) {
@@ -763,6 +773,16 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     const auto need_to_spill = local_state._shared_state->need_to_spill;
+#ifndef NDEBUG
+    Defer eos_check_defer([&] {
+        if (*eos) {
+            LOG(INFO) << "query: " << print_id(state->query_id())
+                      << ", hash probe node: " << node_id() << ", task: " << 
state->task_id()
+                      << ", eos with child eos: " << local_state._child_eos
+                      << ", need spill: " << need_to_spill;
+        }
+    });
+#endif
     if (need_more_input_data(state)) {
         if (need_to_spill && _should_revoke_memory(state)) {
             return _revoke_memory(state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index aecd8a22f91..5dced043214 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -153,6 +153,8 @@ public:
     Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
                 bool* eos) const override;
 
+    std::string debug_string(RuntimeState* state, int indentation_level = 0) 
const override;
+
     bool need_more_input_data(RuntimeState* state) const override;
     DataDistribution required_data_distribution() const override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 36530bc8ef1..c29d6a7ec90 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -25,7 +25,7 @@ class PartitionerBase;
 
 namespace doris::pipeline {
 
-class Exchanger;
+class ExchangerBase;
 class ShuffleExchanger;
 class PassthroughExchanger;
 class BroadcastExchanger;
@@ -63,7 +63,7 @@ private:
     friend class LocalMergeSortExchanger;
     friend class AdaptivePassthroughExchanger;
 
-    Exchanger* _exchanger = nullptr;
+    ExchangerBase* _exchanger = nullptr;
 
     // Used by shuffle exchanger
     RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 1c8dff51a29..79aba88fbaa 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -66,14 +66,16 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
                    "{}, _channel_id: {}, _num_partitions: {}, _num_senders: 
{}, _num_sources: {}, "
-                   "_running_sink_operators: {}, _running_source_operators: 
{}",
+                   "_running_sink_operators: {}, _running_source_operators: 
{}, mem_usage: {}",
                    Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
                    _exchanger->_num_senders, _exchanger->_num_sources,
-                   _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators);
+                   _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators,
+                   _shared_state->mem_usage.load());
     size_t i = 0;
     fmt::format_to(debug_string_buffer, ", MemTrackers: ");
     for (auto* mem_tracker : _shared_state->mem_trackers) {
         fmt::format_to(debug_string_buffer, "{}: {}, ", i, 
mem_tracker->consumption());
+        i++;
     }
     return fmt::to_string(debug_string_buffer);
 }
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index f32261cd574..58086097d6d 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -21,7 +21,7 @@
 
 namespace doris::pipeline {
 
-class Exchanger;
+class ExchangerBase;
 class ShuffleExchanger;
 class PassthroughExchanger;
 class BroadcastExchanger;
@@ -42,7 +42,7 @@ public:
 
 private:
     friend class LocalExchangeSourceOperatorX;
-    friend class Exchanger;
+    friend class ExchangerBase;
     friend class ShuffleExchanger;
     friend class PassthroughExchanger;
     friend class BroadcastExchanger;
@@ -50,7 +50,7 @@ private:
     friend class LocalMergeSortExchanger;
     friend class AdaptivePassthroughExchanger;
 
-    Exchanger* _exchanger = nullptr;
+    ExchangerBase* _exchanger = nullptr;
     int _channel_id;
     RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
     RuntimeProfile::Counter* _copy_data_timer = nullptr;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp b/be/src/pipeline/local_exchange/local_exchanger.cpp
index a7c6446be43..7b9186cb7c8 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -45,6 +45,7 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
 
 void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
     PartitionedBlock partitioned_block;
+    _data_queue[local_state._channel_id].set_eos();
     while 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
         auto block_wrapper = partitioned_block.first;
         local_state._shared_state->sub_mem_usage(
@@ -140,8 +141,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         it.second, 
new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[it.second].enqueue({new_block_wrapper, {row_idx, 
start, size}});
-                local_state._shared_state->set_ready_to_read(it.second);
+                if (data_queue[it.second].enqueue({new_block_wrapper, 
{row_idx, start, size}})) {
+                    local_state._shared_state->set_ready_to_read(it.second);
+                } else {
+                    new_block_wrapper->unref(local_state._shared_state);
+                }
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
@@ -154,8 +158,12 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         i % _num_sources, 
new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[i % _num_sources].enqueue({new_block_wrapper, 
{row_idx, start, size}});
-                local_state._shared_state->set_ready_to_read(i % _num_sources);
+                if (data_queue[i % _num_sources].enqueue(
+                            {new_block_wrapper, {row_idx, start, size}})) {
+                    local_state._shared_state->set_ready_to_read(i % 
_num_sources);
+                } else {
+                    new_block_wrapper->unref(local_state._shared_state);
+                }
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
@@ -170,8 +178,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         map[i], 
new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, 
start, size}});
-                local_state._shared_state->set_ready_to_read(map[i]);
+                if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, 
start, size}})) {
+                    local_state._shared_state->set_ready_to_read(map[i]);
+                } else {
+                    new_block_wrapper->unref(local_state._shared_state);
+                }
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
@@ -190,14 +201,16 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
     local_state._shared_state->add_mem_usage(channel_id, 
new_block.allocated_bytes());
-    _data_queue[channel_id].enqueue(std::move(new_block));
-    local_state._shared_state->set_ready_to_read(channel_id);
+    if (_data_queue[channel_id].enqueue(std::move(new_block))) {
+        local_state._shared_state->set_ready_to_read(channel_id);
+    }
 
     return Status::OK();
 }
 
 void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
+    _data_queue[local_state._channel_id].set_eos();
     while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         local_state._shared_state->sub_mem_usage(local_state._channel_id,
                                                  next_block.allocated_bytes());
@@ -237,8 +250,9 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
                                 LocalExchangeSinkLocalState& local_state) {
     vectorized::Block new_block(in_block->clone_empty());
     new_block.swap(*in_block);
-    _data_queue[0].enqueue(std::move(new_block));
-    local_state._shared_state->set_ready_to_read(0);
+    if (_data_queue[0].enqueue(std::move(new_block))) {
+        local_state._shared_state->set_ready_to_read(0);
+    }
 
     return Status::OK();
 }
@@ -273,9 +287,10 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, vectorized::Block* in_
     }
     new_block.swap(*in_block);
     DCHECK_LE(local_state._channel_id, _data_queue.size());
-    _data_queue[local_state._channel_id].enqueue(std::move(new_block));
     add_mem_usage(local_state, new_block.allocated_bytes());
-    local_state._shared_state->set_ready_to_read(0);
+    if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
+        local_state._shared_state->set_ready_to_read(0);
+    }
     return Status::OK();
 }
 
@@ -361,8 +376,9 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
     for (size_t i = 0; i < _num_partitions; i++) {
         auto mutable_block = 
vectorized::MutableBlock::create_unique(in_block->clone_empty());
         RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, 
in_block->rows()));
-        _data_queue[i].enqueue(mutable_block->to_block());
-        local_state._shared_state->set_ready_to_read(i);
+        if (_data_queue[i].enqueue(mutable_block->to_block())) {
+            local_state._shared_state->set_ready_to_read(i);
+        }
     }
 
     return Status::OK();
@@ -370,6 +386,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
 
 void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
+    _data_queue[local_state._channel_id].set_eos();
     while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         // do nothing
     }
@@ -403,8 +420,9 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
     local_state._shared_state->add_mem_usage(channel_id, 
new_block.allocated_bytes());
-    _data_queue[channel_id].enqueue(std::move(new_block));
-    local_state._shared_state->set_ready_to_read(channel_id);
+    if (_data_queue[channel_id].enqueue(std::move(new_block))) {
+        local_state._shared_state->set_ready_to_read(channel_id);
+    }
 
     return Status::OK();
 }
@@ -460,9 +478,10 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
             RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
             auto new_block = mutable_block->to_block();
             local_state._shared_state->add_mem_usage(i, 
new_block.allocated_bytes());
-            data_queue[i].enqueue(std::move(new_block));
+            if (data_queue[i].enqueue(std::move(new_block))) {
+                local_state._shared_state->set_ready_to_read(i);
+            }
         }
-        local_state._shared_state->set_ready_to_read(i);
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h b/be/src/pipeline/local_exchange/local_exchanger.h
index 806ac8b9131..113f4906ff8 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -27,29 +27,30 @@ class LocalExchangeSinkLocalState;
 struct ShuffleBlockWrapper;
 class SortSourceOperatorX;
 
-class Exchanger {
+class ExchangerBase {
 public:
-    Exchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
+    ExchangerBase(int running_sink_operators, int num_partitions, int 
free_block_limit)
             : _running_sink_operators(running_sink_operators),
               _running_source_operators(num_partitions),
               _num_partitions(num_partitions),
               _num_senders(running_sink_operators),
               _num_sources(num_partitions),
               _free_block_limit(free_block_limit) {}
-    Exchanger(int running_sink_operators, int num_sources, int num_partitions, 
int free_block_limit)
+    ExchangerBase(int running_sink_operators, int num_sources, int 
num_partitions,
+                  int free_block_limit)
             : _running_sink_operators(running_sink_operators),
               _running_source_operators(num_partitions),
               _num_partitions(num_partitions),
               _num_senders(running_sink_operators),
               _num_sources(num_sources),
               _free_block_limit(free_block_limit) {}
-    virtual ~Exchanger() = default;
+    virtual ~ExchangerBase() = default;
     virtual Status get_block(RuntimeState* state, vectorized::Block* block, 
bool* eos,
                              LocalExchangeSourceLocalState& local_state) = 0;
     virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool 
eos,
                         LocalExchangeSinkLocalState& local_state) = 0;
     virtual ExchangeType get_type() const = 0;
-    virtual void close(LocalExchangeSourceLocalState& local_state) {}
+    virtual void close(LocalExchangeSourceLocalState& local_state) = 0;
 
     virtual DependencySPtr get_local_state_dependency(int _channel_id) { 
return nullptr; }
 
@@ -68,6 +69,56 @@ protected:
     moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks;
 };
 
+struct PartitionedRowIdxs {
+    std::shared_ptr<vectorized::PODArray<uint32_t>> row_idxs;
+    uint32_t offset_start;
+    uint32_t length;
+};
+
+using PartitionedBlock = std::pair<std::shared_ptr<ShuffleBlockWrapper>, 
PartitionedRowIdxs>;
+
+template <typename BlockType>
+struct BlockQueue {
+    std::atomic<bool> eos = false;
+    moodycamel::ConcurrentQueue<BlockType> data_queue;
+    BlockQueue() : eos(false), 
data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
+    BlockQueue(BlockQueue<BlockType>&& other)
+            : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
+    inline bool enqueue(BlockType const& item) {
+        if (!eos) {
+            data_queue.enqueue(item);
+            return true;
+        }
+        return false;
+    }
+
+    inline bool enqueue(BlockType&& item) {
+        if (!eos) {
+            data_queue.enqueue(std::move(item));
+            return true;
+        }
+        return false;
+    }
+
+    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
+
+    void set_eos() { eos = true; }
+};
+
+template <typename BlockType>
+class Exchanger : public ExchangerBase {
+public:
+    Exchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
+            : ExchangerBase(running_sink_operators, num_partitions, 
free_block_limit) {}
+    Exchanger(int running_sink_operators, int num_sources, int num_partitions, 
int free_block_limit)
+            : ExchangerBase(running_sink_operators, num_sources, 
num_partitions, free_block_limit) {
+    }
+    ~Exchanger() override = default;
+
+protected:
+    std::vector<BlockQueue<BlockType>> _data_queue;
+};
+
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
 
@@ -91,19 +142,12 @@ struct ShuffleBlockWrapper {
     vectorized::Block data_block;
 };
 
-class ShuffleExchanger : public Exchanger {
-    struct PartitionedRowIdxs {
-        std::shared_ptr<vectorized::PODArray<uint32_t>> row_idxs;
-        uint32_t offset_start;
-        uint32_t length;
-    };
-
-    using PartitionedBlock = std::pair<std::shared_ptr<ShuffleBlockWrapper>, 
PartitionedRowIdxs>;
-
+class ShuffleExchanger : public Exchanger<PartitionedBlock> {
 public:
     ENABLE_FACTORY_CREATOR(ShuffleExchanger);
     ShuffleExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger(running_sink_operators, num_partitions, 
free_block_limit) {
+            : Exchanger<PartitionedBlock>(running_sink_operators, 
num_partitions,
+                                          free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~ShuffleExchanger() override = default;
@@ -118,7 +162,8 @@ public:
 protected:
     ShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
                      bool ignore_source_data_distribution, int 
free_block_limit)
-            : Exchanger(running_sink_operators, num_sources, num_partitions, 
free_block_limit),
+            : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, 
num_partitions,
+                                          free_block_limit),
               
_ignore_source_data_distribution(ignore_source_data_distribution) {
         _data_queue.resize(num_partitions);
     }
@@ -126,8 +171,6 @@ protected:
                        vectorized::Block* block, bool eos,
                        LocalExchangeSinkLocalState& local_state);
 
-    std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> _data_queue;
-
     const bool _ignore_source_data_distribution = false;
 };
 
@@ -141,11 +184,12 @@ class BucketShuffleExchanger final : public ShuffleExchanger {
     ExchangeType get_type() const override { return 
ExchangeType::BUCKET_HASH_SHUFFLE; }
 };
 
-class PassthroughExchanger final : public Exchanger {
+class PassthroughExchanger final : public Exchanger<vectorized::Block> {
 public:
     ENABLE_FACTORY_CREATOR(PassthroughExchanger);
     PassthroughExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger(running_sink_operators, num_partitions, 
free_block_limit) {
+            : Exchanger<vectorized::Block>(running_sink_operators, 
num_partitions,
+                                           free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~PassthroughExchanger() override = default;
@@ -156,16 +200,14 @@ public:
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; 
}
     void close(LocalExchangeSourceLocalState& local_state) override;
-
-private:
-    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
 };
 
-class PassToOneExchanger final : public Exchanger {
+class PassToOneExchanger final : public Exchanger<vectorized::Block> {
 public:
     ENABLE_FACTORY_CREATOR(PassToOneExchanger);
     PassToOneExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger(running_sink_operators, num_partitions, 
free_block_limit) {
+            : Exchanger<vectorized::Block>(running_sink_operators, 
num_partitions,
+                                           free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~PassToOneExchanger() override = default;
@@ -175,17 +217,16 @@ public:
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; 
}
-
-private:
-    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
+    void close(LocalExchangeSourceLocalState& local_state) override {}
 };
 
-class LocalMergeSortExchanger final : public Exchanger {
+class LocalMergeSortExchanger final : public Exchanger<vectorized::Block> {
 public:
     ENABLE_FACTORY_CREATOR(LocalMergeSortExchanger);
     LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source,
                             int running_sink_operators, int num_partitions, 
int free_block_limit)
-            : Exchanger(running_sink_operators, num_partitions, 
free_block_limit),
+            : Exchanger<vectorized::Block>(running_sink_operators, 
num_partitions,
+                                           free_block_limit),
               _sort_source(std::move(sort_source)),
               _queues_mem_usege(num_partitions),
               _each_queue_limit(config::local_exchange_buffer_mem_limit / 
num_partitions) {
@@ -212,11 +253,10 @@ public:
     }
 
     void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t 
delta);
-
     void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int 
channel_id, int64_t delta);
+    void close(LocalExchangeSourceLocalState& local_state) override {}
 
 private:
-    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
     std::shared_ptr<SortSourceOperatorX> _sort_source;
     std::vector<DependencySPtr> _sink_deps;
@@ -224,11 +264,12 @@ private:
     const int64_t _each_queue_limit;
 };
 
-class BroadcastExchanger final : public Exchanger {
+class BroadcastExchanger final : public Exchanger<vectorized::Block> {
 public:
     ENABLE_FACTORY_CREATOR(BroadcastExchanger);
     BroadcastExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger(running_sink_operators, num_partitions, 
free_block_limit) {
+            : Exchanger<vectorized::Block>(running_sink_operators, 
num_partitions,
+                                           free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~BroadcastExchanger() override = default;
@@ -239,19 +280,17 @@ public:
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
     void close(LocalExchangeSourceLocalState& local_state) override;
-
-private:
-    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
 };
 
 //The code in AdaptivePassthroughExchanger is essentially
 // a copy of ShuffleExchanger and PassthroughExchanger.
-class AdaptivePassthroughExchanger : public Exchanger {
+class AdaptivePassthroughExchanger : public Exchanger<vectorized::Block> {
 public:
     ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
     AdaptivePassthroughExchanger(int running_sink_operators, int 
num_partitions,
                                  int free_block_limit)
-            : Exchanger(running_sink_operators, num_partitions, 
free_block_limit) {
+            : Exchanger<vectorized::Block>(running_sink_operators, 
num_partitions,
+                                           free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -261,6 +300,8 @@ public:
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return 
ExchangeType::ADAPTIVE_PASSTHROUGH; }
 
+    void close(LocalExchangeSourceLocalState& local_state) override {}
+
 private:
     Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block, 
bool eos,
                              LocalExchangeSinkLocalState& local_state);
@@ -269,7 +310,6 @@ private:
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict 
channel_ids,
                        vectorized::Block* block, bool eos,
                        LocalExchangeSinkLocalState& local_state);
-    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
 
     std::atomic_bool _is_pass_through = false;
     std::atomic_int32_t _total_block = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to