This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new c0aac043b66 [pipelineX](local shuffle) Use local shuffle to optimize 
BHJ (#27823)
c0aac043b66 is described below

commit c0aac043b66e6b3f14468a7c77e40f7d7c5c8f64
Author: Gabriel <[email protected]>
AuthorDate: Thu Nov 30 21:08:45 2023 +0800

    [pipelineX](local shuffle) Use local shuffle to optimize BHJ (#27823)
---
 be/src/pipeline/exec/exchange_source_operator.h    |   5 +
 be/src/pipeline/pipeline.h                         |  20 +++
 be/src/pipeline/pipeline_x/dependency.cpp          |   9 +-
 be/src/pipeline/pipeline_x/dependency.h            |  15 +--
 .../local_exchange_sink_operator.cpp               |  59 ++-------
 .../local_exchange/local_exchange_sink_operator.h  |  40 ++++--
 .../local_exchange_source_operator.cpp             |  57 +++-----
 .../local_exchange_source_operator.h               |  17 +--
 .../pipeline_x/local_exchange/local_exchanger.cpp  | 147 +++++++++++++++++++++
 .../pipeline_x/local_exchange/local_exchanger.h    |  98 ++++++++++++++
 be/src/pipeline/pipeline_x/operator.h              |   4 +-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  67 ++++++++--
 .../pipeline_x/pipeline_x_fragment_context.h       |   5 +-
 13 files changed, 412 insertions(+), 131 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index abac33001bb..479a8799058 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -104,6 +104,11 @@ public:
         return _sub_plan_query_statistics_recvr;
     }
 
+    bool need_to_local_shuffle() const override {
+        // TODO(gabriel):
+        return false;
+    }
+
 private:
     friend class ExchangeLocalState;
     const int _num_senders;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index f4b7928887c..a0b6de5c62e 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -126,6 +126,17 @@ public:
         return _collect_query_statistics_with_every_batch;
     }
 
+    bool need_to_local_shuffle() const { return _need_to_local_shuffle; }
+    void set_need_to_local_shuffle(bool need_to_local_shuffle) {
+        _need_to_local_shuffle = need_to_local_shuffle;
+    }
+    void init_need_to_local_shuffle_by_source() {
+        set_need_to_local_shuffle(operatorXs.front()->need_to_local_shuffle());
+    }
+
+    std::vector<std::shared_ptr<Pipeline>>& children() { return _children; }
+    void set_children(std::shared_ptr<Pipeline> child) { 
_children.push_back(child); }
+
 private:
     void _init_profile();
 
@@ -136,6 +147,8 @@ private:
     std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
     std::vector<std::pair<int, std::shared_ptr<Pipeline>>> _dependencies;
 
+    std::vector<std::shared_ptr<Pipeline>> _children;
+
     PipelineId _pipeline_id;
     std::weak_ptr<PipelineFragmentContext> _context;
     int _previous_schedule_id = -1;
@@ -178,6 +191,13 @@ private:
     bool _always_can_write = false;
     bool _is_root_pipeline = false;
     bool _collect_query_statistics_with_every_batch = false;
+
+    // If the source operator meets one of the conditions below:
+    // 1. is a scan operator with Hash Bucket distribution
+    // 2. is an exchange operator with Hash/BucketHash partition
+    // then set `_need_to_local_shuffle` to false, which means we should NOT 
use local shuffle in this fragment
+    // because the data has already been partitioned by storage/shuffling.
+    bool _need_to_local_shuffle = true;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp 
b/be/src/pipeline/pipeline_x/dependency.cpp
index 2fcf0906c1c..dcb149d0784 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -22,7 +22,7 @@
 
 #include "common/logging.h"
 #include "pipeline/pipeline_fragment_context.h"
-#include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
 #include "pipeline/pipeline_x/pipeline_x_task.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker.h"
@@ -180,4 +180,11 @@ void RuntimeFilterDependency::sub_filters() {
     }
 }
 
+void LocalExchangeSharedState::sub_running_sink_operators() {
+    std::unique_lock<std::mutex> lc(le_lock);
+    if (exchanger->running_sink_operators.fetch_sub(1) == 1) {
+        _set_ready_for_read();
+    }
+}
+
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 16e98f11b28..9fbb25aaa2f 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -31,6 +31,7 @@
 #include "gutil/integral_types.h"
 #include "pipeline/exec/data_queue.h"
 #include "pipeline/exec/multi_cast_data_streamer.h"
