This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 62bb1803f78 [fix](local exchange) Fix EOS processing in local 
exchanger (#39031)
62bb1803f78 is described below

commit 62bb1803f785c3ac2f9dd7c3835adf0e3eca60f5
Author: Gabriel <[email protected]>
AuthorDate: Fri Aug 9 15:19:50 2024 +0800

    [fix](local exchange) Fix EOS processing in local exchanger (#39031)
---
 be/src/pipeline/dependency.cpp                     |   6 +-
 be/src/pipeline/dependency.h                       |  85 ++++--
 be/src/pipeline/exec/operator.cpp                  |   5 +-
 .../local_exchange_sink_operator.cpp               |   5 +-
 .../local_exchange_source_operator.cpp             |  11 +-
 be/src/pipeline/local_exchange/local_exchanger.cpp | 297 +++++++++------------
 be/src/pipeline/local_exchange/local_exchanger.h   | 122 +++++----
 be/src/pipeline/pipeline_fragment_context.cpp      |  26 +-
 8 files changed, 312 insertions(+), 245 deletions(-)

diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 5e1ce79a1eb..560efec94e1 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -188,10 +188,12 @@ void 
LocalExchangeSharedState::sub_running_sink_operators() {
     }
 }
 
-void LocalExchangeSharedState::sub_running_source_operators() {
+void LocalExchangeSharedState::sub_running_source_operators(
+        LocalExchangeSourceLocalState& local_state) {
     std::unique_lock<std::mutex> lc(le_lock);
     if (exchanger->_running_source_operators.fetch_sub(1) == 1) {
         _set_always_ready();
+        exchanger->finalize(local_state);
     }
 }
 
@@ -397,4 +399,6 @@ Status 
AggSharedState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
     return Status::OK();
 }
 
+LocalExchangeSharedState::~LocalExchangeSharedState() = default;
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index ee1afaaf55e..36f06b91095 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -49,7 +49,7 @@ class Dependency;
 class PipelineTask;
 struct BasicSharedState;
 using DependencySPtr = std::shared_ptr<Dependency>;
-using DependencyMap = std::map<int, std::vector<DependencySPtr>>;
+class LocalExchangeSourceLocalState;
 
 static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 60 * 1000L * 1000L * 1000L;
 static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L;
@@ -811,20 +811,21 @@ struct LocalExchangeSharedState : public BasicSharedState 
{
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
     LocalExchangeSharedState(int num_instances);
+    ~LocalExchangeSharedState() override;
     std::unique_ptr<ExchangerBase> exchanger {};
     std::vector<MemTracker*> mem_trackers;
     std::atomic<int64_t> mem_usage = 0;
     // We need to make sure to add mem_usage first and then enqueue, otherwise 
sub mem_usage may cause negative mem_usage during concurrent dequeue.
     std::mutex le_lock;
-    void create_source_dependencies(int operator_id, int node_id) {
+    virtual void create_dependencies(int operator_id, int node_id) {
         for (auto& source_dep : source_deps) {
             source_dep = std::make_shared<Dependency>(operator_id, node_id,
                                                       
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
             source_dep->set_shared_state(this);
         }
-    };
+    }
     void sub_running_sink_operators();
-    void sub_running_source_operators();
+    void sub_running_source_operators(LocalExchangeSourceLocalState& 
local_state);
     void _set_always_ready() {
         for (auto& dep : source_deps) {
             DCHECK(dep);
@@ -836,7 +837,10 @@ public:
         }
     }
 
-    Dependency* get_dep_by_channel_id(int channel_id) { return 
source_deps[channel_id].get(); }
+    virtual std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) {
+        return {source_deps[channel_id]};
+    }
+    virtual Dependency* get_sink_dep_by_channel_id(int channel_id) { return 
nullptr; }
 
     void set_ready_to_read(int channel_id) {
         auto& dep = source_deps[channel_id];
@@ -847,28 +851,79 @@ public:
     void add_mem_usage(int channel_id, size_t delta, bool 
update_total_mem_usage = true) {
         mem_trackers[channel_id]->consume(delta);
         if (update_total_mem_usage) {
-            add_total_mem_usage(delta);
+            add_total_mem_usage(delta, channel_id);
         }
     }
 
-    void sub_mem_usage(int channel_id, size_t delta, bool 
update_total_mem_usage = true) {
-        mem_trackers[channel_id]->release(delta);
-        if (update_total_mem_usage) {
-            sub_total_mem_usage(delta);
-        }
-    }
+    void sub_mem_usage(int channel_id, size_t delta) { 
mem_trackers[channel_id]->release(delta); }
 
-    void add_total_mem_usage(size_t delta) {
+    virtual void add_total_mem_usage(size_t delta, int channel_id = 0) {
         if (mem_usage.fetch_add(delta) + delta > 
config::local_exchange_buffer_mem_limit) {
             sink_deps.front()->block();
         }
     }
 
-    void sub_total_mem_usage(size_t delta) {
-        if (mem_usage.fetch_sub(delta) - delta <= 
config::local_exchange_buffer_mem_limit) {
+    virtual void sub_total_mem_usage(size_t delta, int channel_id = 0) {
+        auto prev_usage = mem_usage.fetch_sub(delta);
+        DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " 
delta: " << delta
+                                         << " channel_id: " << channel_id;
+        if (prev_usage - delta <= config::local_exchange_buffer_mem_limit) {
             sink_deps.front()->set_ready();
         }
     }
 };
 
+struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
+    LocalMergeExchangeSharedState(int num_instances)
+            : LocalExchangeSharedState(num_instances),
+              _queues_mem_usage(num_instances),
+              _each_queue_limit(config::local_exchange_buffer_mem_limit / 
num_instances) {
+        for (size_t i = 0; i < num_instances; i++) {
+            _queues_mem_usage[i] = 0;
+        }
+    }
+
+    void create_dependencies(int operator_id, int node_id) override {
+        sink_deps.resize(source_deps.size());
+        for (size_t i = 0; i < source_deps.size(); i++) {
+            source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
+                                                          
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
+            source_deps[i]->set_shared_state(this);
+            sink_deps[i] = std::make_shared<Dependency>(
+                    operator_id, node_id, 
"LOCAL_EXCHANGE_OPERATOR_SINK_DEPENDENCY", true);
+            sink_deps[i]->set_shared_state(this);
+        }
+    }
+
+    void sub_total_mem_usage(size_t delta, int channel_id) override {
+        auto prev_usage = _queues_mem_usage[channel_id].fetch_sub(delta);
+        DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << " 
delta: " << delta
+                                         << " channel_id: " << channel_id;
+        if (prev_usage - delta <= _each_queue_limit) {
+            sink_deps[channel_id]->set_ready();
+        }
+        if (_queues_mem_usage[channel_id] == 0) {
+            source_deps[channel_id]->block();
+        }
+    }
+    void add_total_mem_usage(size_t delta, int channel_id) override {
+        if (_queues_mem_usage[channel_id].fetch_add(delta) + delta > 
_each_queue_limit) {
+            sink_deps[channel_id]->block();
+        }
+        source_deps[channel_id]->set_ready();
+    }
+
+    Dependency* get_sink_dep_by_channel_id(int channel_id) override {
+        return sink_deps[channel_id].get();
+    }
+
+    std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) override 
{
+        return source_deps;
+    }
+
+private:
+    std::vector<std::atomic_int64_t> _queues_mem_usage;
+    const int64_t _each_queue_limit;
+};
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 07e0c3cf640..1e00b9fcbcb 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -452,7 +452,10 @@ Status 
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
             DCHECK(info.le_state_map.find(_parent->operator_id()) != 
info.le_state_map.end());
             _shared_state = 
info.le_state_map.at(_parent->operator_id()).first.get();
 
-            _dependency = _shared_state->get_dep_by_channel_id(info.task_idx);
+            auto deps = _shared_state->get_dep_by_channel_id(info.task_idx);
+            if (deps.size() == 1) {
+                _dependency = deps.front().get();
+            }
             _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                     _runtime_profile, "WaitForDependency[" + 
_dependency->name() + "]Time", 1);
         } else if (info.shared_state) {
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index ba51a2da39b..97acd2a8070 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -27,8 +27,9 @@ LocalExchangeSinkLocalState::~LocalExchangeSinkLocalState() = 
default;
 
 std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
     auto deps = Base::dependencies();
-    auto exchanger_deps = _exchanger->local_sink_state_dependency(_channel_id);
-    for (auto* dep : exchanger_deps) {
+
+    auto dep = _shared_state->get_sink_dep_by_channel_id(_channel_id);
+    if (dep != nullptr) {
         deps.push_back(dep);
     }
     return deps;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 6b0cca2d71a..32e93fbc5b2 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -56,7 +56,7 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* 
state) {
         _exchanger->close(*this);
     }
     if (_shared_state) {
-        _shared_state->sub_running_source_operators();
+        _shared_state->sub_running_source_operators(*this);
     }
 
     return Base::close(state);
@@ -64,9 +64,12 @@ Status LocalExchangeSourceLocalState::close(RuntimeState* 
state) {
 
 std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
     auto deps = Base::dependencies();
-    auto exchanger_deps = _exchanger->local_state_dependency(_channel_id);
-    for (auto* dep : exchanger_deps) {
-        deps.push_back(dep);
+    auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id);
+    if (le_deps.size() > 1) {
+        // If this is a local merge exchange, we should use all dependencies 
here.
+        for (auto& dep : le_deps) {
+            deps.push_back(dep.get());
+        }
     }
     return deps;
 }
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 647ddcfba2d..e256419688e 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -27,28 +27,74 @@
 namespace doris::pipeline {
 
 template <typename BlockType>
-bool Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
+void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
                                                        
LocalExchangeSinkLocalState& local_state,
                                                        BlockType&& block) {
+    size_t allocated_bytes = 0;
+    // PartitionedBlock is used by shuffle exchanger.
+    // PartitionedBlock will be push into multiple queues with different row 
ranges, so it will be
+    // referenced multiple times. Otherwise, we only ref the block once 
because it is only push into
+    // one queue.
+    if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+        allocated_bytes = block.first->data_block.allocated_bytes();
+    } else {
+        block->ref(1);
+        allocated_bytes = block->data_block.allocated_bytes();
+    }
     std::unique_lock l(_m);
+    local_state._shared_state->add_mem_usage(channel_id, allocated_bytes,
+                                             !std::is_same_v<PartitionedBlock, 
BlockType>);
     if (_data_queue[channel_id].enqueue(std::move(block))) {
         local_state._shared_state->set_ready_to_read(channel_id);
-        return true;
+    } else {
+        local_state._shared_state->sub_mem_usage(channel_id, allocated_bytes);
+        // `enqueue(block)` return false iff this queue's source operator is 
already closed so we
+        // just unref the block.
+        if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+            block.first->unref(local_state._shared_state, allocated_bytes);
+        } else {
+            block->unref(local_state._shared_state, allocated_bytes);
+        }
     }
-    return false;
 }
 
 template <typename BlockType>
 bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& 
