This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ba3bb0ffed9 [Improvement](exchange) optimize for channel_add_rows 
(#57997)
ba3bb0ffed9 is described below

commit ba3bb0ffed9d784ac6812b0d06f1b495d5273e3a
Author: Pxl <[email protected]>
AuthorDate: Thu Nov 20 15:48:29 2025 +0800

    [Improvement](exchange) optimize for channel_add_rows (#57997)
    
    ```sql
    with wscs as (
        select
            ws_sold_date_sk sold_date_sk,
            ws_ext_sales_price sales_price
        from
            web_sales
    )
    select
        d_week_seq
    from
        wscs
        right semi join date_dim on d_date_sk = sold_date_sk
    group by
        d_week_seq;
    ```
    Benchmark (LocalSentRows: 720.000376M = 720,000,376 rows):
    before:
    - DistributeRowsIntoChannelsTime: 4sec560ms
    after:
    - DistributeRowsIntoChannelsTime: 3sec856ms
    
    Problem Summary:
    
    This pull request refactors the `Writer` class in the shuffle pipeline
    to improve performance and memory management by changing how row indices
    and partition histograms are handled. The main changes are dropping the
    `const` qualifiers from the affected methods (they now mutate reusable
    member buffers), reusing those internal buffers to avoid repeated
    allocations, and simplifying the row-partitioning logic.
    
    **Refactoring and Performance Improvements:**
    
    * Removed `const` qualifiers from the `write` and `_channel_add_rows`
    methods in both `writer.cpp` and `writer.h`, allowing these methods to
    modify internal state and reuse buffers for better performance.
    
[[1]](diffhunk://#diff-907673827bb9b79f913966cf9a582f8dce272a622d8f1382bf8e297454955194L33-R33)
    
[[2]](diffhunk://#diff-f91c438b58a2e487a928dd9cddfbbce754b9206bd9f1e08cb3e2e36f7dbf1ab7L39-R53)
    * Introduced internal member buffers (`_row_idx`,
    `_partition_rows_histogram`, `_channel_start_offsets`) in the `Writer`
    class to avoid repeated allocation of temporary arrays during row
    partitioning.
    * Refactored the logic in `_channel_add_rows` to use these internal
    buffers, resulting in more efficient calculation and assignment of row
    indices and partition sizes for each channel.
    
    **Code Simplification:**
    
    * Simplified the computation of partition histograms and channel
    offsets, making the code easier to read and maintain.
    
    These changes reduce memory-allocation overhead and improve runtime
    efficiency in the shuffle writer logic: on the query above,
    DistributeRowsIntoChannelsTime drops from 4sec560ms to 3sec856ms
    (roughly a 15% speedup).
---
 be/src/pipeline/shuffle/writer.cpp | 43 +++++++++++++++++++++++++-------------
 be/src/pipeline/shuffle/writer.h   |  8 +++++--
 2 files changed, 34 insertions(+), 17 deletions(-)

diff --git a/be/src/pipeline/shuffle/writer.cpp 
b/be/src/pipeline/shuffle/writer.cpp
index e6215e22cba..eecabd451dc 100644
--- a/be/src/pipeline/shuffle/writer.cpp
+++ b/be/src/pipeline/shuffle/writer.cpp
@@ -17,6 +17,8 @@
 
 #include "writer.h"
 
+#include <type_traits>
+
 #include "pipeline/exec/exchange_sink_operator.h"
 #include "vec/core/block.h"
 
@@ -30,7 +32,7 @@ void Writer::_handle_eof_channel(RuntimeState* state, 
ChannelPtrType channel, St
 }
 
 Status Writer::write(ExchangeSinkLocalState* local_state, RuntimeState* state,
-                     vectorized::Block* block, bool eos) const {
+                     vectorized::Block* block, bool eos) {
     bool already_sent = false;
     {
         SCOPED_TIMER(local_state->split_block_hash_compute_timer());
@@ -64,35 +66,46 @@ Status Writer::_channel_add_rows(RuntimeState* state,
                                  
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
                                  size_t partition_count,
                                  const ChannelIdType* __restrict channel_ids, 
size_t rows,
-                                 vectorized::Block* block, bool eos) const {
-    std::vector<uint32_t> partition_rows_histogram;
-    auto row_idx = vectorized::PODArray<uint32_t>(rows);
+                                 vectorized::Block* block, bool eos) {
+    _row_idx.resize(rows);
     {
-        partition_rows_histogram.assign(partition_count + 2, 0);
+        _partition_rows_histogram.resize(partition_count);
+        _channel_start_offsets.resize(partition_count);
+        for (size_t i = 0; i < partition_count; ++i) {
+            _partition_rows_histogram[i] = 0;
+        }
         for (size_t i = 0; i < rows; ++i) {
-            partition_rows_histogram[channel_ids[i] + 1]++;
+            _partition_rows_histogram[channel_ids[i]]++;
         }
-        for (size_t i = 1; i <= partition_count + 1; ++i) {
-            partition_rows_histogram[i] += partition_rows_histogram[i - 1];
+        _channel_start_offsets[0] = 0;
+        for (size_t i = 1; i < partition_count; ++i) {
+            _channel_start_offsets[i] =
+                    _channel_start_offsets[i - 1] + 
_partition_rows_histogram[i - 1];
         }
-        for (int32_t i = cast_set<int32_t>(rows) - 1; i >= 0; --i) {
-            row_idx[partition_rows_histogram[channel_ids[i] + 1] - 1] = i;
-            partition_rows_histogram[channel_ids[i] + 1]--;
+        for (uint32_t i = 0; i < rows; i++) {
+            if constexpr (std::is_signed_v<ChannelIdType>) {
+                // -1 means this row is filtered by table sink hash partitioner
+                if (channel_ids[i] == -1) {
+                    continue;
+                }
+            }
+            _row_idx[_channel_start_offsets[channel_ids[i]]++] = i;
         }
     }
     Status status = Status::OK();
+    uint32_t offset = 0;
     for (size_t i = 0; i < partition_count; ++i) {
-        uint32_t start = partition_rows_histogram[i + 1];
-        uint32_t size = partition_rows_histogram[i + 2] - start;
+        uint32_t size = _partition_rows_histogram[i];
         if (!channels[i]->is_receiver_eof() && size > 0) {
-            status = channels[i]->add_rows(block, row_idx.data(), start, size, 
false);
+            status = channels[i]->add_rows(block, _row_idx.data(), offset, 
size, false);
             HANDLE_CHANNEL_STATUS(state, channels[i], status);
         }
+        offset += size;
     }
     if (eos) {
         for (int i = 0; i < partition_count; ++i) {
             if (!channels[i]->is_receiver_eof()) {
-                status = channels[i]->add_rows(block, row_idx.data(), 0, 0, 
true);
+                status = channels[i]->add_rows(block, _row_idx.data(), 0, 0, 
true);
                 HANDLE_CHANNEL_STATUS(state, channels[i], status);
             }
         }
diff --git a/be/src/pipeline/shuffle/writer.h b/be/src/pipeline/shuffle/writer.h
index 0eb77212029..3352d395682 100644
--- a/be/src/pipeline/shuffle/writer.h
+++ b/be/src/pipeline/shuffle/writer.h
@@ -36,17 +36,21 @@ public:
     Writer() = default;
 
     Status write(ExchangeSinkLocalState* local_state, RuntimeState* state, 
vectorized::Block* block,
-                 bool eos) const;
+                 bool eos);
 
 private:
     template <typename ChannelIdType>
     Status _channel_add_rows(RuntimeState* state,
                              
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
                              size_t partition_count, const ChannelIdType* 
__restrict channel_ids,
-                             size_t rows, vectorized::Block* block, bool eos) 
const;
+                             size_t rows, vectorized::Block* block, bool eos);
 
     template <typename ChannelPtrType>
     void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, 
Status st) const;
+
+    vectorized::PaddedPODArray<uint32_t> _row_idx;
+    vectorized::PaddedPODArray<uint32_t> _partition_rows_histogram;
+    vectorized::PaddedPODArray<uint32_t> _channel_start_offsets;
 };
 #include "common/compile_check_end.h"
 } // namespace pipeline


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to