This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ba3bb0ffed9 [Improvement](exchange) optimize for channel_add_rows
(#57997)
ba3bb0ffed9 is described below
commit ba3bb0ffed9d784ac6812b0d06f1b495d5273e3a
Author: Pxl <[email protected]>
AuthorDate: Thu Nov 20 15:48:29 2025 +0800
[Improvement](exchange) optimize for channel_add_rows (#57997)
```sql
with wscs as (
select
ws_sold_date_sk sold_date_sk,
ws_ext_sales_price sales_price
from
web_sales
)
select
d_week_seq
from
wscs
right semi join date_dim on d_date_sk = sold_date_sk
group by
d_week_seq;
```
Profile counters for the query above:
- LocalSentRows: 720.000376M (720000376)
Before:
- DistributeRowsIntoChannelsTime: 4sec560ms
After:
- DistributeRowsIntoChannelsTime: 3sec856ms (about 15% less time)
Problem Summary:
This pull request refactors the `Writer` class in the shuffle pipeline
to improve performance and memory management by changing how row indices
and partition histograms are handled. The main changes are dropping the
`const` qualifiers that prevented buffer reuse, reusing internal buffers
to avoid repeated allocations, and simplifying the logic for partitioning
rows.
**Refactoring and Performance Improvements:**
* Removed `const` qualifiers from the `write` and `_channel_add_rows`
methods in both `writer.cpp` and `writer.h`, allowing these methods to
modify internal state and reuse buffers for better performance.
[[1]](diffhunk://#diff-907673827bb9b79f913966cf9a582f8dce272a622d8f1382bf8e297454955194L33-R33)
[[2]](diffhunk://#diff-f91c438b58a2e487a928dd9cddfbbce754b9206bd9f1e08cb3e2e36f7dbf1ab7L39-R53)
* Introduced internal member buffers (`_row_idx`,
`_partition_rows_histogram`, `_channel_start_offsets`) in the `Writer`
class to avoid repeated allocation of temporary arrays during row
partitioning.
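* Refactored the logic in `_channel_add_rows` to use these internal
buffers. Rows are now scattered to their channels in a single forward
pass driven by per-channel start offsets, replacing the previous backward
counting-sort pass. This makes computing row indices and partition sizes
for each channel cheaper.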
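**Code Simplification:**
* Simplified the computation of partition histograms and channel
offsets. The histogram is now indexed directly by channel id (no `+1`/`+2`
index shift), and each channel's start offset is an exclusive prefix sum
over that histogram, which makes the code easier to read and maintain.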
These changes should lead to reduced memory allocation overhead and
improved runtime efficiency in the shuffle writer logic.
---
be/src/pipeline/shuffle/writer.cpp | 43 +++++++++++++++++++++++++-------------
be/src/pipeline/shuffle/writer.h | 8 +++++--
2 files changed, 34 insertions(+), 17 deletions(-)
diff --git a/be/src/pipeline/shuffle/writer.cpp
b/be/src/pipeline/shuffle/writer.cpp
index e6215e22cba..eecabd451dc 100644
--- a/be/src/pipeline/shuffle/writer.cpp
+++ b/be/src/pipeline/shuffle/writer.cpp
@@ -17,6 +17,8 @@
#include "writer.h"
+#include <type_traits>
+
#include "pipeline/exec/exchange_sink_operator.h"
#include "vec/core/block.h"
@@ -30,7 +32,7 @@ void Writer::_handle_eof_channel(RuntimeState* state,
ChannelPtrType channel, St
}
Status Writer::write(ExchangeSinkLocalState* local_state, RuntimeState* state,
- vectorized::Block* block, bool eos) const {
+ vectorized::Block* block, bool eos) {
bool already_sent = false;
{
SCOPED_TIMER(local_state->split_block_hash_compute_timer());
@@ -64,35 +66,46 @@ Status Writer::_channel_add_rows(RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
size_t partition_count,
const ChannelIdType* __restrict channel_ids,
size_t rows,
- vectorized::Block* block, bool eos) const {
- std::vector<uint32_t> partition_rows_histogram;
- auto row_idx = vectorized::PODArray<uint32_t>(rows);
+ vectorized::Block* block, bool eos) {
+ _row_idx.resize(rows);
{
- partition_rows_histogram.assign(partition_count + 2, 0);
+ _partition_rows_histogram.resize(partition_count);
+ _channel_start_offsets.resize(partition_count);
+ for (size_t i = 0; i < partition_count; ++i) {
+ _partition_rows_histogram[i] = 0;
+ }
for (size_t i = 0; i < rows; ++i) {
- partition_rows_histogram[channel_ids[i] + 1]++;
+ _partition_rows_histogram[channel_ids[i]]++;
}
- for (size_t i = 1; i <= partition_count + 1; ++i) {
- partition_rows_histogram[i] += partition_rows_histogram[i - 1];
+ _channel_start_offsets[0] = 0;
+ for (size_t i = 1; i < partition_count; ++i) {
+ _channel_start_offsets[i] =
+ _channel_start_offsets[i - 1] +
_partition_rows_histogram[i - 1];
}
- for (int32_t i = cast_set<int32_t>(rows) - 1; i >= 0; --i) {
- row_idx[partition_rows_histogram[channel_ids[i] + 1] - 1] = i;
- partition_rows_histogram[channel_ids[i] + 1]--;
+ for (uint32_t i = 0; i < rows; i++) {
+ if constexpr (std::is_signed_v<ChannelIdType>) {
+ // -1 means this row is filtered by table sink hash partitioner
+ if (channel_ids[i] == -1) {
+ continue;
+ }
+ }
+ _row_idx[_channel_start_offsets[channel_ids[i]]++] = i;
}
}
Status status = Status::OK();
+ uint32_t offset = 0;
for (size_t i = 0; i < partition_count; ++i) {
- uint32_t start = partition_rows_histogram[i + 1];
- uint32_t size = partition_rows_histogram[i + 2] - start;
+ uint32_t size = _partition_rows_histogram[i];
if (!channels[i]->is_receiver_eof() && size > 0) {
- status = channels[i]->add_rows(block, row_idx.data(), start, size,
false);
+ status = channels[i]->add_rows(block, _row_idx.data(), offset,
size, false);
HANDLE_CHANNEL_STATUS(state, channels[i], status);
}
+ offset += size;
}
if (eos) {
for (int i = 0; i < partition_count; ++i) {
if (!channels[i]->is_receiver_eof()) {
- status = channels[i]->add_rows(block, row_idx.data(), 0, 0,
true);
+ status = channels[i]->add_rows(block, _row_idx.data(), 0, 0,
true);
HANDLE_CHANNEL_STATUS(state, channels[i], status);
}
}
diff --git a/be/src/pipeline/shuffle/writer.h b/be/src/pipeline/shuffle/writer.h
index 0eb77212029..3352d395682 100644
--- a/be/src/pipeline/shuffle/writer.h
+++ b/be/src/pipeline/shuffle/writer.h
@@ -36,17 +36,21 @@ public:
Writer() = default;
Status write(ExchangeSinkLocalState* local_state, RuntimeState* state,
vectorized::Block* block,
- bool eos) const;
+ bool eos);
private:
template <typename ChannelIdType>
Status _channel_add_rows(RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
size_t partition_count, const ChannelIdType*
__restrict channel_ids,
- size_t rows, vectorized::Block* block, bool eos)
const;
+ size_t rows, vectorized::Block* block, bool eos);
template <typename ChannelPtrType>
void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
Status st) const;
+
+ vectorized::PaddedPODArray<uint32_t> _row_idx;
+ vectorized::PaddedPODArray<uint32_t> _partition_rows_histogram;
+ vectorized::PaddedPODArray<uint32_t> _channel_start_offsets;
};
#include "common/compile_check_end.h"
} // namespace pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]