local_state,
-                                         BlockType& block, bool* eos) {
+                                         BlockType& block, bool* eos,
+                                         vectorized::Block* data_block) {
+    return _dequeue_data(local_state, block, eos, data_block, 
local_state._channel_id);
+}
+
+template <typename BlockType>
+bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState& 
local_state,
+                                         BlockType& block, bool* eos, 
vectorized::Block* data_block,
+                                         int channel_id) {
     bool all_finished = _running_sink_operators == 0;
-    if (_data_queue[local_state._channel_id].try_dequeue(block)) {
+    if (_data_queue[channel_id].try_dequeue(block)) {
+        if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+            local_state._shared_state->sub_mem_usage(channel_id,
+                                                     
block.first->data_block.allocated_bytes());
+        } else {
+            local_state._shared_state->sub_mem_usage(channel_id,
+                                                     
block->data_block.allocated_bytes());
+            data_block->swap(block->data_block);
+            block->unref(local_state._shared_state, 
data_block->allocated_bytes());
+        }
         return true;
     } else if (all_finished) {
         *eos = true;
     } else {
         std::unique_lock l(_m);
-        if (_data_queue[local_state._channel_id].try_dequeue(block)) {
+        if (_data_queue[channel_id].try_dequeue(block)) {
+            if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+                local_state._shared_state->sub_mem_usage(channel_id,
+                                                         
block.first->data_block.allocated_bytes());
+            } else {
+                local_state._shared_state->sub_mem_usage(channel_id,
+                                                         
block->data_block.allocated_bytes());
+                data_block->swap(block->data_block);
+                block->unref(local_state._shared_state, 
data_block->allocated_bytes());
+            }
             return true;
         }
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
@@ -76,12 +122,11 @@ Status ShuffleExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block,
 
 void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
     PartitionedBlock partitioned_block;
+    bool eos;
+    vectorized::Block block;
     _data_queue[local_state._channel_id].set_eos();
-    while 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
-        auto block_wrapper = partitioned_block.first;
-        local_state._shared_state->sub_mem_usage(
-                local_state._channel_id, 
block_wrapper->data_block.allocated_bytes(), false);
-        block_wrapper->unref(local_state._shared_state);
+    while (_dequeue_data(local_state, partitioned_block, &eos, &block)) {
+        partitioned_block.first->unref(local_state._shared_state);
     }
 }
 
@@ -95,17 +140,15 @@ Status ShuffleExchanger::get_block(RuntimeState* state, 
vectorized::Block* block
             const auto* offset_start = 
partitioned_block.second.row_idxs->data() +
                                        partitioned_block.second.offset_start;
             auto block_wrapper = partitioned_block.first;
-            local_state._shared_state->sub_mem_usage(
-                    local_state._channel_id, 
block_wrapper->data_block.allocated_bytes(), false);
             RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, 
offset_start,
                                                    offset_start + 
partitioned_block.second.length));
             block_wrapper->unref(local_state._shared_state);
