This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new a5189156266 [fix](pipeline) Do not push data in local exchange if eos (#35972) (#36010)
a5189156266 is described below

commit a5189156266b8cb1dfccdec29000eb834276ed9a
Author: Gabriel <[email protected]>
AuthorDate: Fri Jun 7 15:40:55 2024 +0800

    [fix](pipeline) Do not push data in local exchange if eos (#35972) (#36010)
    
    
    pick #35972 and #34536
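
The heart of this change is the new eos-aware BlockQueue in local_exchanger.h
below. Once a local exchange source reaches eos and closes its queue, a
concurrently running sink must stop enqueuing blocks; otherwise those blocks
are never dequeued and their memory is never reclaimed. A minimal, hedged
sketch of the idea (illustrative only, not the Doris code; a mutex stands in
for the lock-free moodycamel queue):

    #include <atomic>
    #include <deque>
    #include <mutex>

    template <typename T>
    struct EosGatedQueue {
        std::atomic<bool> eos {false};
        std::mutex mu;      // stand-in for moodycamel::ConcurrentQueue
        std::deque<T> q;

        // Returns false once the consumer has closed the queue; the caller
        // must then release the rejected item itself (cf. the unref() calls
        // added in this patch).
        bool enqueue(T item) {
            if (eos.load()) {
                return false;
            }
            std::lock_guard<std::mutex> g(mu);
            q.push_back(std::move(item));
            return true;
        }

        bool try_dequeue(T& out) {
            std::lock_guard<std::mutex> g(mu);
            if (q.empty()) {
                return false;
            }
            out = std::move(q.front());
            q.pop_front();
            return true;
        }

        // Source-side close(): reject future producers first, then drain.
        void close_and_drain() {
            eos.store(true);
            T dropped;
            while (try_dequeue(dropped)) {
                // release whatever `dropped` holds
            }
        }
    };

This mirrors the gate-then-drain order used below: each exchanger's close()
calls set_eos() before its drain loop.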
---
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |   9 ++
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   1 +
 .../exec/partitioned_hash_join_probe_operator.cpp  |  20 +++
 .../exec/partitioned_hash_join_probe_operator.h    |   2 +
 be/src/pipeline/pipeline_x/dependency.h            |   4 +-
 .../local_exchange/local_exchange_sink_operator.h  |   4 +-
 .../local_exchange_source_operator.cpp             |   6 +-
 .../local_exchange_source_operator.h               |   5 +-
 .../pipeline_x/local_exchange/local_exchanger.cpp  | 115 +++++++++-------
 .../pipeline_x/local_exchange/local_exchanger.h    | 147 ++++++++++++++-------
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  38 ++++--
 .../java/org/apache/doris/qe/SessionVariable.java  |   5 +
 gensrc/thrift/PaloInternalService.thrift           |   1 +
 13 files changed, 249 insertions(+), 108 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index a58ad62211c..00cf6a65eb0 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -373,6 +373,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
     return Status::OK();
 }
 
+std::string HashJoinProbeLocalState::debug_string(int indentation_level) const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}",
+                   JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState>::debug_string(
+                           indentation_level),
+                   _shared_state ? std::to_string(_shared_state->short_circuit_for_probe) : "NULL");
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
                                                      vectorized::ColumnUInt8::MutablePtr& null_map,
                                                      vectorized::ColumnRawPtrs& raw_ptrs,
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index b4930307bcc..5cdfe9feeb7 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -94,6 +94,7 @@ public:
     const std::shared_ptr<vectorized::Block>& build_block() const {
         return _shared_state->build_block;
     }
+    std::string debug_string(int indentation_level) const override;
 
 private:
     void _prepare_probe_block();
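
The two hunks above add debug_string() overrides that append operator-specific
state to the base class's output. A hedged sketch of that composition pattern
(the class names here are stand-ins, not the Doris types; recent fmt releases
want std::back_inserter, while the tree's bundled fmt also accepts the buffer
directly, as the hunk shows):

    #include <fmt/format.h>
    #include <iterator>
    #include <string>

    struct BaseState {
        virtual ~BaseState() = default;
        virtual std::string debug_string(int indentation_level) const {
            // indentation first, then the base description
            return fmt::format("{:<{}}BaseState", "", indentation_level * 4);
        }
    };

    struct ProbeState final : BaseState {
        bool short_circuit_for_probe = false;
        std::string debug_string(int indentation_level) const override {
            fmt::memory_buffer buf;
            // append this operator's own fields after the base string
            fmt::format_to(std::back_inserter(buf), "{}, short_circuit_for_probe: {}",
                           BaseState::debug_string(indentation_level),
                           short_circuit_for_probe);
            return fmt::to_string(buf);
        }
    };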
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 10fa2effcc0..0576ae91dd6 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -437,6 +437,16 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
     return st;
 }
 
+std::string PartitionedHashJoinProbeOperatorX::debug_string(RuntimeState* state,
+                                                            int indentation_level) const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}, in mem join probe: {}",
+                   JoinProbeOperatorX<PartitionedHashJoinProbeLocalState>::debug_string(
+                           state, indentation_level),
+                   _inner_probe_operator ? _inner_probe_operator->debug_string(state, 0) : "NULL");
+    return fmt::to_string(debug_string_buffer);
+}
+
 Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState* state,
                                                                            uint32_t partition_index,
                                                                            bool& has_data) {
@@ -872,6 +882,16 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     const auto need_to_spill = local_state._shared_state->need_to_spill;
+#ifndef NDEBUG
+    Defer eos_check_defer([&] {
+        if (*eos) {
+            LOG(INFO) << "query: " << print_id(state->query_id())
+                      << ", hash probe node: " << node_id() << ", task: " << 
state->task_id()
+                      << ", eos with child eos: " << local_state._child_eos
+                      << ", need spill: " << need_to_spill;
+        }
+    });
+#endif
     if (need_more_input_data(state)) {
         if (need_to_spill && _should_revoke_memory(state)) {
             bool wait_for_io = false;
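
The debug-only hunk above wraps its eos logging in a Defer so the check runs
on every exit path of get_block(). Doris ships its own Defer helper in the BE
utils; a hedged sketch of the general scope-exit pattern it relies on
(function names here are illustrative):

    #include <functional>
    #include <iostream>
    #include <utility>

    class Defer {
    public:
        explicit Defer(std::function<void()> fn) : _fn(std::move(fn)) {}
        ~Defer() {
            if (_fn) {
                _fn();  // runs when the enclosing scope exits
            }
        }
        Defer(const Defer&) = delete;
        Defer& operator=(const Defer&) = delete;

    private:
        std::function<void()> _fn;
    };

    // Usage mirroring the hunk: one log statement covers every return path.
    bool get_block_stub(bool* eos) {
        Defer eos_check([&] {
            if (*eos) {
                std::cout << "eos reached; log child eos / spill state here\n";
            }
        });
        *eos = true;  // any of several branches may set this before returning
        return true;
    }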
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 3b942d15755..d56a57ae428 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -155,6 +155,8 @@ public:
     Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
                 bool* eos) const override;
 
+    std::string debug_string(RuntimeState* state, int indentation_level = 0) const override;
+
     bool need_more_input_data(RuntimeState* state) const override;
     DataDistribution required_data_distribution() const override {
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 72763d03c8f..b60b3e9ae3b 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -766,13 +766,13 @@ struct DataDistribution {
     std::vector<TExpr> partition_exprs;
 };
 
-class Exchanger;
+class ExchangerBase;
 
 struct LocalExchangeSharedState : public BasicSharedState {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
     LocalExchangeSharedState(int num_instances);
-    std::unique_ptr<Exchanger> exchanger {};
+    std::unique_ptr<ExchangerBase> exchanger {};
     std::vector<MemTracker*> mem_trackers;
     std::atomic<size_t> mem_usage = 0;
     std::mutex le_lock;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index db6662a221a..99b88747a98 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -21,7 +21,7 @@
 
 namespace doris::pipeline {
 
-class Exchanger;
+class ExchangerBase;
 class ShuffleExchanger;
 class PassthroughExchanger;
 class BroadcastExchanger;
@@ -50,7 +50,7 @@ private:
     friend class PassToOneExchanger;
     friend class AdaptivePassthroughExchanger;
 
-    Exchanger* _exchanger = nullptr;
+    ExchangerBase* _exchanger = nullptr;
 
     // Used by shuffle exchanger
     RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index dbc4e37bbab..086a3b551fd 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -66,14 +66,16 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c
     fmt::memory_buffer debug_string_buffer;
     fmt::format_to(debug_string_buffer,
                    "{}, _channel_id: {}, _num_partitions: {}, _num_senders: 
{}, _num_sources: {}, "
-                   "_running_sink_operators: {}, _running_source_operators: 
{}",
+                   "_running_sink_operators: {}, _running_source_operators: 
{}, mem_usage: {}",
                    Base::debug_string(indentation_level), _channel_id, 
_exchanger->_num_partitions,
                    _exchanger->_num_senders, _exchanger->_num_sources,
-                   _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators);
+                   _exchanger->_running_sink_operators, 
_exchanger->_running_source_operators,
+                   _shared_state->mem_usage.load());
     size_t i = 0;
     fmt::format_to(debug_string_buffer, ", MemTrackers: ");
     for (auto* mem_tracker : _shared_state->mem_trackers) {
         fmt::format_to(debug_string_buffer, "{}: {}, ", i, 
mem_tracker->consumption());
+        i++;
     }
     return fmt::to_string(debug_string_buffer);
 }
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index c7583d1351c..7cefc1ca900 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -21,7 +21,7 @@
 
 namespace doris::pipeline {
 
-class Exchanger;
+class ExchangerBase;
 class ShuffleExchanger;
 class PassthroughExchanger;
 class BroadcastExchanger;
@@ -41,13 +41,14 @@ public:
 
 private:
     friend class LocalExchangeSourceOperatorX;
+    friend class ExchangerBase;
     friend class ShuffleExchanger;
     friend class PassthroughExchanger;
     friend class BroadcastExchanger;
     friend class PassToOneExchanger;
     friend class AdaptivePassthroughExchanger;
 
-    Exchanger* _exchanger = nullptr;
+    ExchangerBase* _exchanger = nullptr;
     int _channel_id;
     RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
     RuntimeProfile::Counter* _copy_data_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index abcc0161fd7..f02fd0e5f04 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -17,8 +17,10 @@
 
 #include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
 
+#include "common/status.h"
 #include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
 #include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h"
+#include "vec/runtime/partitioner.h"
 
 namespace doris::pipeline {
 
@@ -41,6 +43,7 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
 
 void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
     PartitionedBlock partitioned_block;
+    _data_queue[local_state._channel_id].set_eos();
     while (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
         auto block_wrapper = partitioned_block.first;
         local_state._shared_state->sub_mem_usage(
@@ -52,38 +55,36 @@ void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
 Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                                    LocalExchangeSourceLocalState& local_state) {
     PartitionedBlock partitioned_block;
-    std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
+    vectorized::MutableBlock mutable_block;
 
     auto get_data = [&](vectorized::Block* result_block) -> Status {
         do {
-            const auto* offset_start = &((
-                    *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
+            const auto* offset_start = partitioned_block.second.row_idxs->data() +
+                                       partitioned_block.second.offset_start;
             auto block_wrapper = partitioned_block.first;
             local_state._shared_state->sub_mem_usage(
                     local_state._channel_id, block_wrapper->data_block.allocated_bytes(), false);
-            RETURN_IF_ERROR(
-                    mutable_block->add_rows(&block_wrapper->data_block, offset_start,
-                                            offset_start + std::get<2>(partitioned_block.second)));
+            RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, offset_start,
+                                                   offset_start + partitioned_block.second.length));
             block_wrapper->unref(local_state._shared_state);
-        } while (mutable_block->rows() < state->batch_size() &&
+        } while (mutable_block.rows() < state->batch_size() &&
                  _data_queue[local_state._channel_id].try_dequeue(partitioned_block));
-        *result_block = mutable_block->to_block();
         return Status::OK();
     };
+
     if (_running_sink_operators == 0) {
         if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
             SCOPED_TIMER(local_state._copy_data_timer);
-            mutable_block = vectorized::MutableBlock::create_unique(
-                    partitioned_block.first->data_block.clone_empty());
+            mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+                    block, partitioned_block.first->data_block);
             RETURN_IF_ERROR(get_data(block));
         } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
         }
     } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
         SCOPED_TIMER(local_state._copy_data_timer);
-        mutable_block = vectorized::MutableBlock::create_unique(
-                partitioned_block.first->data_block.clone_empty());
+        mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+                block, partitioned_block.first->data_block);
         RETURN_IF_ERROR(get_data(block));
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
@@ -97,7 +98,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
                                      LocalExchangeSinkLocalState& local_state) {
     auto& data_queue = _data_queue;
     const auto rows = block->rows();
-    auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
+    auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
     {
         local_state._partition_rows_histogram.assign(_num_partitions + 1, 0);
         for (size_t i = 0; i < rows; ++i) {
@@ -107,7 +108,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
             local_state._partition_rows_histogram[i] +=
                     local_state._partition_rows_histogram[i - 1];
         }
-
         for (int32_t i = rows - 1; i >= 0; --i) {
             (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i;
             local_state._partition_rows_histogram[channel_ids[i]]--;
@@ -134,13 +134,16 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
         for (const auto& it : map) {
             DCHECK(it.second >= 0 && it.second < _num_partitions)
                     << it.first << " : " << it.second << " " << _num_partitions;
-            size_t start = local_state._partition_rows_histogram[it.first];
-            size_t size = local_state._partition_rows_histogram[it.first + 1] - start;
+            uint32_t start = local_state._partition_rows_histogram[it.first];
+            uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start;
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         it.second, new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}});
-                local_state._shared_state->set_ready_to_read(it.second);
+                if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) {
+                    local_state._shared_state->set_ready_to_read(it.second);
+                } else {
+                    new_block_wrapper->unref(local_state._shared_state);
+                }
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
@@ -148,13 +151,17 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
     } else if (_num_senders != _num_sources || _ignore_source_data_distribution) {
         new_block_wrapper->ref(_num_partitions);
         for (size_t i = 0; i < _num_partitions; i++) {
-            size_t start = local_state._partition_rows_histogram[i];
-            size_t size = local_state._partition_rows_histogram[i + 1] - start;
+            uint32_t start = local_state._partition_rows_histogram[i];
+            uint32_t size = local_state._partition_rows_histogram[i + 1] - start;
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[i % _num_sources].enqueue({new_block_wrapper, {row_idx, start, size}});
-                local_state._shared_state->set_ready_to_read(i % _num_sources);
+                if (data_queue[i % _num_sources].enqueue(
+                            {new_block_wrapper, {row_idx, start, size}})) {
+                    local_state._shared_state->set_ready_to_read(i % _num_sources);
+                } else {
+                    new_block_wrapper->unref(local_state._shared_state);
+                }
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
@@ -164,13 +171,16 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
         auto map =
                 local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
         for (size_t i = 0; i < _num_partitions; i++) {
-            size_t start = local_state._partition_rows_histogram[i];
-            size_t size = local_state._partition_rows_histogram[i + 1] - start;
+            uint32_t start = local_state._partition_rows_histogram[i];
+            uint32_t size = local_state._partition_rows_histogram[i + 1] - start;
             if (size > 0) {
                 local_state._shared_state->add_mem_usage(
                         map[i], new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}});
-                local_state._shared_state->set_ready_to_read(map[i]);
+                if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) {
+                    local_state._shared_state->set_ready_to_read(map[i]);
+                } else {
+                    new_block_wrapper->unref(local_state._shared_state);
+                }
             } else {
                 new_block_wrapper->unref(local_state._shared_state);
             }
@@ -189,14 +199,16 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
     local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
-    _data_queue[channel_id].enqueue(std::move(new_block));
-    local_state._shared_state->set_ready_to_read(channel_id);
+    if (_data_queue[channel_id].enqueue(std::move(new_block))) {
+        local_state._shared_state->set_ready_to_read(channel_id);
+    }
 
     return Status::OK();
 }
 
 void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
+    _data_queue[local_state._channel_id].set_eos();
     while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         local_state._shared_state->sub_mem_usage(local_state._channel_id,
                                                  next_block.allocated_bytes());
@@ -209,16 +221,21 @@ Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* b
     if (_running_sink_operators == 0) {
         if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
             block->swap(next_block);
-            _free_blocks.enqueue(std::move(next_block));
             local_state._shared_state->sub_mem_usage(local_state._channel_id,
                                                      block->allocated_bytes());
+            if (_free_block_limit == 0 ||
+                _free_blocks.size_approx() < _free_block_limit * _num_sources) {
+                _free_blocks.enqueue(std::move(next_block));
+            }
         } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
         }
     } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         block->swap(next_block);
-        _free_blocks.enqueue(std::move(next_block));
+        if (_free_block_limit == 0 ||
+            _free_blocks.size_approx() < _free_block_limit * _num_sources) {
+            _free_blocks.enqueue(std::move(next_block));
+        }
         local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
@@ -231,8 +248,9 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
                                 LocalExchangeSinkLocalState& local_state) {
     vectorized::Block new_block(in_block->clone_empty());
     new_block.swap(*in_block);
-    _data_queue[0].enqueue(std::move(new_block));
-    local_state._shared_state->set_ready_to_read(0);
+    if (_data_queue[0].enqueue(std::move(new_block))) {
+        local_state._shared_state->set_ready_to_read(0);
+    }
 
     return Status::OK();
 }
@@ -248,7 +266,6 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo
         if (_data_queue[0].try_dequeue(next_block)) {
             *block = std::move(next_block);
         } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
         }
     } else if (_data_queue[0].try_dequeue(next_block)) {
@@ -265,8 +282,9 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
     for (size_t i = 0; i < _num_partitions; i++) {
         auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty());
         RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows()));
-        _data_queue[i].enqueue(mutable_block->to_block());
-        local_state._shared_state->set_ready_to_read(i);
+        if (_data_queue[i].enqueue(mutable_block->to_block())) {
+            local_state._shared_state->set_ready_to_read(i);
+        }
     }
 
     return Status::OK();
@@ -274,6 +292,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
 
 void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
     vectorized::Block next_block;
+    _data_queue[local_state._channel_id].set_eos();
     while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         // do nothing
     }
@@ -286,7 +305,6 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo
         if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
             *block = std::move(next_block);
         } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
         }
     } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
@@ -308,8 +326,9 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
     local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
-    _data_queue[channel_id].enqueue(std::move(new_block));
-    local_state._shared_state->set_ready_to_read(channel_id);
+    if (_data_queue[channel_id].enqueue(std::move(new_block))) {
+        local_state._shared_state->set_ready_to_read(channel_id);
+    }
 
     return Status::OK();
 }
@@ -365,9 +384,10 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
             RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
             auto new_block = mutable_block->to_block();
             local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes());
-            data_queue[i].enqueue(std::move(new_block));
+            if (data_queue[i].enqueue(std::move(new_block))) {
+                local_state._shared_state->set_ready_to_read(i);
+            }
         }
-        local_state._shared_state->set_ready_to_read(i);
     }
     return Status::OK();
 }
@@ -391,16 +411,21 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
     if (_running_sink_operators == 0) {
         if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
             block->swap(next_block);
-            _free_blocks.enqueue(std::move(next_block));
+            if (_free_block_limit == 0 ||
+                _free_blocks.size_approx() < _free_block_limit * _num_sources) {
+                _free_blocks.enqueue(std::move(next_block));
+            }
             local_state._shared_state->sub_mem_usage(local_state._channel_id,
                                                      block->allocated_bytes());
         } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             *eos = true;
         }
     } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
         block->swap(next_block);
-        _free_blocks.enqueue(std::move(next_block));
+        if (_free_block_limit == 0 ||
+            _free_blocks.size_approx() < _free_block_limit * _num_sources) {
+            _free_blocks.enqueue(std::move(next_block));
+        }
         local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
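
A pattern that repeats throughout this file caps the shared free-block cache:
a drained block is recycled only while the cache holds fewer than
_free_block_limit * _num_sources blocks, with 0 meaning unbounded (the
pre-patch behaviour). A hedged, single-threaded sketch of that bound (names
are illustrative; the real cache is a moodycamel::ConcurrentQueue, where
size_approx() is only an estimate):

    #include <cstddef>
    #include <queue>
    #include <utility>

    struct Block {};  // payload elided

    struct FreeBlockPool {
        std::queue<Block> free_blocks;  // stand-in for the lock-free queue
        std::size_t free_block_limit = 0;
        std::size_t num_sources = 1;

        void maybe_recycle(Block&& b) {
            // 0 keeps the old "always recycle" behaviour
            if (free_block_limit == 0 ||
                free_blocks.size() < free_block_limit * num_sources) {
                free_blocks.push(std::move(b));
            }
            // otherwise the block is dropped here and its memory released
        }
    };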
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index f3b210b11f3..ee0b5e286de 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include "pipeline/pipeline_x/dependency.h"
 #include "pipeline/pipeline_x/operator.h"
 
 namespace doris::pipeline {
@@ -24,28 +25,34 @@ namespace doris::pipeline {
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
 struct ShuffleBlockWrapper;
+class SortSourceOperatorX;
 
-class Exchanger {
+class ExchangerBase {
 public:
-    Exchanger(int running_sink_operators, int num_partitions)
+    ExchangerBase(int running_sink_operators, int num_partitions, int free_block_limit)
             : _running_sink_operators(running_sink_operators),
               _running_source_operators(num_partitions),
               _num_partitions(num_partitions),
               _num_senders(running_sink_operators),
-              _num_sources(num_partitions) {}
-    Exchanger(int running_sink_operators, int num_sources, int num_partitions)
+              _num_sources(num_partitions),
+              _free_block_limit(free_block_limit) {}
+    ExchangerBase(int running_sink_operators, int num_sources, int num_partitions,
+                  int free_block_limit)
             : _running_sink_operators(running_sink_operators),
               _running_source_operators(num_partitions),
               _num_partitions(num_partitions),
               _num_senders(running_sink_operators),
-              _num_sources(num_sources) {}
-    virtual ~Exchanger() = default;
+              _num_sources(num_sources),
+              _free_block_limit(free_block_limit) {}
+    virtual ~ExchangerBase() = default;
     virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                              LocalExchangeSourceLocalState& local_state) = 0;
     virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
                         LocalExchangeSinkLocalState& local_state) = 0;
     virtual ExchangeType get_type() const = 0;
-    virtual void close(LocalExchangeSourceLocalState& local_state) {}
+    virtual void close(LocalExchangeSourceLocalState& local_state) = 0;
+
+    virtual DependencySPtr get_local_state_dependency(int _channel_id) { return nullptr; }
 
 protected:
     friend struct LocalExchangeSharedState;
@@ -58,9 +65,60 @@ protected:
     const int _num_partitions;
     const int _num_senders;
     const int _num_sources;
+    const int _free_block_limit = 0;
     moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks;
 };
 
+struct PartitionedRowIdxs {
+    std::shared_ptr<vectorized::PODArray<uint32_t>> row_idxs;
+    uint32_t offset_start;
+    uint32_t length;
+};
+
+using PartitionedBlock = std::pair<std::shared_ptr<ShuffleBlockWrapper>, PartitionedRowIdxs>;
+
+template <typename BlockType>
+struct BlockQueue {
+    std::atomic<bool> eos = false;
+    moodycamel::ConcurrentQueue<BlockType> data_queue;
+    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
+    BlockQueue(BlockQueue<BlockType>&& other)
+            : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
+    inline bool enqueue(BlockType const& item) {
+        if (!eos) {
+            data_queue.enqueue(item);
+            return true;
+        }
+        return false;
+    }
+
+    inline bool enqueue(BlockType&& item) {
+        if (!eos) {
+            data_queue.enqueue(std::move(item));
+            return true;
+        }
+        return false;
+    }
+
+    bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
+
+    void set_eos() { eos = true; }
+};
+
+template <typename BlockType>
+class Exchanger : public ExchangerBase {
+public:
+    Exchanger(int running_sink_operators, int num_partitions, int free_block_limit)
+            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {}
+    Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit)
+            : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) {
+    }
+    ~Exchanger() override = default;
+
+protected:
+    std::vector<BlockQueue<BlockType>> _data_queue;
+};
+
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
 
@@ -71,23 +129,25 @@ struct ShuffleBlockWrapper {
     void unref(LocalExchangeSharedState* shared_state) {
         if (ref_count.fetch_sub(1) == 1) {
             shared_state->sub_total_mem_usage(data_block.allocated_bytes());
-            data_block.clear_column_data();
-            shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
+            if (shared_state->exchanger->_free_block_limit == 0 ||
+                shared_state->exchanger->_free_blocks.size_approx() <
+                        shared_state->exchanger->_free_block_limit *
+                                shared_state->exchanger->_num_sources) {
+                data_block.clear_column_data();
+                shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
+            }
         }
     }
     std::atomic<int> ref_count = 0;
     vectorized::Block data_block;
 };
 
-class ShuffleExchanger : public Exchanger {
-    using PartitionedBlock =
-            std::pair<std::shared_ptr<ShuffleBlockWrapper>,
-                      std::tuple<std::shared_ptr<std::vector<uint32_t>>, size_t, size_t>>;
-
+class ShuffleExchanger : public Exchanger<PartitionedBlock> {
 public:
     ENABLE_FACTORY_CREATOR(ShuffleExchanger);
-    ShuffleExchanger(int running_sink_operators, int num_partitions)
-            : Exchanger(running_sink_operators, num_partitions) {
+    ShuffleExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
+            : Exchanger<PartitionedBlock>(running_sink_operators, num_partitions,
+                                          free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~ShuffleExchanger() override = default;
@@ -101,8 +161,9 @@ public:
 
 protected:
     ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
-                     bool ignore_source_data_distribution)
-            : Exchanger(running_sink_operators, num_sources, num_partitions),
+                     bool ignore_source_data_distribution, int free_block_limit)
+            : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
+                                          free_block_limit),
               _ignore_source_data_distribution(ignore_source_data_distribution) {
         _data_queue.resize(num_partitions);
     }
@@ -110,26 +171,25 @@ protected:
                        vectorized::Block* block, bool eos,
                        LocalExchangeSinkLocalState& local_state);
 
-    std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> _data_queue;
-
     const bool _ignore_source_data_distribution = false;
 };
 
-class BucketShuffleExchanger : public ShuffleExchanger {
+class BucketShuffleExchanger final : public ShuffleExchanger {
     ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
     BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
-                           bool ignore_source_data_distribution)
+                           bool ignore_source_data_distribution, int free_block_limit)
             : ShuffleExchanger(running_sink_operators, num_sources, num_partitions,
-                               ignore_source_data_distribution) {}
+                               ignore_source_data_distribution, free_block_limit) {}
     ~BucketShuffleExchanger() override = default;
     ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; }
 };
 
-class PassthroughExchanger final : public Exchanger {
+class PassthroughExchanger final : public Exchanger<vectorized::Block> {
 public:
     ENABLE_FACTORY_CREATOR(PassthroughExchanger);
-    PassthroughExchanger(int running_sink_operators, int num_partitions)
-            : Exchanger(running_sink_operators, num_partitions) {
+    PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
+            : Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
+                                           free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~PassthroughExchanger() override = default;
@@ -140,16 +200,14 @@ public:
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; }
     void close(LocalExchangeSourceLocalState& local_state) override;
-
-private:
-    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
 };
 
-class PassToOneExchanger final : public Exchanger {
+class PassToOneExchanger final : public Exchanger<vectorized::Block> {
 public:
     ENABLE_FACTORY_CREATOR(PassToOneExchanger);
-    PassToOneExchanger(int running_sink_operators, int num_partitions)
-            : Exchanger(running_sink_operators, num_partitions) {
+    PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
+            : Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
+                                           free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~PassToOneExchanger() override = default;
@@ -159,16 +217,15 @@ public:
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; }
-
-private:
-    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
+    void close(LocalExchangeSourceLocalState& local_state) override {}
 };
 
-class BroadcastExchanger final : public Exchanger {
+class BroadcastExchanger final : public Exchanger<vectorized::Block> {
 public:
     ENABLE_FACTORY_CREATOR(BroadcastExchanger);
-    BroadcastExchanger(int running_sink_operators, int num_partitions)
-            : Exchanger(running_sink_operators, num_partitions) {
+    BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
+            : Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
+                                           free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     ~BroadcastExchanger() override = default;
@@ -179,18 +236,17 @@ public:
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
     void close(LocalExchangeSourceLocalState& local_state) override;
-
-private:
-    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
 };
 
 //The code in AdaptivePassthroughExchanger is essentially
 // a copy of ShuffleExchanger and PassthroughExchanger.
-class AdaptivePassthroughExchanger : public Exchanger {
+class AdaptivePassthroughExchanger : public Exchanger<vectorized::Block> {
 public:
     ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
-    AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions)
-            : Exchanger(running_sink_operators, num_partitions) {
+    AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions,
+                                 int free_block_limit)
+            : Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
+                                           free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -200,6 +256,8 @@ public:
                      LocalExchangeSourceLocalState& local_state) override;
     ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; }
 
+    void close(LocalExchangeSourceLocalState& local_state) override {}
+
 private:
     Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
                              LocalExchangeSinkLocalState& local_state);
@@ -208,7 +266,6 @@ private:
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
                        vectorized::Block* block, bool eos,
                        LocalExchangeSinkLocalState& local_state);
-    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
 
     std::atomic_bool _is_pass_through = false;
     std::atomic_int32_t _total_block = 0;
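
Tying this header's pieces together: a shuffled block is ref-counted once per
target partition, a rejected enqueue (queue already at eos) must drop that
partition's reference immediately, and only a successful enqueue may wake the
reader. A hedged sketch of that producer-side contract (types simplified,
names illustrative):

    #include <atomic>
    #include <memory>

    struct RefBlock {
        std::atomic<int> ref_count {0};
        void ref(int n) { ref_count += n; }
        void unref() {
            if (ref_count.fetch_sub(1) == 1) {
                // last reference gone: release or recycle the block
            }
        }
    };

    template <typename Queue, typename WakeFn>
    void sink_one_partition(Queue& q, const std::shared_ptr<RefBlock>& block,
                            WakeFn wake) {
        if (q.enqueue(block)) {
            wake();          // set_ready_to_read(...) in the real code
        } else {
            block->unref();  // source already closed: drop our reference
        }
    }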
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index ccc51faa777..bcb3c53b532 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -801,28 +801,46 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
                 std::max(cur_pipe->num_tasks(), _num_instances),
-                is_shuffled_hash_join ? _total_instances : _num_instances);
+                is_shuffled_hash_join ? _total_instances : _num_instances,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
         break;
     case ExchangeType::BUCKET_HASH_SHUFFLE:
         shared_state->exchanger = BucketShuffleExchanger::create_unique(
                 std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
-                ignore_data_hash_distribution);
+                ignore_data_hash_distribution,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
         break;
     case ExchangeType::PASSTHROUGH:
-        shared_state->exchanger =
-                PassthroughExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        shared_state->exchanger = PassthroughExchanger::create_unique(
+                cur_pipe->num_tasks(), _num_instances,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
         break;
     case ExchangeType::BROADCAST:
-        shared_state->exchanger =
-                BroadcastExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        shared_state->exchanger = BroadcastExchanger::create_unique(
+                cur_pipe->num_tasks(), _num_instances,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
         break;
     case ExchangeType::PASS_TO_ONE:
-        shared_state->exchanger =
-                BroadcastExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        shared_state->exchanger = BroadcastExchanger::create_unique(
+                cur_pipe->num_tasks(), _num_instances,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
         break;
     case ExchangeType::ADAPTIVE_PASSTHROUGH:
-        shared_state->exchanger =
-                AdaptivePassthroughExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
+                cur_pipe->num_tasks(), _num_instances,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
         break;
     default:
         return Status::InternalError("Unsupported local exchange type : " +
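
Each case above reads the new optional Thrift field through its __isset flag,
so an upgraded BE keeps working when an older FE never sets the field. A
hedged sketch of the guard, with a stub standing in for the generated struct
(real code includes the generated PaloInternalService headers instead):

    #include <cstdint>

    struct TQueryOptions {  // stub of the generated Thrift type
        struct Isset {
            bool local_exchange_free_blocks_limit = false;
        } __isset;
        int64_t local_exchange_free_blocks_limit = 0;
    };

    int64_t free_blocks_limit_or_default(const TQueryOptions& opts) {
        return opts.__isset.local_exchange_free_blocks_limit
                ? opts.local_exchange_free_blocks_limit
                : 0;  // 0 = unlimited, preserving pre-upgrade behaviour
    }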
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 03479643522..01cbfa82c36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -76,6 +76,7 @@ public class SessionVariable implements Serializable, Writable {
     public static final Logger LOG = LogManager.getLogger(SessionVariable.class);
 
     public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
+    public static final String LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT = "local_exchange_free_blocks_limit";
     public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
     public static final String NUM_SCANNER_THREADS = "num_scanner_threads";
     public static final String SCANNER_SCALE_UP_RATIO = "scanner_scale_up_ratio";
@@ -615,6 +616,9 @@ public class SessionVariable implements Serializable, Writable {
     })
     public int numScannerThreads = 0;
 
+    @VariableMgr.VarAttr(name = LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT)
+    public int localExchangeFreeBlocksLimit = 4;
+
     @VariableMgr.VarAttr(name = SCANNER_SCALE_UP_RATIO, needForward = true, description = {
             "ScanNode自适应的增加扫描并发,最大允许增长的并发倍率,默认为0,关闭该功能",
             "The max multiple of increasing the concurrency of scanners adaptively, "
@@ -3157,6 +3161,7 @@ public class SessionVariable implements Serializable, Writable {
     public TQueryOptions toThrift() {
         TQueryOptions tResult = new TQueryOptions();
         tResult.setMemLimit(maxExecMemByte);
+        tResult.setLocalExchangeFreeBlocksLimit(localExchangeFreeBlocksLimit);
         tResult.setScanQueueMemLimit(maxScanQueueMemByte);
         tResult.setNumScannerThreads(numScannerThreads);
         tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 54c6c57bdd5..ad8836fefbe 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -288,6 +288,7 @@ struct TQueryOptions {
 
   // max rows of each sub-queue in DataQueue.
   106: optional i64 data_queue_max_blocks = 0;
+  108: optional i64 local_exchange_free_blocks_limit;
   
   110: optional bool enable_parquet_filter_by_min_max = true
   111: optional bool enable_orc_filter_by_min_max = true

