This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 07bd65abe08 [pipelineX](local shuffle) remove unused code in local 
shuffle to improve performance #29292
07bd65abe08 is described below

commit 07bd65abe085a2f644f1c3533e03a851b36d2505
Author: Mryange <[email protected]>
AuthorDate: Fri Dec 29 20:30:28 2023 +0800

    [pipelineX](local shuffle) remove unused code in local shuffle to improve 
performance #29292
---
 .../pipeline_x/local_exchange/local_exchanger.cpp  | 141 +++++----------------
 .../pipeline_x/local_exchange/local_exchanger.h    |  15 +--
 2 files changed, 31 insertions(+), 125 deletions(-)

diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index fbe5098f639..b2f1bb00ebe 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -283,38 +283,12 @@ Status 
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
     new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_partitions;
     local_state._shared_state->add_mem_usage(channel_id, 
new_block.allocated_bytes());
-    _passthrough_data_queue[channel_id].enqueue(std::move(new_block));
+    _data_queue[channel_id].enqueue(std::move(new_block));
     local_state._shared_state->set_ready_to_read(channel_id);
 
     return Status::OK();
 }
 
-bool AdaptivePassthroughExchanger::_passthrough_get_block(
-        RuntimeState* state, vectorized::Block* block, SourceState& 
source_state,
-        LocalExchangeSourceLocalState& local_state) {
-    vectorized::Block next_block;
-    if (_running_sink_operators == 0) {
-        if 
(_passthrough_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            block->swap(next_block);
-            _free_blocks.enqueue(std::move(next_block));
-            local_state._shared_state->sub_mem_usage(local_state._channel_id,
-                                                     block->allocated_bytes());
-        } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-            source_state = SourceState::FINISHED;
-            return false;
-        }
-    } else if 
(_passthrough_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-        block->swap(next_block);
-        _free_blocks.enqueue(std::move(next_block));
-        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        return false;
-    }
-    return true;
-}
-
 Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, 
vectorized::Block* block,
                                                    SourceState source_state,
                                                    
LocalExchangeSinkLocalState& local_state) {
@@ -339,7 +313,7 @@ Status 
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
                                                  const uint32_t* __restrict 
channel_ids,
                                                  vectorized::Block* block, 
SourceState source_state,
                                                  LocalExchangeSinkLocalState& 
local_state) {
-    auto& data_queue = _shuffle_data_queue;
+    auto& data_queue = _data_queue;
     const auto rows = block->rows();
     auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
     {
@@ -357,79 +331,21 @@ Status 
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
             local_state._partition_rows_histogram[channel_ids[i]]--;
         }
     }
-
-    vectorized::Block data_block;
-    std::shared_ptr<ShuffleBlockWrapper> new_block_wrapper;
-    if (_free_blocks.try_enqueue(data_block)) {
-        new_block_wrapper = 
ShuffleBlockWrapper::create_shared(std::move(data_block));
-    } else {
-        new_block_wrapper = 
ShuffleBlockWrapper::create_shared(block->clone_empty());
-    }
-
-    new_block_wrapper->data_block.swap(*block);
-    if (new_block_wrapper->data_block.empty()) {
-        return Status::OK();
-    }
-    
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
-    new_block_wrapper->ref(_num_partitions);
-
     for (size_t i = 0; i < _num_partitions; i++) {
-        size_t start = local_state._partition_rows_histogram[i];
-        size_t size = local_state._partition_rows_histogram[i + 1] - start;
+        const size_t start = local_state._partition_rows_histogram[i];
+        const size_t size = local_state._partition_rows_histogram[i + 1] - 
start;
         if (size > 0) {
-            local_state._shared_state->add_mem_usage(
-                    i, new_block_wrapper->data_block.allocated_bytes(), false);
-            data_queue[i].enqueue({new_block_wrapper, {row_idx, start, size}});
-            local_state._shared_state->set_ready_to_read(i);
-        } else {
-            new_block_wrapper->unref(local_state._shared_state);
+            std::unique_ptr<vectorized::MutableBlock> mutable_block =
+                    
vectorized::MutableBlock::create_unique(block->clone_empty());
+            mutable_block->add_rows(block, start, size);
+            auto new_block = mutable_block->to_block();
+            local_state._shared_state->add_mem_usage(i, 
new_block.allocated_bytes());
+            data_queue[i].enqueue(std::move(new_block));
         }
+        local_state._shared_state->set_ready_to_read(i);
     }
-
     return Status::OK();
 }