-        } while (mutable_block.rows() < state->batch_size() &&
-                 
_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
+        } while (mutable_block.rows() < state->batch_size() && !*eos &&
+                 _dequeue_data(local_state, partitioned_block, eos, block));
         return Status::OK();
     };
 
-    if (_dequeue_data(local_state, partitioned_block, eos)) {
+    if (_dequeue_data(local_state, partitioned_block, eos, block)) {
         SCOPED_TIMER(local_state._copy_data_timer);
         mutable_block = 
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
                 block, partitioned_block.first->data_block);
@@ -135,11 +178,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
     }
 
     vectorized::Block data_block;
-    std::shared_ptr<ShuffleBlockWrapper> new_block_wrapper;
+    std::shared_ptr<BlockWrapper> new_block_wrapper;
     if (_free_blocks.try_dequeue(data_block)) {
-        new_block_wrapper = 
ShuffleBlockWrapper::create_shared(std::move(data_block));
+        new_block_wrapper = BlockWrapper::create_shared(std::move(data_block));
     } else {
-        new_block_wrapper = 
ShuffleBlockWrapper::create_shared(block->clone_empty());
+        new_block_wrapper = BlockWrapper::create_shared(block->clone_empty());
     }
 
     new_block_wrapper->data_block.swap(*block);
