This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d9a3aa5c068 [pipelineX](improvement) improve local shuffle (#25964)
d9a3aa5c068 is described below

commit d9a3aa5c068628c9650d9aabe5abdb7dfdc3bdc2
Author: Gabriel <[email protected]>
AuthorDate: Thu Oct 26 21:42:43 2023 +0800

    [pipelineX](improvement) improve local shuffle (#25964)
---
 be/src/pipeline/pipeline_x/dependency.h            |  5 ++-
 .../local_exchange_sink_operator.cpp               | 47 ++++++++++++----------
 .../local_exchange/local_exchange_sink_operator.h  |  6 +--
 .../local_exchange_source_operator.cpp             | 21 +++++++++-
 .../local_exchange_source_operator.h               |  1 +
 be/src/pipeline/pipeline_x/operator.cpp            |  1 +
 be/src/pipeline/pipeline_x/operator.h              |  6 ++-
 .../pipeline_x/pipeline_x_fragment_context.cpp     |  1 -
 8 files changed, 59 insertions(+), 29 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 028b78957b4..131198c4496 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -840,11 +840,12 @@ private:
     bool is_set_probe {false};
 };
 
+using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
+                                   std::tuple<std::shared_ptr<std::vector<int>>, size_t, size_t>>;
 struct LocalExchangeSharedState {
 public:
     ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
-    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> data_queue;
-    int num_partitions = 0;
+    std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
     std::atomic<int> running_sink_operators = 0;
 };
 
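The new PartitionedBlock alias is what makes the copy-free hand-off below
possible: each queue entry pairs a shared block with a (row-index array,
offset, length) view into it, so every partition can reference the same
materialized data instead of owning its own copy. A minimal sketch of that
layout follows; DemoBlock and DemoPartitionedBlock are illustrative
stand-ins, not Doris types.

    #include <cstddef>
    #include <iostream>
    #include <memory>
    #include <tuple>
    #include <utility>
    #include <vector>

    // Stand-in for vectorized::Block: one int column is enough for the layout.
    struct DemoBlock {
        std::vector<int> column;
    };

    // Same shape as the PartitionedBlock alias above, with the stand-in block.
    using DemoPartitionedBlock =
            std::pair<std::shared_ptr<DemoBlock>,
                      std::tuple<std::shared_ptr<std::vector<int>>, size_t, size_t>>;

    int main() {
        auto block = std::make_shared<DemoBlock>(DemoBlock{{10, 20, 30, 40}});
        // One shared row-index array; every partition slices a range of it.
        auto row_idx = std::make_shared<std::vector<int>>(std::vector<int>{2, 0, 3, 1});
        // This partition's view: rows row_idx[1], row_idx[2] of the shared block.
        DemoPartitionedBlock slice{block, {row_idx, /*offset=*/1, /*length=*/2}};
        const auto& [indices, offset, length] = slice.second;
        for (size_t i = offset; i < offset + length; ++i) {
            std::cout << slice.first->column[(*indices)[i]] << '\n'; // 10 then 40
        }
        return 0;
    }

Because both the block and the index vector are held by shared_ptr, the
underlying data is freed only after the last partition consumes its slice.
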
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index ec959b20cea..b69150ba512 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -27,33 +27,38 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
     _distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
     auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
     RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
-    _mutable_block.resize(p._num_partitions);
     _shared_state->running_sink_operators++;
     return Status::OK();
 }
 
-Status LocalExchangeSinkLocalState::channel_add_rows(RuntimeState* state,
-                                                     const uint32_t* __restrict channel_ids,
-                                                     vectorized::Block* block,
-                                                     SourceState source_state) {
+Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state,
+                                               const uint32_t* __restrict channel_ids,
+                                               vectorized::Block* block, SourceState source_state) {
     auto& data_queue = _shared_state->data_queue;
-    std::vector<int> channel2rows[data_queue.size()];
-
-    auto rows = block->rows();
-    for (int i = 0; i < rows; i++) {
-        channel2rows[channel_ids[i]].emplace_back(i);
-    }
-    for (size_t i = 0; i < data_queue.size(); i++) {
-        if (_mutable_block[i] == nullptr) {
-            _mutable_block[i] = vectorized::MutableBlock::create_unique(block->clone_empty());
+    const auto num_partitions = data_queue.size();
+    const auto rows = block->rows();
+    auto row_idx = std::make_shared<std::vector<int>>(rows);
+    {
+        _partition_rows_histogram.assign(num_partitions + 1, 0);
+        for (size_t i = 0; i < rows; ++i) {
+            _partition_rows_histogram[channel_ids[i]]++;
+        }
+        for (int32_t i = 1; i <= num_partitions; ++i) {
+            _partition_rows_histogram[i] += _partition_rows_histogram[i - 1];
         }
 
-        const int* begin = channel2rows[i].data();
-        _mutable_block[i]->add_rows(block, begin, begin + channel2rows[i].size());
-        if (_mutable_block[i]->rows() > state->batch_size() ||
-            source_state == SourceState::FINISHED) {
-            data_queue[i].enqueue(_mutable_block[i]->to_block());
-            _mutable_block[i].reset(nullptr);
+        for (int32_t i = rows - 1; i >= 0; --i) {
+            (*row_idx)[_partition_rows_histogram[channel_ids[i]] - 1] = i;
+            _partition_rows_histogram[channel_ids[i]]--;
+        }
+    }
+    auto new_block = vectorized::Block::create_shared(block->clone_empty());
+    new_block->swap(*block);
+    for (size_t i = 0; i < num_partitions; i++) {
+        size_t start = _partition_rows_histogram[i];
+        size_t size = _partition_rows_histogram[i + 1] - start;
+        if (size > 0) {
+            data_queue[i].enqueue({new_block, {row_idx, start, size}});
         }
     }
 
@@ -72,7 +77,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     }
     {
         SCOPED_TIMER(local_state._distribute_timer);
-        RETURN_IF_ERROR(local_state.channel_add_rows(
+        RETURN_IF_ERROR(local_state.split_rows(
                state, (const uint32_t*)local_state._partitioner->get_channel_ids(), in_block,
                 source_state));
     }
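
split_rows replaces the per-partition row vectors of channel_add_rows with a
single counting sort: one pass counts rows per channel, a prefix sum turns
the counts into partition boundaries, and a backward pass scatters the row
indices so each partition's rows keep their original order. A standalone
sketch of that histogram split follows; the function name and plain types
are illustrative, not the Doris API.

    #include <cstdint>
    #include <iostream>
    #include <vector>

    // Histogram-based split: returns row indices grouped by partition; on return,
    // histogram[p] is the start of partition p and histogram[p + 1] its end.
    std::vector<int> split_by_histogram(const std::vector<uint32_t>& channel_ids,
                                        size_t num_partitions,
                                        std::vector<size_t>& histogram) {
        const size_t rows = channel_ids.size();
        std::vector<int> row_idx(rows);
        histogram.assign(num_partitions + 1, 0);
        for (size_t i = 0; i < rows; ++i) { // 1) count rows per partition
            histogram[channel_ids[i]]++;
        }
        for (size_t i = 1; i <= num_partitions; ++i) { // 2) prefix sum -> end offsets
            histogram[i] += histogram[i - 1];
        }
        // 3) backward scatter: decrementing turns end offsets into start offsets
        //    while keeping each partition's rows in their original order.
        for (int64_t i = static_cast<int64_t>(rows) - 1; i >= 0; --i) {
            row_idx[--histogram[channel_ids[i]]] = static_cast<int>(i);
        }
        return row_idx;
    }

    int main() {
        std::vector<size_t> histogram;
        auto row_idx = split_by_histogram({1, 0, 1, 0, 2}, 3, histogram);
        for (int i : row_idx) std::cout << i << ' '; // prints: 1 3 0 2 4
        std::cout << '\n';
        return 0;
    }

After the backward pass, histogram[p] is the start offset of partition p in
row_idx and histogram[p + 1] - histogram[p] is its row count, which is
exactly what the enqueue loop above reads.
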
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 4025def28e7..56d3d774609 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -34,8 +34,8 @@ public:
 
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
 
-    Status channel_add_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
-                            vectorized::Block* block, SourceState source_state);
+    Status split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
+                      vectorized::Block* block, SourceState source_state);
 
 private:
     friend class LocalExchangeSinkOperatorX;
@@ -43,7 +43,7 @@ private:
     RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
     RuntimeProfile::Counter* _distribute_timer = nullptr;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
-    std::vector<std::unique_ptr<vectorized::MutableBlock>> _mutable_block;
+    std::vector<size_t> _partition_rows_histogram;
 };
 
 // A single 32-bit division on a recent x64 processor has a throughput of one instruction every six cycles with a latency of 26 cycles.
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 83c712a8022..75d9b4ac471 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -30,6 +30,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
     _dependency->set_channel_id(_channel_id);
     _get_block_failed_counter =
             ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", TUnit::UNIT, 1);
+    _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
     return Status::OK();
 }
 
@@ -37,7 +38,25 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::
                                                SourceState& source_state) {
     auto& local_state = get_local_state(state);
     SCOPED_TIMER(local_state.profile()->total_time_counter());
-    if (!local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(*block)) {
+    PartitionedBlock partitioned_block;
+    std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
+
+    if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
+                partitioned_block)) {
+        SCOPED_TIMER(local_state._copy_data_timer);
+        mutable_block =
+                vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+
+        do {
+            const auto* offset_start = &((
+                    *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
+            mutable_block->add_rows(partitioned_block.first.get(), offset_start,
+                                    offset_start + std::get<2>(partitioned_block.second));
+        } while (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
+                         partitioned_block) &&
+                 mutable_block->rows() < state->batch_size());
+        *block = mutable_block->to_block();
+    } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         if (local_state._shared_state->running_sink_operators == 0) {
             source_state = SourceState::FINISHED;
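
On the source side, get_block now dequeues these shared slices and copies
only the rows they reference into a fresh mutable block, draining the queue
until batch_size rows have been gathered. A minimal sketch of that drain
loop follows, with std::deque and plain ints standing in for
moodycamel::ConcurrentQueue and vectorized::Block, so it ignores the thread
safety the real queue provides (and it checks the size before dequeuing the
next slice rather than after, which simplifies the loop).

    #include <cstddef>
    #include <deque>
    #include <iostream>
    #include <memory>
    #include <tuple>
    #include <utility>
    #include <vector>

    using Rows = std::vector<int>; // stand-in for one block's worth of rows
    using Slice = std::pair<std::shared_ptr<Rows>,
                            std::tuple<std::shared_ptr<std::vector<int>>, size_t, size_t>>;

    // Gather rows from queued slices until batch_size is reached or the queue
    // is empty; only the rows a slice references are copied out.
    Rows drain(std::deque<Slice>& queue, size_t batch_size) {
        Rows out;
        while (!queue.empty() && out.size() < batch_size) {
            auto [rows, desc] = queue.front();
            queue.pop_front();
            const auto& [indices, offset, length] = desc;
            for (size_t i = offset; i < offset + length; ++i) {
                out.push_back((*rows)[(*indices)[i]]);
            }
        }
        return out;
    }

    int main() {
        auto rows = std::make_shared<Rows>(Rows{10, 20, 30, 40});
        auto idx = std::make_shared<std::vector<int>>(std::vector<int>{3, 1, 2, 0});
        std::deque<Slice> queue{{rows, {idx, 0, 2}}, {rows, {idx, 2, 2}}};
        for (int v : drain(queue, 3)) std::cout << v << ' '; // prints: 40 20 30 10
        std::cout << '\n';
        return 0;
    }
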
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index ddd47ec18cb..b992f454696 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -37,6 +37,7 @@ private:
 
     int _channel_id;
     RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
+    RuntimeProfile::Counter* _copy_data_timer = nullptr;
 };
 
 class LocalExchangeSourceOperatorX final : public OperatorX<LocalExchangeSourceLocalState> {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 9377f5f2701..c29e1b8af87 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -175,6 +175,7 @@ void PipelineXLocalStateBase::clear_origin_block() {
 Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block,
                                      vectorized::Block* output_block) const {
     auto local_state = state->get_local_state(operator_id());
+    SCOPED_TIMER(local_state->profile()->total_time_counter());
     SCOPED_TIMER(local_state->_projection_timer);
     using namespace vectorized;
     vectorized::MutableBlock mutable_block =
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index bfad246d3e6..79762e6ee19 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -155,7 +155,11 @@ public:
     }
 
     OperatorXBase(ObjectPool* pool, int node_id, int operator_id)
-            : OperatorBase(nullptr), _operator_id(operator_id), _node_id(node_id), _pool(pool) {};
+            : OperatorBase(nullptr),
+              _operator_id(operator_id),
+              _node_id(node_id),
+              _pool(pool),
+              _limit(-1) {}
     virtual Status init(const TPlanNode& tnode, RuntimeState* state);
     Status init(const TDataSink& tsink) override {
         LOG(FATAL) << "should not reach here!";
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 78a888af9df..5ca5829e1a8 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -577,7 +577,6 @@ Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX
 
     auto shared_state = LocalExchangeSharedState::create_shared();
     shared_state->data_queue.resize(_runtime_state->query_parallel_instance_num());
-    shared_state->num_partitions = _runtime_state->query_parallel_instance_num();
     _op_id_to_le_state.insert({local_exchange_id, shared_state});
     return Status::OK();
 }