+#include "pipeline/exec/operator.h"
 #include "vec/common/hash_table/hash_map_context_creator.h"
 #include "vec/common/sort/partition_sorter.h"
 #include "vec/common/sort/sorter.h"
@@ -579,21 +580,15 @@ public:
     }
 };
 
-using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
-                                   
std::tuple<std::shared_ptr<std::vector<int>>, size_t, size_t>>;
+class Exchanger;
+
 struct LocalExchangeSharedState : public BasicSharedState {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
-    std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
+    std::unique_ptr<Exchanger> exchanger {};
     std::vector<Dependency*> source_dependencies;
-    std::atomic<int> running_sink_operators = 0;
     std::mutex le_lock;
-    void sub_running_sink_operators() {
-        std::unique_lock<std::mutex> lc(le_lock);
-        if (running_sink_operators.fetch_sub(1) == 1) {
-            _set_ready_for_read();
-        }
-    }
+    void sub_running_sink_operators();
     void _set_ready_for_read() {
         for (auto* dep : source_dependencies) {
             DCHECK(dep);
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index 12cc5e042e8..3d1540cdc46 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -17,6 +17,8 @@
 
 #include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
 
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
+
 namespace doris::pipeline {
 
 Status LocalExchangeSinkLocalState::init(RuntimeState* state, 
LocalSinkStateInfo& info) {
@@ -26,47 +28,12 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* 
state, LocalSinkStateInfo
     _compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
     _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
 
-    auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
-    _num_rows_in_queue.resize(p._num_partitions);
-    for (size_t i = 0; i < p._num_partitions; i++) {
-        _num_rows_in_queue[i] = ADD_COUNTER_WITH_LEVEL(
-                profile(), "NumRowsInQueue" + std::to_string(i), TUnit::UNIT, 
1);
-    }
-    RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
-    return Status::OK();
-}
-
-Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state,
-                                               const uint32_t* __restrict 
channel_ids,
-                                               vectorized::Block* block, 
SourceState source_state) {
-    auto& data_queue = _shared_state->data_queue;
-    const auto num_partitions = data_queue.size();
-    const auto rows = block->rows();
-    auto row_idx = std::make_shared<std::vector<int>>(rows);
-    {
-        _partition_rows_histogram.assign(num_partitions + 1, 0);
-        for (size_t i = 0; i < rows; ++i) {
-            _partition_rows_histogram[channel_ids[i]]++;
-        }
-        for (int32_t i = 1; i <= num_partitions; ++i) {
-            _partition_rows_histogram[i] += _partition_rows_histogram[i - 1];
-        }
+    _exchanger = _shared_state->exchanger.get();
+    DCHECK(_exchanger != nullptr);
 
-        for (int32_t i = rows - 1; i >= 0; --i) {
-            (*row_idx)[_partition_rows_histogram[channel_ids[i]] - 1] = i;
-            _partition_rows_histogram[channel_ids[i]]--;
-        }
-    }
-    auto new_block = vectorized::Block::create_shared(block->clone_empty());
-    new_block->swap(*block);
-    for (size_t i = 0; i < num_partitions; i++) {
-        size_t start = _partition_rows_histogram[i];
-        size_t size = _partition_rows_histogram[i + 1] - start;
-        if (size > 0) {
-            data_queue[i].enqueue({new_block, {row_idx, start, size}});
-            _shared_state->set_ready_for_read(i);
-            COUNTER_UPDATE(_num_rows_in_queue[i], size);
-        }
+    if (_exchanger->get_type() == ExchangeType::SHUFFLE) {
+        auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
+        RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
     }
 
     return Status::OK();
@@ -77,17 +44,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block*
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
     COUNTER_UPDATE(local_state.rows_input_counter(), 
(int64_t)in_block->rows());
-    {
-        SCOPED_TIMER(local_state._compute_hash_value_timer);
-        RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
in_block,
-                                                                  
local_state.mem_tracker()));
-    }
-    {
-        SCOPED_TIMER(local_state._distribute_timer);
-        RETURN_IF_ERROR(local_state.split_rows(
-                state, (const 
uint32_t*)local_state._partitioner->get_channel_ids(), in_block,
-                source_state));
-    }
+    RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, 
source_state, local_state));
 
     if (source_state == SourceState::FINISHED) {
         local_state._shared_state->sub_running_sink_operators();
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 45d61d4ff6b..c5fe9dc7dc4 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -30,6 +30,9 @@ public:
     ~LocalExchangeSinkDependency() override = default;
 };
 
+class Exchanger;
+class ShuffleExchanger;
+class PassthroughExchanger;
 class LocalExchangeSinkOperatorX;
 class LocalExchangeSinkLocalState final
         : public PipelineXSinkLocalState<LocalExchangeSinkDependency> {
@@ -43,17 +46,21 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
-    Status split_rows(RuntimeState* state, const uint32_t* __restrict 
channel_ids,
-                      vectorized::Block* block, SourceState source_state);
-
 private:
     friend class LocalExchangeSinkOperatorX;
+    friend class ShuffleExchanger;
+    friend class PassthroughExchanger;
+
+    Exchanger* _exchanger = nullptr;
 
+    // Used by shuffle exchanger
     RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
     RuntimeProfile::Counter* _distribute_timer = nullptr;
-    std::vector<RuntimeProfile::Counter*> _num_rows_in_queue {};
     std::unique_ptr<vectorized::PartitionerBase> _partitioner = nullptr;
-    std::vector<size_t> _partition_rows_histogram {};
+    std::vector<size_t> _partition_rows_histogram;
+
+    // Used by random passthrough exchanger
+    int _channel_id = 0;
 };
 
 // A single 32-bit division on a recent x64 processor has a throughput of one 
instruction every six cycles with a latency of 26 cycles.
@@ -82,21 +89,31 @@ public:
         return Status::InternalError("{} should not init with TPlanNode", 
Base::_name);
     }
 