@@ -157,15 +200,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             uint32_t start = local_state._partition_rows_histogram[it.first];
             uint32_t size = local_state._partition_rows_histogram[it.first + 
1] - start;
             if (size > 0) {
-                local_state._shared_state->add_mem_usage(
-                        it.second, 
new_block_wrapper->data_block.allocated_bytes(), false);
-
-                if (!_enqueue_data_and_set_ready(it.second, local_state,
-                                                 {new_block_wrapper, {row_idx, 
start, size}})) {
-                    local_state._shared_state->sub_mem_usage(
-                            it.second, 
new_block_wrapper->data_block.allocated_bytes(), false);
-                    new_block_wrapper->unref(local_state._shared_state);
-                }
+                _enqueue_data_and_set_ready(it.second, local_state,
+                                            {new_block_wrapper, {row_idx, 
start, size}});
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
@@ -176,15 +212,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             uint32_t start = local_state._partition_rows_histogram[i];
             uint32_t size = local_state._partition_rows_histogram[i + 1] - 
start;
             if (size > 0) {
-                local_state._shared_state->add_mem_usage(
-                        i % _num_sources, 
new_block_wrapper->data_block.allocated_bytes(), false);
-                if (!_enqueue_data_and_set_ready(i % _num_sources, local_state,
-                                                 {new_block_wrapper, {row_idx, 
start, size}})) {
-                    local_state._shared_state->sub_mem_usage(
-                            i % _num_sources, 
new_block_wrapper->data_block.allocated_bytes(),
-                            false);
-                    new_block_wrapper->unref(local_state._shared_state);
-                }
+                _enqueue_data_and_set_ready(i % _num_sources, local_state,
+                                            {new_block_wrapper, {row_idx, 
start, size}});
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
@@ -197,14 +226,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
             uint32_t start = local_state._partition_rows_histogram[i];
             uint32_t size = local_state._partition_rows_histogram[i + 1] - 
start;
             if (size > 0) {
-                local_state._shared_state->add_mem_usage(
-                        map[i], 
new_block_wrapper->data_block.allocated_bytes(), false);
-                if (!_enqueue_data_and_set_ready(map[i], local_state,
-                                                 {new_block_wrapper, {row_idx, 
start, size}})) {
-                    local_state._shared_state->sub_mem_usage(
-                            map[i], 
new_block_wrapper->data_block.allocated_bytes(), false);
-                    new_block_wrapper->unref(local_state._shared_state);
-                }
+                _enqueue_data_and_set_ready(map[i], local_state,
+                                            {new_block_wrapper, {row_idx, 
start, size}});
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
@@ -217,40 +240,32 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
 Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
                                   LocalExchangeSinkLocalState& local_state) {
     vectorized::Block new_block;
+    BlockWrapperSPtr wrapper;
     if (!_free_blocks.try_dequeue(new_block)) {
         new_block = {in_block->clone_empty()};
     }
     new_block.swap(*in_block);
+    wrapper = BlockWrapper::create_shared(std::move(new_block));
     auto channel_id = (local_state._channel_id++) % _num_partitions;
-    size_t memory_usage = new_block.allocated_bytes();
-    local_state._shared_state->add_mem_usage(channel_id, memory_usage);
-    if (!_enqueue_data_and_set_ready(channel_id, local_state, 
std::move(new_block))) {
-        local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
-    }
+    _enqueue_data_and_set_ready(channel_id, local_state, std::move(wrapper));
 
     return Status::OK();
 }
 
 void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
+    BlockWrapperSPtr wrapper;
+    bool eos;
     _data_queue[local_state._channel_id].set_eos();
-    while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-        local_state._shared_state->sub_mem_usage(local_state._channel_id,
-                                                 next_block.allocated_bytes());
+    while (_dequeue_data(local_state, wrapper, &eos, &next_block)) {
+        next_block = vectorized::Block();
     }
 }
 
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                        LocalExchangeSourceLocalState& 
local_state) {
-    vectorized::Block next_block;
-    if (_dequeue_data(local_state, next_block, eos)) {
-        block->swap(next_block);
-        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
-        if (_free_block_limit == 0 ||
-            _free_blocks.size_approx() < _free_block_limit * _num_sources) {
-            _free_blocks.enqueue(std::move(next_block));
-        }
-    }
+    BlockWrapperSPtr next_block;
+    _dequeue_data(local_state, next_block, eos, block);
     return Status::OK();
 }
 
@@ -258,7 +273,9 @@ Status PassToOneExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block
                                 LocalExchangeSinkLocalState& local_state) {
     vectorized::Block new_block(in_block->clone_empty());
     new_block.swap(*in_block);
-    _enqueue_data_and_set_ready(0, local_state, std::move(new_block));
+
+    BlockWrapperSPtr wrapper = 
BlockWrapper::create_shared(std::move(new_block));
+    _enqueue_data_and_set_ready(0, local_state, std::move(wrapper));
 
     return Status::OK();
 }
@@ -269,10 +286,8 @@ Status PassToOneExchanger::get_block(RuntimeState* state, 
vectorized::Block* blo
         *eos = true;
         return Status::OK();
     }
-    vectorized::Block next_block;
-    if (_dequeue_data(local_state, next_block, eos)) {
-        *block = std::move(next_block);
-    }
+    BlockWrapperSPtr next_block;
+    _dequeue_data(local_state, next_block, eos, block);
     return Status::OK();
 }
 
@@ -282,21 +297,39 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state, 
vectorized::Block* in_
     if (!_free_blocks.try_dequeue(new_block)) {
         new_block = {in_block->clone_empty()};
     }
-    new_block.swap(*in_block);
     DCHECK_LE(local_state._channel_id, _data_queue.size());
 
