This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 07bd65abe08 [pipelineX](local shuffle) remove unused code in local shuffle to improve performance #29292
07bd65abe08 is described below
commit 07bd65abe085a2f644f1c3533e03a851b36d2505
Author: Mryange <[email protected]>
AuthorDate: Fri Dec 29 20:30:28 2023 +0800
[pipelineX](local shuffle) remove unused code in local shuffle to improve performance #29292
---
.../pipeline_x/local_exchange/local_exchanger.cpp | 141 +++++----------------
.../pipeline_x/local_exchange/local_exchanger.h | 15 +--
2 files changed, 31 insertions(+), 125 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index fbe5098f639..b2f1bb00ebe 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -283,38 +283,12 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
local_state._shared_state->add_mem_usage(channel_id,
new_block.allocated_bytes());
- _passthrough_data_queue[channel_id].enqueue(std::move(new_block));
+ _data_queue[channel_id].enqueue(std::move(new_block));
local_state._shared_state->set_ready_to_read(channel_id);
return Status::OK();
}
-bool AdaptivePassthroughExchanger::_passthrough_get_block(
- RuntimeState* state, vectorized::Block* block, SourceState&
source_state,
- LocalExchangeSourceLocalState& local_state) {
- vectorized::Block next_block;
- if (_running_sink_operators == 0) {
- if (_passthrough_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- block->swap(next_block);
- _free_blocks.enqueue(std::move(next_block));
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
- block->allocated_bytes());
- } else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- source_state = SourceState::FINISHED;
- return false;
- }
- } else if (_passthrough_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- block->swap(next_block);
- _free_blocks.enqueue(std::move(next_block));
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
- } else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- return false;
- }
- return true;
-}
-
Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state,
vectorized::Block* block,
SourceState source_state,
LocalExchangeSinkLocalState& local_state) {
@@ -339,7 +313,7 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
const uint32_t* __restrict
channel_ids,
vectorized::Block* block,
SourceState source_state,
LocalExchangeSinkLocalState&
local_state) {
- auto& data_queue = _shuffle_data_queue;
+ auto& data_queue = _data_queue;
const auto rows = block->rows();
auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
{
@@ -357,79 +331,21 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
local_state._partition_rows_histogram[channel_ids[i]]--;
}
}
-
- vectorized::Block data_block;
- std::shared_ptr<ShuffleBlockWrapper> new_block_wrapper;
- if (_free_blocks.try_enqueue(data_block)) {
- new_block_wrapper =
ShuffleBlockWrapper::create_shared(std::move(data_block));
- } else {
- new_block_wrapper =
ShuffleBlockWrapper::create_shared(block->clone_empty());
- }
-
- new_block_wrapper->data_block.swap(*block);
- if (new_block_wrapper->data_block.empty()) {
- return Status::OK();
- }
- local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
- new_block_wrapper->ref(_num_partitions);
-
for (size_t i = 0; i < _num_partitions; i++) {
- size_t start = local_state._partition_rows_histogram[i];
- size_t size = local_state._partition_rows_histogram[i + 1] - start;
+ const size_t start = local_state._partition_rows_histogram[i];
+ const size_t size = local_state._partition_rows_histogram[i + 1] -
start;
if (size > 0) {
- local_state._shared_state->add_mem_usage(
- i, new_block_wrapper->data_block.allocated_bytes(), false);
- data_queue[i].enqueue({new_block_wrapper, {row_idx, start, size}});
- local_state._shared_state->set_ready_to_read(i);
- } else {
- new_block_wrapper->unref(local_state._shared_state);
+ std::unique_ptr<vectorized::MutableBlock> mutable_block =
+ vectorized::MutableBlock::create_unique(block->clone_empty());
+ mutable_block->add_rows(block, start, size);
+ auto new_block = mutable_block->to_block();
+ local_state._shared_state->add_mem_usage(i,
new_block.allocated_bytes());
+ data_queue[i].enqueue(std::move(new_block));
}
+ local_state._shared_state->set_ready_to_read(i);
}
-
return Status::OK();
}
-bool AdaptivePassthroughExchanger::_shuffle_get_block(RuntimeState* state, vectorized::Block* block,
- SourceState& source_state,
- LocalExchangeSourceLocalState& local_state) {
- PartitionedBlock partitioned_block;
- std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
-
- auto get_data = [&](vectorized::Block* result_block) {
- do {
- const auto* offset_start = &((
- *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
- auto block_wrapper = partitioned_block.first;
- local_state._shared_state->sub_mem_usage(
- local_state._channel_id,
block_wrapper->data_block.allocated_bytes(), false);
- mutable_block->add_rows(&block_wrapper->data_block, offset_start,
- offset_start +
std::get<2>(partitioned_block.second));
- block_wrapper->unref(local_state._shared_state);
- } while (mutable_block->rows() < state->batch_size() &&
- _shuffle_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
- *result_block = mutable_block->to_block();
- };
- if (_running_sink_operators == 0) {
- if (_shuffle_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
- SCOPED_TIMER(local_state._copy_data_timer);
- mutable_block = vectorized::MutableBlock::create_unique(
- partitioned_block.first->data_block.clone_empty());
- get_data(block);
- } else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- source_state = SourceState::FINISHED;
- return false;
- }
- } else if (_shuffle_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
- SCOPED_TIMER(local_state._copy_data_timer);
- mutable_block = vectorized::MutableBlock::create_unique(
- partitioned_block.first->data_block.clone_empty());
- get_data(block);
- } else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- return false;
- }
- return true;
-}
Status AdaptivePassthroughExchanger::sink(RuntimeState* state,
vectorized::Block* in_block,
SourceState source_state,
@@ -447,23 +363,24 @@ Status AdaptivePassthroughExchanger::sink(RuntimeState* state, vectorized::Block
Status AdaptivePassthroughExchanger::get_block(RuntimeState* state,
vectorized::Block* block,
SourceState& source_state,
LocalExchangeSourceLocalState&
local_state) {
- auto is_shuffle_success = false, is_passthrough_success = false;
- SourceState shuffle_state = SourceState::MORE_DATA, passthrough_state =
SourceState::MORE_DATA;
-
- is_shuffle_success = _shuffle_get_block(state, block, shuffle_state,
local_state);
-
- if (is_shuffle_success) {
- return Status::OK();
- }
-
- is_passthrough_success = _passthrough_get_block(state, block,
passthrough_state, local_state);
-
- if (is_passthrough_success) {
- return Status::OK();
- }
-
- if (shuffle_state == SourceState::FINISHED && passthrough_state ==
SourceState::FINISHED) {
- source_state = SourceState::FINISHED;
+ vectorized::Block next_block;
+ if (_running_sink_operators == 0) {
+ if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ block->swap(next_block);
+ _free_blocks.enqueue(std::move(next_block));
+ local_state._shared_state->sub_mem_usage(local_state._channel_id,
+ block->allocated_bytes());
+ } else {
+ COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+ source_state = SourceState::FINISHED;
+ }
+ } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ block->swap(next_block);
+ _free_blocks.enqueue(std::move(next_block));
+ local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
+ } else {
+ COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+ local_state._dependency->block();
}
return Status::OK();
}
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index ff64c88d5aa..28c99e751ea 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -183,14 +183,10 @@ private:
// a copy of ShuffleExchanger and PassthroughExchanger.
class AdaptivePassthroughExchanger : public Exchanger {
public:
- using PartitionedBlock =
- std::pair<std::shared_ptr<ShuffleBlockWrapper>,
- std::tuple<std::shared_ptr<std::vector<uint32_t>>,
size_t, size_t>>;
ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
AdaptivePassthroughExchanger(int running_sink_operators, int
num_partitions)
: Exchanger(running_sink_operators, num_partitions) {
- _passthrough_data_queue.resize(num_partitions);
- _shuffle_data_queue.resize(num_partitions);
+ _data_queue.resize(num_partitions);
}
Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState
source_state,
LocalExchangeSinkLocalState& local_state) override;
@@ -204,17 +200,10 @@ private:
SourceState source_state,
LocalExchangeSinkLocalState& local_state);
Status _shuffle_sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state,
LocalExchangeSinkLocalState& local_state);
-
- bool _passthrough_get_block(RuntimeState* state, vectorized::Block* block,
- SourceState& source_state,
- LocalExchangeSourceLocalState& local_state);
- bool _shuffle_get_block(RuntimeState* state, vectorized::Block* block,
- SourceState& source_state,
LocalExchangeSourceLocalState& local_state);
Status _split_rows(RuntimeState* state, const uint32_t* __restrict
channel_ids,
vectorized::Block* block, SourceState source_state,
LocalExchangeSinkLocalState& local_state);
- std::vector<moodycamel::ConcurrentQueue<vectorized::Block>>
_passthrough_data_queue;
- std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>>
_shuffle_data_queue;
+ std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
std::atomic_bool _is_pass_through = false;
std::atomic_int32_t _total_block = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]