-    Status init() override {
+    Status init(bool need_partitioner) override {
         _name = "LOCAL_EXCHANGE_SINK_OPERATOR";
-        _partitioner.reset(
-                new 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
-        RETURN_IF_ERROR(_partitioner->init(_texprs));
+        _need_partitioner = need_partitioner;
+        if (_need_partitioner) {
+            _partitioner.reset(
+                    new 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
+            RETURN_IF_ERROR(_partitioner->init(_texprs));
+        }
+
         return Status::OK();
     }
 
     Status prepare(RuntimeState* state) override {
-        RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc()));
+        if (_need_partitioner) {
+            RETURN_IF_ERROR(_partitioner->prepare(state, 
_child_x->row_desc()));
+        }
+
         return Status::OK();
     }
 
     Status open(RuntimeState* state) override {
-        RETURN_IF_ERROR(_partitioner->open(state));
+        if (_need_partitioner) {
+            RETURN_IF_ERROR(_partitioner->open(state));
+        }
+
         return Status::OK();
     }
 
@@ -105,6 +122,7 @@ public:
 
 private:
     friend class LocalExchangeSinkLocalState;
+    bool _need_partitioner;
     const int _num_partitions;
     const std::vector<TExpr>& _texprs;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 83dac5eb8f4..dd64852891f 100644
--- 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -17,8 +17,21 @@
 
 #include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h"
 
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
+
 namespace doris::pipeline {
 
+void LocalExchangeSourceDependency::block() {
+    if 
(((LocalExchangeSharedState*)_shared_state.get())->exchanger->running_sink_operators
 == 0) {
+        return;
+    }
+    std::unique_lock<std::mutex> 
lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock);
+    if 
(((LocalExchangeSharedState*)_shared_state.get())->exchanger->running_sink_operators
 == 0) {
+        return;
+    }
+    Dependency::block();
+}
+
 Status LocalExchangeSourceLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
@@ -28,9 +41,14 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* 
state, LocalStateInfo&
     DCHECK(_shared_state != nullptr);
     _channel_id = info.task_idx;
     _shared_state->set_dep_by_channel_id(_dependency, _channel_id);
+    _exchanger = _shared_state->exchanger.get();
+    DCHECK(_exchanger != nullptr);
     _get_block_failed_counter =
             ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", 
TUnit::UNIT, 1);
-    _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
+    if (_exchanger->get_type() == ExchangeType::SHUFFLE) {
+        _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
+    }
+
     return Status::OK();
 }
 