-    size_t memory_usage = new_block.allocated_bytes();
-    add_mem_usage(local_state, memory_usage);
-
-    if (!_enqueue_data_and_set_ready(local_state._channel_id, local_state, 
std::move(new_block))) {
-        sub_mem_usage(local_state, memory_usage);
-    }
+    new_block.swap(*in_block);
+    _enqueue_data_and_set_ready(local_state._channel_id, local_state,
+                                
BlockWrapper::create_shared(std::move(new_block)));
     if (eos) {
-        _queue_deps[local_state._channel_id]->set_always_ready();
+        
local_state._shared_state->source_deps[local_state._channel_id]->set_always_ready();
     }
     return Status::OK();
 }
 
+void ExchangerBase::finalize(LocalExchangeSourceLocalState& local_state) {
+    DCHECK(_running_source_operators == 0);
+    vectorized::Block block;
+    while (_free_blocks.try_dequeue(block)) {
+        // do nothing
+    }
+}
+void LocalMergeSortExchanger::finalize(LocalExchangeSourceLocalState& 
local_state) {
+    BlockWrapperSPtr next_block;
+    vectorized::Block block;
+    bool eos;
+    int id = 0;
+    for (auto& data_queue : _data_queue) {
+        data_queue.set_eos();
+        while (_dequeue_data(local_state, next_block, &eos, &block, id)) {
+            block = vectorized::Block();
+        }
+        id++;
+    }
+    ExchangerBase::finalize(local_state);
+}
+
 Status LocalMergeSortExchanger::build_merger(RuntimeState* state,
                                              LocalExchangeSourceLocalState& 
local_state) {
     RETURN_IF_ERROR(_sort_source->build_merger(state, _merger, 
local_state.profile()));
@@ -304,18 +337,8 @@ Status LocalMergeSortExchanger::build_merger(RuntimeState* 
state,
     for (int channel_id = 0; channel_id < _num_partitions; channel_id++) {
         vectorized::BlockSupplier block_supplier = [&, id = 
channel_id](vectorized::Block* block,
                                                                         bool* 
eos) {
-            vectorized::Block next_block;
-            bool all_finished = _running_sink_operators == 0;
-            if (_data_queue[id].try_dequeue(next_block)) {
-                block->swap(next_block);
-                if (_free_block_limit == 0 ||
-                    _free_blocks.size_approx() < _free_block_limit * 
_num_sources) {
-                    _free_blocks.enqueue(std::move(next_block));
-                }
-                sub_mem_usage(local_state, id, block->allocated_bytes());
-            } else if (all_finished) {
-                *eos = true;
-            }
+            BlockWrapperSPtr next_block;
+            _dequeue_data(local_state, next_block, eos, block, id);
             return Status::OK();
         };
         child_block_suppliers.push_back(block_supplier);
@@ -349,63 +372,13 @@ Status LocalMergeSortExchanger::get_block(RuntimeState* 
state, vectorized::Block
     return Status::OK();
 }
 
-void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSinkLocalState& 
local_state,
-                                            int64_t delta) {
-    const auto channel_id = local_state._channel_id;
-    local_state._shared_state->mem_trackers[channel_id]->release(delta);
-    if (_queues_mem_usege[channel_id].fetch_sub(delta) > _each_queue_limit) {
-        _sink_deps[channel_id]->set_ready();
-    }
-    // if queue empty , block this queue
-    if (_queues_mem_usege[channel_id] == 0) {
-        _queue_deps[channel_id]->block();
-    }
-}
-
-void LocalMergeSortExchanger::add_mem_usage(LocalExchangeSinkLocalState& 
local_state,
-                                            int64_t delta) {
-    const auto channel_id = local_state._channel_id;
-    local_state._shared_state->mem_trackers[channel_id]->consume(delta);
-    if (_queues_mem_usege[channel_id].fetch_add(delta) > _each_queue_limit) {
-        _sink_deps[channel_id]->block();
-    }
-    _queue_deps[channel_id]->set_ready();
-}
-
-void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSourceLocalState& 
local_state,
-                                            int channel_id, int64_t delta) {
-    local_state._shared_state->mem_trackers[channel_id]->release(delta);
-    if (_queues_mem_usege[channel_id].fetch_sub(delta) <= _each_queue_limit) {
-        _sink_deps[channel_id]->set_ready();
-    }
-    // if queue empty , block this queue
-    if (_queues_mem_usege[channel_id] == 0) {
-        _queue_deps[channel_id]->block();
-    }
-}
-
-std::vector<Dependency*> 
LocalMergeSortExchanger::local_sink_state_dependency(int channel_id) {
-    DCHECK(_sink_deps[channel_id]);
-    return {_sink_deps[channel_id].get()};
-}
-
-std::vector<Dependency*> LocalMergeSortExchanger::local_state_dependency(int 
channel_id) {
-    if (channel_id != 0) {
-        return {};
-    }
-    std::vector<Dependency*> deps;
-    for (auto depSptr : _queue_deps) {
-        deps.push_back(depSptr.get());
-    }
-    return deps;
-}
-
 Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block, bool eos,
                                 LocalExchangeSinkLocalState& local_state) {
     for (size_t i = 0; i < _num_partitions; i++) {
         auto mutable_block = 
vectorized::MutableBlock::create_unique(in_block->clone_empty());
         RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, 
in_block->rows()));
-        _enqueue_data_and_set_ready(i, local_state, mutable_block->to_block());
+        _enqueue_data_and_set_ready(i, local_state,
+                                    
BlockWrapper::create_shared(mutable_block->to_block()));
     }
 
     return Status::OK();