-bool AdaptivePassthroughExchanger::_shuffle_get_block(RuntimeState* state, 
vectorized::Block* block,
-                                                      SourceState& 
source_state,
-                                                      
LocalExchangeSourceLocalState& local_state) {
-    PartitionedBlock partitioned_block;
-    std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
-
-    auto get_data = [&](vectorized::Block* result_block) {
-        do {
-            const auto* offset_start = &((
-                    
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
-            auto block_wrapper = partitioned_block.first;
-            local_state._shared_state->sub_mem_usage(
-                    local_state._channel_id, 
block_wrapper->data_block.allocated_bytes(), false);
-            mutable_block->add_rows(&block_wrapper->data_block, offset_start,
-                                    offset_start + 
std::get<2>(partitioned_block.second));
-            block_wrapper->unref(local_state._shared_state);
-        } while (mutable_block->rows() < state->batch_size() &&
-                 
_shuffle_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
-        *result_block = mutable_block->to_block();
-    };
-    if (_running_sink_operators == 0) {
-        if 
(_shuffle_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
-            SCOPED_TIMER(local_state._copy_data_timer);
-            mutable_block = vectorized::MutableBlock::create_unique(
-                    partitioned_block.first->data_block.clone_empty());
-            get_data(block);
-        } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-            source_state = SourceState::FINISHED;
-            return false;
-        }
-    } else if 
(_shuffle_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
-        SCOPED_TIMER(local_state._copy_data_timer);
-        mutable_block = vectorized::MutableBlock::create_unique(
-                partitioned_block.first->data_block.clone_empty());
-        get_data(block);
-    } else {
-        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
-        return false;
-    }
-    return true;
-}
 
 Status AdaptivePassthroughExchanger::sink(RuntimeState* state, 
vectorized::Block* in_block,
                                           SourceState source_state,
@@ -447,23 +363,24 @@ Status AdaptivePassthroughExchanger::sink(RuntimeState* 
state, vectorized::Block
 Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                SourceState& source_state,
                                                LocalExchangeSourceLocalState& 
local_state) {
-    auto is_shuffle_success = false, is_passthrough_success = false;
-    SourceState shuffle_state = SourceState::MORE_DATA, passthrough_state = 
SourceState::MORE_DATA;
-
-    is_shuffle_success = _shuffle_get_block(state, block, shuffle_state, 
local_state);
-
-    if (is_shuffle_success) {
-        return Status::OK();
-    }
-
-    is_passthrough_success = _passthrough_get_block(state, block, 
passthrough_state, local_state);
-
-    if (is_passthrough_success) {
-        return Status::OK();
-    }
-
-    if (shuffle_state == SourceState::FINISHED && passthrough_state == 
SourceState::FINISHED) {
-        source_state = SourceState::FINISHED;
+    vectorized::Block next_block;
+    if (_running_sink_operators == 0) {
+        if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+            block->swap(next_block);
+            _free_blocks.enqueue(std::move(next_block));
+            local_state._shared_state->sub_mem_usage(local_state._channel_id,
+                                                     block->allocated_bytes());
+        } else {
+            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+            source_state = SourceState::FINISHED;
+        }
+    } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+        block->swap(next_block);
+        _free_blocks.enqueue(std::move(next_block));
+        local_state._shared_state->sub_mem_usage(local_state._channel_id, 
block->allocated_bytes());
+    } else {
+        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+        local_state._dependency->block();
     }
     return Status::OK();
 }
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index ff64c88d5aa..28c99e751ea 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -183,14 +183,10 @@ private:
 // a copy of ShuffleExchanger and PassthroughExchanger.
 class AdaptivePassthroughExchanger : public Exchanger {
 public:
-    using PartitionedBlock =
-            std::pair<std::shared_ptr<ShuffleBlockWrapper>,
-                      std::tuple<std::shared_ptr<std::vector<uint32_t>>, 
size_t, size_t>>;
     ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
     AdaptivePassthroughExchanger(int running_sink_operators, int 
num_partitions)
             : Exchanger(running_sink_operators, num_partitions) {
-        _passthrough_data_queue.resize(num_partitions);
-        _shuffle_data_queue.resize(num_partitions);
+        _data_queue.resize(num_partitions);
     }
     Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState 
source_state,
                 LocalExchangeSinkLocalState& local_state) override;
@@ -204,17 +200,10 @@ private:
                              SourceState source_state, 
LocalExchangeSinkLocalState& local_state);
     Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block, 
SourceState source_state,
                          LocalExchangeSinkLocalState& local_state);
-
-    bool _passthrough_get_block(RuntimeState* state, vectorized::Block* block,
-                                SourceState& source_state,
-                                LocalExchangeSourceLocalState& local_state);
-    bool _shuffle_get_block(RuntimeState* state, vectorized::Block* block,
-                            SourceState& source_state, 
LocalExchangeSourceLocalState& local_state);
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict 
channel_ids,
                        vectorized::Block* block, SourceState source_state,
                        LocalExchangeSinkLocalState& local_state);
-    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> 
_passthrough_data_queue;
-    std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> 
_shuffle_data_queue;
+    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
 
     std::atomic_bool _is_pass_through = false;
     std::atomic_int32_t _total_block = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to