@@ -38,42 +56,7 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* 
state, vectorized::
                                                SourceState& source_state) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.exec_time_counter());
-    PartitionedBlock partitioned_block;
-    std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
-
-    auto get_data = [&](vectorized::Block* result_block) {
-        do {
-            const auto* offset_start = &((
-                    
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
-            mutable_block->add_rows(partitioned_block.first.get(), 
offset_start,
-                                    offset_start + 
std::get<2>(partitioned_block.second));
-        } while (mutable_block->rows() < state->batch_size() &&
-                 
local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
-                         partitioned_block));
-        *result_block = mutable_block->to_block();
-    };
-    if (local_state._shared_state->running_sink_operators == 0) {
-        if 
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
-                    partitioned_block)) {
-            SCOPED_TIMER(local_state._copy_data_timer);
-            mutable_block =
-                    
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
-            get_data(block);
-        } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-            source_state = SourceState::FINISHED;
-        }
-    } else if 
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
-                       partitioned_block)) {
-        SCOPED_TIMER(local_state._copy_data_timer);
-        mutable_block =
-                
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
-        get_data(block);
-    } else {
-        local_state._dependency->block();
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-    }
-
+    RETURN_IF_ERROR(local_state._exchanger->get_block(state, block, 
source_state, local_state));
     local_state.reached_limit(block, source_state);
     return Status::OK();
 }
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index 3ccc38854f5..d94b9041fce 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -29,18 +29,12 @@ public:
             : Dependency(id, node_id, "LocalExchangeSourceDependency", 
query_ctx) {}
     ~LocalExchangeSourceDependency() override = default;
 