@@ -413,18 +386,18 @@ Status BroadcastExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block
 
 void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
+    bool eos;
+    BlockWrapperSPtr wrapper;
     _data_queue[local_state._channel_id].set_eos();
-    while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-        // do nothing
+    while (_dequeue_data(local_state, wrapper, &eos, &next_block)) {
+        next_block = vectorized::Block();
     }
 }
 
 Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos,
                                      LocalExchangeSourceLocalState& 
local_state) {
-    vectorized::Block next_block;
-    if (_dequeue_data(local_state, next_block, eos)) {
-        *block = std::move(next_block);
-    }
+    BlockWrapperSPtr next_block;
+    _dequeue_data(local_state, next_block, eos, block);
     return Status::OK();
 }
 
@@ -437,12 +410,8 @@ Status 
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
     }
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
-    size_t memory_usage = new_block.allocated_bytes();
-    local_state._shared_state->add_mem_usage(channel_id, memory_usage);
-
-    if (!_enqueue_data_and_set_ready(channel_id, local_state, 
std::move(new_block))) {
-        local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
-    }
+    _enqueue_data_and_set_ready(channel_id, local_state,
+                                
BlockWrapper::create_shared(std::move(new_block)));
 
     return Status::OK();
 }
@@ -497,11 +466,8 @@ Status 
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
             RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
             auto new_block = mutable_block->to_block();
 
-            size_t memory_usage = new_block.allocated_bytes();
-            local_state._shared_state->add_mem_usage(i, memory_usage);
-            if (!_enqueue_data_and_set_ready(i, local_state, 
std::move(new_block))) {
-                local_state._shared_state->sub_mem_usage(i, memory_usage);
-            }
+            _enqueue_data_and_set_ready(i, local_state,
+                                        
BlockWrapper::create_shared(std::move(new_block)));
         }
     }
     return Status::OK();