-    void block() override {
-        if 
(((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) 
{
-            return;
-        }
-        std::unique_lock<std::mutex> 
lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock);
-        if 
(((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) 
{
-            return;
-        }
-        Dependency::block();
-    }
+    void block() override;
 };
 
+class Exchanger;
+class ShuffleExchanger;
+class PassthroughExchanger;
 class LocalExchangeSourceOperatorX;
 class LocalExchangeSourceLocalState final
         : public PipelineXLocalState<LocalExchangeSourceDependency> {
@@ -54,7 +48,10 @@ public:
 
 private:
     friend class LocalExchangeSourceOperatorX;
+    friend class ShuffleExchanger;
+    friend class PassthroughExchanger;
 
+    Exchanger* _exchanger = nullptr;
     int _channel_id;
     RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
     RuntimeProfile::Counter* _copy_data_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
new file mode 100644
index 00000000000..616b469a99f
--- /dev/null
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
+
+#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
+#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h"
+
+namespace doris::pipeline {
+
+Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
+                              SourceState source_state, 
LocalExchangeSinkLocalState& local_state) {
+    {
+        SCOPED_TIMER(local_state._compute_hash_value_timer);
+        RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, 
in_block,
+                                                                  
local_state.mem_tracker()));
+    }
+    {
+        SCOPED_TIMER(local_state._distribute_timer);
+        RETURN_IF_ERROR(_split_rows(state,
+                                    (const 
uint32_t*)local_state._partitioner->get_channel_ids(),
+                                    in_block, source_state, local_state));
+    }
+
+    return Status::OK();
+}
+
+Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* 
block,
+                                   SourceState& source_state,
+                                   LocalExchangeSourceLocalState& local_state) 
{
+    PartitionedBlock partitioned_block;
+    std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
+
+    auto get_data = [&](vectorized::Block* result_block) {
+        do {
+            const auto* offset_start = &((
+                    
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
+            mutable_block->add_rows(partitioned_block.first.get(), 
offset_start,
+                                    offset_start + 
std::get<2>(partitioned_block.second));
+        } while (mutable_block->rows() < state->batch_size() &&
+                 
_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
+        *result_block = mutable_block->to_block();
+    };
+    if (running_sink_operators == 0) {
+        if 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+            SCOPED_TIMER(local_state._copy_data_timer);
+            mutable_block =
+                    
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+            get_data(block);
+        } else {
+            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+            source_state = SourceState::FINISHED;
+        }
+    } else if 
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+        SCOPED_TIMER(local_state._copy_data_timer);
+        mutable_block =
+                
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+        get_data(block);
+    } else {
+        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+        local_state._dependency->block();
+    }
+    return Status::OK();
+}
+
+Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* 
__restrict channel_ids,
+                                     vectorized::Block* block, SourceState 
source_state,
+                                     LocalExchangeSinkLocalState& local_state) 
{
+    auto& data_queue = _data_queue;
+    const auto rows = block->rows();
+    auto row_idx = std::make_shared<std::vector<int>>(rows);
+    {
+        local_state._partition_rows_histogram.assign(_num_instances + 1, 0);
+        for (size_t i = 0; i < rows; ++i) {
+            local_state._partition_rows_histogram[channel_ids[i]]++;
+        }
+        for (int32_t i = 1; i <= _num_instances; ++i) {
+            local_state._partition_rows_histogram[i] +=
+                    local_state._partition_rows_histogram[i - 1];
+        }
+
+        for (int32_t i = rows - 1; i >= 0; --i) {
+            (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 
1] = i;
+            local_state._partition_rows_histogram[channel_ids[i]]--;
+        }
+    }
+    auto new_block = vectorized::Block::create_shared(block->clone_empty());
+    new_block->swap(*block);
+    for (size_t i = 0; i < _num_instances; i++) {
+        size_t start = local_state._partition_rows_histogram[i];
+        size_t size = local_state._partition_rows_histogram[i + 1] - start;
+        if (size > 0) {
+            data_queue[i].enqueue({new_block, {row_idx, start, size}});
+            local_state._shared_state->set_ready_for_read(i);
+        }
+    }
+
+    return Status::OK();
+}
+
+Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block,
+                                  SourceState source_state,
+                                  LocalExchangeSinkLocalState& local_state) {
+    auto new_block = vectorized::Block::create_unique(in_block->clone_empty());
+    new_block->swap(*in_block);
+    auto channel_id = (local_state._channel_id++) % _num_instances;
+    _data_queue[channel_id].enqueue(std::move(new_block));
+    local_state._shared_state->set_ready_for_read(channel_id);
+
+    return Status::OK();
+}
+
+Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* 
block,
+                                       SourceState& source_state,
+                                       LocalExchangeSourceLocalState& 
local_state) {
+    std::unique_ptr<vectorized::Block> next_block;
+    if (running_sink_operators == 0) {
+        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+            *block = *next_block.release();
+        } else {
+            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+            source_state = SourceState::FINISHED;
+        }
+    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+        *block = *next_block.release();
+    } else {
+        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+        local_state._dependency->block();
+    }
+    return Status::OK();
+}
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
new file mode 100644
index 00000000000..13e3fe931e7
--- /dev/null
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "pipeline/pipeline_x/dependency.h"
+#include "pipeline/pipeline_x/operator.h"
+
+namespace doris::pipeline {
+
+enum class ExchangeType : uint8_t {
+    SHUFFLE = 0,
+    PASSTHROUGH = 1,
+};
+
+class LocalExchangeSourceLocalState;
+class LocalExchangeSinkLocalState;
+
+class Exchanger {
+public:
+    Exchanger(int num_instances)
+            : running_sink_operators(num_instances), 
_num_instances(num_instances) {}
+    virtual ~Exchanger() = default;
+    virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+                             SourceState& source_state,
+                             LocalExchangeSourceLocalState& local_state) = 0;
+    virtual Status sink(RuntimeState* state, vectorized::Block* in_block, 
SourceState source_state,
+                        LocalExchangeSinkLocalState& local_state) = 0;
+    virtual ExchangeType get_type() const = 0;
+
+    std::atomic<int> running_sink_operators = 0;
+
+protected:
+    const int _num_instances;
+};
+
+class LocalExchangeSourceLocalState;
+class LocalExchangeSinkLocalState;
+
+class ShuffleExchanger final : public Exchanger {
+    using PartitionedBlock =
+            std::pair<std::shared_ptr<vectorized::Block>,
+                      std::tuple<std::shared_ptr<std::vector<int>>, size_t, 
size_t>>;
+
+public:
+    ENABLE_FACTORY_CREATOR(ShuffleExchanger);
+    ShuffleExchanger(int num_instances) : Exchanger(num_instances) {
+        _data_queue.resize(num_instances);
+    }
+    ~ShuffleExchanger() override = default;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState 
source_state,
+                LocalExchangeSinkLocalState& local_state) override;
+
+    Status get_block(RuntimeState* state, vectorized::Block* block, 
SourceState& source_state,
+                     LocalExchangeSourceLocalState& local_state) override;
+    ExchangeType get_type() const override { return ExchangeType::SHUFFLE; }
+
+private:
+    Status _split_rows(RuntimeState* state, const uint32_t* __restrict 
channel_ids,
+                       vectorized::Block* block, SourceState source_state,
+                       LocalExchangeSinkLocalState& local_state);
+
+    std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> _data_queue;
+};
+
+class PassthroughExchanger final : public Exchanger {
+public:
+    ENABLE_FACTORY_CREATOR(PassthroughExchanger);
+    PassthroughExchanger(int num_instances) : Exchanger(num_instances) {
+        _data_queue.resize(num_instances);
+    }
+    ~PassthroughExchanger() override = default;
+    Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState 
source_state,
+                LocalExchangeSinkLocalState& local_state) override;
+
+    Status get_block(RuntimeState* state, vectorized::Block* block, 
SourceState& source_state,
+                     LocalExchangeSourceLocalState& local_state) override;
+    ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; 
}
+
+private:
+    
std::vector<moodycamel::ConcurrentQueue<std::unique_ptr<vectorized::Block>>> 
_data_queue;
+};
+
+} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 5fa6785435b..294eb962ee1 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -179,6 +179,8 @@ public:
 
     [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { 
return false; }
 
+    [[nodiscard]] virtual bool need_to_local_shuffle() const { return true; }
+
     bool can_read() override {
         LOG(FATAL) << "should not reach here!";
         return false;
@@ -423,7 +425,7 @@ public:
     virtual Status init(const TPlanNode& tnode, RuntimeState* state);
 
     Status init(const TDataSink& tsink) override;
-    virtual Status init() {
+    virtual Status init(bool need_partitioner) {
         return Status::InternalError("init() is only implemented in local 
exchange!");
     }
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index d49e290c044..636d4e235d7 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -231,6 +231,8 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
     static_cast<void>(root_pipeline->set_sink(_sink));
 
+    //    RETURN_IF_ERROR(_plan_local_shuffle());
+
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
         DCHECK(pipeline->sink_x() != nullptr) << 
pipeline->operator_xs().size();
@@ -247,6 +249,17 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     return Status::OK();
 }
 
+// Walk the pipelines in reverse creation order and decide, per pipeline,
+// whether local shuffle is needed. Pipelines with no child pipelines take the
+// hint from their source operator; the non-leaf branch is left empty.
+// NOTE(review): this pass is not wired up yet in this commit -- the call in
+// prepare() is commented out above, so this is effectively dead code until a
+// follow-up fills in the non-leaf case and enables the call.
+Status PipelineXFragmentContext::_plan_local_shuffle() {
+    for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) {
+        auto& children = _pipelines[pip_idx]->children();
+        if (children.empty()) {
+            // Leaf pipeline: derive the local-shuffle flag from its source.
+            _pipelines[pip_idx]->init_need_to_local_shuffle_by_source();
+        } else {
+            // TODO: propagate the decision from child pipelines (unimplemented).
+        }
+    }
+    return Status::OK();
+}
+
 Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const 
TDataSink& thrift_sink,
                                                    const std::vector<TExpr>& 
output_exprs,
                                                    const 
TPipelineFragmentParams& params,
@@ -595,7 +608,8 @@ Status 
PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
 
 Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, 
OperatorXPtr& op,
                                                      PipelinePtr& cur_pipe, 
const TPlanNode& tnode,
-                                                     const std::vector<TExpr>& 
texprs) {
+                                                     const std::vector<TExpr>& 
texprs,
+                                                     ExchangeType 
exchange_type) {
     if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) {
         return Status::OK();
     }
@@ -617,12 +631,23 @@ Status 
PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX
     sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), 
local_exchange_id,
                                               _num_instances, texprs));
     RETURN_IF_ERROR(cur_pipe->set_sink(sink));
-    RETURN_IF_ERROR(cur_pipe->sink_x()->init());
 
+    bool need_partitioner = false;
     auto shared_state = LocalExchangeSharedState::create_shared();
-    shared_state->data_queue.resize(_num_instances);
     shared_state->source_dependencies.resize(_num_instances, nullptr);
-    shared_state->running_sink_operators = _num_instances;
+    switch (exchange_type) {
+    case ExchangeType::SHUFFLE:
+        shared_state->exchanger = 
ShuffleExchanger::create_unique(_num_instances);
+        need_partitioner = true;
+        break;
+    case ExchangeType::PASSTHROUGH:
+        shared_state->exchanger = 
PassthroughExchanger::create_unique(_num_instances);
+        break;
+    default:
+        return Status::InternalError("Unsupported local exchange type : " +
+                                     std::to_string((int)exchange_type));
+    }
+    RETURN_IF_ERROR(cur_pipe->sink_x()->init(need_partitioner));
     _op_id_to_le_state.insert({local_exchange_id, shared_state});
     return Status::OK();
 }
@@ -687,6 +712,10 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
+
+            //            RETURN_IF_ERROR(_add_local_exchange(pool, op, 
cur_pipe, tnode,
+            //                                                
tnode.agg_node.grouping_exprs,
+            //                                                
ExchangeType::PASSTHROUGH));
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
                    tnode.agg_node.use_streaming_preaggregation) {
             op.reset(new StreamingAggSourceOperatorX(pool, tnode, 
next_operator_id(), descs));
@@ -703,6 +732,10 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
+
+            //            RETURN_IF_ERROR(_add_local_exchange(pool, op, 
cur_pipe, tnode,
+            //                                                
tnode.agg_node.grouping_exprs,
+            //                                                
ExchangeType::PASSTHROUGH));
         } else {
             op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -720,10 +753,19 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
 
-            if (!tnode.agg_node.need_finalize) {
-                RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
-                                                    
tnode.agg_node.grouping_exprs));
-            }
+            //            if (tnode.agg_node.grouping_exprs.empty()) {
+            //                if (tnode.agg_node.need_finalize) {
+            //                    RETURN_IF_ERROR(_add_local_exchange(pool, 
op, cur_pipe, tnode,
+            //                                                        
tnode.agg_node.grouping_exprs,
+            //                                                        
ExchangeType::PASSTHROUGH));
+            //                } else {
+            //                    // TODO(gabriel): maybe use local shuffle
+            //                }
+            //            } else if (cur_pipe->need_to_local_shuffle()) {
+            //                RETURN_IF_ERROR(_add_local_exchange(pool, op, 
cur_pipe, tnode,
+            //                                                    
tnode.agg_node.grouping_exprs,
+            //                                                    
ExchangeType::SHUFFLE));
+            //            }
         }
         break;
     }