@@ -522,16 +488,19 @@ Status AdaptivePassthroughExchanger::sink(RuntimeState* 
state, vectorized::Block
 Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                bool* eos,
                                                LocalExchangeSourceLocalState& 
local_state) {
+    BlockWrapperSPtr next_block;
+    _dequeue_data(local_state, next_block, eos, block);
+    return Status::OK();
+}
+
+void AdaptivePassthroughExchanger::close(LocalExchangeSourceLocalState& 
local_state) {
     vectorized::Block next_block;
-    if (_dequeue_data(local_state, next_block, eos)) {
-        block->swap(next_block);
-        if (_free_block_limit == 0 ||
-            _free_blocks.size_approx() < _free_block_limit * _num_sources) {
-            _free_blocks.enqueue(std::move(next_block));
-        }
-        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
+    bool eos;
+    BlockWrapperSPtr wrapper;
+    _data_queue[local_state._channel_id].set_eos();
+    while (_dequeue_data(local_state, wrapper, &eos, &next_block)) {
+        next_block = vectorized::Block();
     }
-    return Status::OK();
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index afdebd21101..dfb5c31fff8 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -24,9 +24,22 @@ namespace doris::pipeline {
 
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
-struct ShuffleBlockWrapper;
+struct BlockWrapper;
 class SortSourceOperatorX;
 
+/**
+ * One exchanger is held by one `LocalExchangeSharedState`. And one 
`LocalExchangeSharedState` is
+ * shared by all local exchange sink operators and source operators with the 
same id.
+ *
+ * In the exchanger, two block queues are maintained: one is the data block queue and 
the other is the free block queue.
+ *
+ * In detail, the data block queue has as many queues as source operators. Each 
source operator will get
+ * data blocks from the corresponding queue. Data blocks are pushed into the queues 
by sink operators. One
+ * sink operator will push blocks into one or more queues.
+ *
+ * Free blocks are used to reuse the allocated memory. To limit memory 
usage, we also use a conf
+ * to limit the size of the free block queue.
+ */
 class ExchangerBase {
 public:
     ExchangerBase(int running_sink_operators, int num_partitions, int 
free_block_limit)
@@ -50,16 +63,17 @@ public:
     virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool 
eos,
                         LocalExchangeSinkLocalState& local_state) = 0;
     virtual ExchangeType get_type() const = 0;
+    // Called if a local exchange source operator is closed. Free the unused 
data blocks in data_queue.
     virtual void close(LocalExchangeSourceLocalState& local_state) = 0;
-
-    virtual std::vector<Dependency*> local_sink_state_dependency(int 
channel_id) { return {}; }
-    virtual std::vector<Dependency*> local_state_dependency(int channel_id) { 
return {}; }
+    // Called if all local exchanger source operators are closed. We free the 
memory in
+    // `_free_blocks` here.
+    virtual void finalize(LocalExchangeSourceLocalState& local_state);
 
     virtual std::string data_queue_debug_string(int i) = 0;
 
 protected:
     friend struct LocalExchangeSharedState;
-    friend struct ShuffleBlockWrapper;
+    friend struct BlockWrapper;
     friend class LocalExchangeSourceLocalState;
     friend class LocalExchangeSinkOperatorX;
     friend class LocalExchangeSinkLocalState;
@@ -78,7 +92,7 @@ struct PartitionedRowIdxs {
     uint32_t length;
 };
 
-using PartitionedBlock = std::pair<std::shared_ptr<ShuffleBlockWrapper>, 
PartitionedRowIdxs>;
+using PartitionedBlock = std::pair<std::shared_ptr<BlockWrapper>, 
PartitionedRowIdxs>;
 
 template <typename BlockType>
 struct BlockQueue {
@@ -108,6 +122,8 @@ struct BlockQueue {
     void set_eos() { eos = true; }
 };
 
+using BlockWrapperSPtr = std::shared_ptr<BlockWrapper>;
+
 template <typename BlockType>
 class Exchanger : public ExchangerBase {
 public:
@@ -123,9 +139,13 @@ public:
     }
 
 protected:
-    bool _enqueue_data_and_set_ready(int channel_id, 
LocalExchangeSinkLocalState& local_state,
+    // Enqueue data block and set downstream source operator to read.
+    void _enqueue_data_and_set_ready(int channel_id, 
LocalExchangeSinkLocalState& local_state,
                                      BlockType&& block);
-    bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& 
block, bool* eos);
+    bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& 
block, bool* eos,
+                       vectorized::Block* data_block);
+    bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType& 
block, bool* eos,
+                       vectorized::Block* data_block, int channel_id);
     std::vector<BlockQueue<BlockType>> _data_queue;
 
 private:
@@ -135,10 +155,33 @@ private:
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
 
-struct ShuffleBlockWrapper {
-    ENABLE_FACTORY_CREATOR(ShuffleBlockWrapper);
-    ShuffleBlockWrapper(vectorized::Block&& data_block_) : 
data_block(std::move(data_block_)) {}
+/**
+ * `BlockWrapper` is used to wrap a data block with a reference count.
+ *
+ * In function `unref()`, if `ref_count` is decremented to 0, this 
block is no longer needed by
+ * any operator, so we put it into `_free_blocks` to reuse its memory if needed 
and refresh the memory usage
+ * of the current queue.
+ *
+ * Note: `ref_count` will be larger than 1 only if this block is shared 
between multiple queues in
+ * shuffle exchanger.
+ */
+struct BlockWrapper {
+    ENABLE_FACTORY_CREATOR(BlockWrapper);
+    BlockWrapper(vectorized::Block&& data_block_) : 
data_block(std::move(data_block_)) {}
+    ~BlockWrapper() { DCHECK_EQ(ref_count.load(), 0); }
     void ref(int delta) { ref_count += delta; }
+    void unref(LocalExchangeSharedState* shared_state, size_t allocated_bytes) 
{
+        if (ref_count.fetch_sub(1) == 1) {
+            shared_state->sub_total_mem_usage(allocated_bytes);
+            if (shared_state->exchanger->_free_block_limit == 0 ||
+                shared_state->exchanger->_free_blocks.size_approx() <
+                        shared_state->exchanger->_free_block_limit *
+                                shared_state->exchanger->_num_sources) {
+                data_block.clear_column_data();
+                
shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
+            }
+        }
+    }
     void unref(LocalExchangeSharedState* shared_state) {
         if (ref_count.fetch_sub(1) == 1) {
             shared_state->sub_total_mem_usage(data_block.allocated_bytes());
@@ -197,12 +240,12 @@ class BucketShuffleExchanger final : public 
ShuffleExchanger {
     ExchangeType get_type() const override { return 
ExchangeType::BUCKET_HASH_SHUFFLE; }
 };
 
-class PassthroughExchanger final : public Exchanger<vectorized::Block> {
+class PassthroughExchanger final : public Exchanger<BlockWrapperSPtr> {
 public:
     ENABLE_FACTORY_CREATOR(PassthroughExchanger);
     PassthroughExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger<vectorized::Block>(running_sink_operators, 
num_partitions,
-                                           free_block_limit) {
+            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions,
+                                          free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~PassthroughExchanger() override = default;
@@ -215,12 +258,12 @@ public:
     void close(LocalExchangeSourceLocalState& local_state) override;
 };
 
-class PassToOneExchanger final : public Exchanger<vectorized::Block> {
+class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> {
 public:
     ENABLE_FACTORY_CREATOR(PassToOneExchanger);
     PassToOneExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger<vectorized::Block>(running_sink_operators, 
num_partitions,
-                                           free_block_limit) {
+            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions,
+                                          free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~PassToOneExchanger() override = default;
@@ -233,25 +276,14 @@ public:
     void close(LocalExchangeSourceLocalState& local_state) override {}
 };
 
-class LocalMergeSortExchanger final : public Exchanger<vectorized::Block> {
+class LocalMergeSortExchanger final : public Exchanger<BlockWrapperSPtr> {
 public:
     ENABLE_FACTORY_CREATOR(LocalMergeSortExchanger);
     LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source,
                             int running_sink_operators, int num_partitions, 
int free_block_limit)
-            : Exchanger<vectorized::Block>(running_sink_operators, 
num_partitions,
-                                           free_block_limit),
-              _sort_source(std::move(sort_source)),
-              _queues_mem_usege(num_partitions),
-              _each_queue_limit(config::local_exchange_buffer_mem_limit / 
num_partitions) {
+            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions, free_block_limit),
+              _sort_source(std::move(sort_source)) {
         _data_queue.resize(num_partitions);
-        for (size_t i = 0; i < num_partitions; i++) {
-            _queues_mem_usege[i] = 0;
-            _sink_deps.push_back(
-                    std::make_shared<Dependency>(0, 0, 
"LOCAL_MERGE_SORT_SINK_DEPENDENCY", true));
-            _queue_deps.push_back(
-                    std::make_shared<Dependency>(0, 0, 
"LOCAL_MERGE_SORT_QUEUE_DEPENDENCY"));
-            _queue_deps.back()->block();
-        }
     }
     ~LocalMergeSortExchanger() override = default;
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -263,33 +295,21 @@ public:
 
     Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState& 
local_state);
 
-    std::vector<Dependency*> local_sink_state_dependency(int channel_id) 
override;
-
-    std::vector<Dependency*> local_state_dependency(int channel_id) override;
-
-    void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t 
delta);
-    void sub_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t 
delta);
-    void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int 
channel_id, int64_t delta);
     void close(LocalExchangeSourceLocalState& local_state) override {}
+    void finalize(LocalExchangeSourceLocalState& local_state) override;
 
 private:
-    // only channel_id = 0 , build _merger and use it
-
     std::unique_ptr<vectorized::VSortedRunMerger> _merger;
     std::shared_ptr<SortSourceOperatorX> _sort_source;
-    std::vector<DependencySPtr> _sink_deps;
     std::vector<std::atomic_int64_t> _queues_mem_usege;
-    // if cur queue is empty, block this queue
-    std::vector<DependencySPtr> _queue_deps;
-    const int64_t _each_queue_limit;
 };
 
-class BroadcastExchanger final : public Exchanger<vectorized::Block> {
+class BroadcastExchanger final : public Exchanger<BlockWrapperSPtr> {
 public:
     ENABLE_FACTORY_CREATOR(BroadcastExchanger);
     BroadcastExchanger(int running_sink_operators, int num_partitions, int 
free_block_limit)
-            : Exchanger<vectorized::Block>(running_sink_operators, 
num_partitions,
-                                           free_block_limit) {
+            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions,
+                                          free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~BroadcastExchanger() override = default;
@@ -304,13 +324,13 @@ public:
 
 //The code in AdaptivePassthroughExchanger is essentially
 // a copy of ShuffleExchanger and PassthroughExchanger.
-class AdaptivePassthroughExchanger : public Exchanger<vectorized::Block> {
+class AdaptivePassthroughExchanger : public Exchanger<BlockWrapperSPtr> {
 public:
     ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
     AdaptivePassthroughExchanger(int running_sink_operators, int 
num_partitions,
                                  int free_block_limit)
-            : Exchanger<vectorized::Block>(running_sink_operators, 
num_partitions,
-                                           free_block_limit) {
+            : Exchanger<BlockWrapperSPtr>(running_sink_operators, 
num_partitions,
+                                          free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -320,7 +340,7 @@ public:
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return 
ExchangeType::ADAPTIVE_PASSTHROUGH; }
 
-    void close(LocalExchangeSourceLocalState& local_state) override {}
+    void close(LocalExchangeSourceLocalState& local_state) override;
 
 private:
     Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block, 
bool eos,
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index a2f26ac0a00..6f7a59c0f98 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -697,7 +697,10 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
                                             is_shuffled_hash_join, 
shuffle_idx_to_instance_idx));
 
     // 2. Create and initialize LocalExchangeSharedState.
-    auto shared_state = 
LocalExchangeSharedState::create_shared(_num_instances);
+    std::shared_ptr<LocalExchangeSharedState> shared_state =
+            data_distribution.distribution_type == 
ExchangeType::LOCAL_MERGE_SORT
+                    ? 
LocalMergeExchangeSharedState::create_shared(_num_instances)
+                    : LocalExchangeSharedState::create_shared(_num_instances);
     switch (data_distribution.distribution_type) {
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
@@ -730,11 +733,20 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
                         : 0);
         break;
     case ExchangeType::PASS_TO_ONE:
-        shared_state->exchanger = BroadcastExchanger::create_unique(
-                cur_pipe->num_tasks(), _num_instances,
-                
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
-                        ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
-                        : 0);
+        if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
+            // If shared hash table is enabled for BJ, hash table will be 
built by only one task
+            shared_state->exchanger = PassToOneExchanger::create_unique(
+                    cur_pipe->num_tasks(), _num_instances,
+                    
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                            ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
+                            : 0);
+        } else {
+            shared_state->exchanger = BroadcastExchanger::create_unique(
+                    cur_pipe->num_tasks(), _num_instances,
+                    
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                            ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
+                            : 0);
+        }
         break;
     case ExchangeType::LOCAL_MERGE_SORT: {
         auto child_op = cur_pipe->sink_x()->child_x();
@@ -788,7 +800,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     }
     operator_xs.insert(operator_xs.begin(), source_op);
 
-    shared_state->create_source_dependencies(source_op->operator_id(), 
source_op->node_id());
+    shared_state->create_dependencies(source_op->operator_id(), 
source_op->node_id());
 
     // 5. Set children for two pipelines separately.
     std::vector<std::shared_ptr<Pipeline>> new_children;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to