@@ -750,7 +792,14 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         for (const auto& eq_join_conjunct : eq_join_conjuncts) {
             probe_exprs.push_back(eq_join_conjunct.left);
         }
-        RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, 
probe_exprs));
+        if (tnode.hash_join_node.__isset.is_broadcast_join &&
+            tnode.hash_join_node.is_broadcast_join) {
+            RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, 
probe_exprs,
+                                                ExchangeType::PASSTHROUGH));
+        } else if (cur_pipe->need_to_local_shuffle()) {
+            RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, 
probe_exprs,
+                                                ExchangeType::SHUFFLE));
+        }
         _pipeline_parent_map.push(op->node_id(), cur_pipe);
         _pipeline_parent_map.push(op->node_id(), build_side_pipe);
         break;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 7f47052296e..1390f2a9544 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -35,6 +35,7 @@
 #include "pipeline/pipeline.h"
 #include "pipeline/pipeline_fragment_context.h"
 #include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
 #include "pipeline/pipeline_x/pipeline_x_task.h"
 #include "runtime/query_context.h"
 #include "runtime/runtime_state.h"
@@ -125,7 +126,8 @@ private:
     void _close_fragment_instance() override;
     Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& 
request) override;
     Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, 
PipelinePtr& cur_pipe,
-                               const TPlanNode& tnode, const 
std::vector<TExpr>& texprs);
+                               const TPlanNode& tnode, const 
std::vector<TExpr>& texprs,
+                               ExchangeType exchange_type);
 
     [[nodiscard]] Status _build_pipelines(ObjectPool* pool,
                                           const 
doris::TPipelineFragmentParams& request,
@@ -151,6 +153,7 @@ private:
                              const TPipelineFragmentParams& params, const 
RowDescriptor& row_desc,
                              RuntimeState* state, DescriptorTbl& desc_tbl,
                              PipelineId cur_pipeline_id);
+    Status _plan_local_shuffle();
 
     bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to