This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d9a3aa5c068 [pipelineX](improvement) improve local shuffle (#25964)
d9a3aa5c068 is described below
commit d9a3aa5c068628c9650d9aabe5abdb7dfdc3bdc2
Author: Gabriel <[email protected]>
AuthorDate: Thu Oct 26 21:42:43 2023 +0800
[pipelineX](improvement) improve local shuffle (#25964)
---
be/src/pipeline/pipeline_x/dependency.h | 5 ++-
.../local_exchange_sink_operator.cpp | 47 ++++++++++++----------
.../local_exchange/local_exchange_sink_operator.h | 6 +--
.../local_exchange_source_operator.cpp | 21 +++++++++-
.../local_exchange_source_operator.h | 1 +
be/src/pipeline/pipeline_x/operator.cpp | 1 +
be/src/pipeline/pipeline_x/operator.h | 6 ++-
.../pipeline_x/pipeline_x_fragment_context.cpp | 1 -
8 files changed, 59 insertions(+), 29 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 028b78957b4..131198c4496 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -840,11 +840,12 @@ private:
bool is_set_probe {false};
};
+using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
+
std::tuple<std::shared_ptr<std::vector<int>>, size_t, size_t>>;
struct LocalExchangeSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
- std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> data_queue;
- int num_partitions = 0;
+ std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
std::atomic<int> running_sink_operators = 0;
};
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index ec959b20cea..b69150ba512 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -27,33 +27,38 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
- _mutable_block.resize(p._num_partitions);
_shared_state->running_sink_operators++;
return Status::OK();
}
-Status LocalExchangeSinkLocalState::channel_add_rows(RuntimeState* state,
- const uint32_t*
__restrict channel_ids,
- vectorized::Block* block,
- SourceState source_state)
{
+Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state,
+ const uint32_t* __restrict
channel_ids,
+ vectorized::Block* block,
SourceState source_state) {
auto& data_queue = _shared_state->data_queue;
- std::vector<int> channel2rows[data_queue.size()];
-
- auto rows = block->rows();
- for (int i = 0; i < rows; i++) {
- channel2rows[channel_ids[i]].emplace_back(i);
- }
- for (size_t i = 0; i < data_queue.size(); i++) {
- if (_mutable_block[i] == nullptr) {
- _mutable_block[i] =
vectorized::MutableBlock::create_unique(block->clone_empty());
+ const auto num_partitions = data_queue.size();
+ const auto rows = block->rows();
+ auto row_idx = std::make_shared<std::vector<int>>(rows);
+ {
+ _partition_rows_histogram.assign(num_partitions + 1, 0);
+ for (size_t i = 0; i < rows; ++i) {
+ _partition_rows_histogram[channel_ids[i]]++;
+ }
+ for (int32_t i = 1; i <= num_partitions; ++i) {
+ _partition_rows_histogram[i] += _partition_rows_histogram[i - 1];
}
- const int* begin = channel2rows[i].data();
- _mutable_block[i]->add_rows(block, begin, begin +
channel2rows[i].size());
- if (_mutable_block[i]->rows() > state->batch_size() ||
- source_state == SourceState::FINISHED) {
- data_queue[i].enqueue(_mutable_block[i]->to_block());
- _mutable_block[i].reset(nullptr);
+ for (int32_t i = rows - 1; i >= 0; --i) {
+ (*row_idx)[_partition_rows_histogram[channel_ids[i]] - 1] = i;
+ _partition_rows_histogram[channel_ids[i]]--;
+ }
+ }
+ auto new_block = vectorized::Block::create_shared(block->clone_empty());
+ new_block->swap(*block);
+ for (size_t i = 0; i < num_partitions; i++) {
+ size_t start = _partition_rows_histogram[i];
+ size_t size = _partition_rows_histogram[i + 1] - start;
+ if (size > 0) {
+ data_queue[i].enqueue({new_block, {row_idx, start, size}});
}
}
@@ -72,7 +77,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
}
{
SCOPED_TIMER(local_state._distribute_timer);
- RETURN_IF_ERROR(local_state.channel_add_rows(
+ RETURN_IF_ERROR(local_state.split_rows(
state, (const
uint32_t*)local_state._partitioner->get_channel_ids(), in_block,
source_state));
}
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 4025def28e7..56d3d774609 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -34,8 +34,8 @@ public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
- Status channel_add_rows(RuntimeState* state, const uint32_t* __restrict
channel_ids,
- vectorized::Block* block, SourceState
source_state);
+ Status split_rows(RuntimeState* state, const uint32_t* __restrict
channel_ids,
+ vectorized::Block* block, SourceState source_state);
private:
friend class LocalExchangeSinkOperatorX;
@@ -43,7 +43,7 @@ private:
RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
RuntimeProfile::Counter* _distribute_timer = nullptr;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
- std::vector<std::unique_ptr<vectorized::MutableBlock>> _mutable_block;
+ std::vector<size_t> _partition_rows_histogram;
};
// A single 32-bit division on a recent x64 processor has a throughput of one
instruction every six cycles with a latency of 26 cycles.
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 83c712a8022..75d9b4ac471 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -30,6 +30,7 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
_dependency->set_channel_id(_channel_id);
_get_block_failed_counter =
ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime",
TUnit::UNIT, 1);
+ _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
return Status::OK();
}
@@ -37,7 +38,25 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::
SourceState& source_state) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.profile()->total_time_counter());
- if
(!local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(*block))
{
+ PartitionedBlock partitioned_block;
+ std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
+
+ if
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
+ partitioned_block)) {
+ SCOPED_TIMER(local_state._copy_data_timer);
+ mutable_block =
+
vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+
+ do {
+ const auto* offset_start = &((
+
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
+ mutable_block->add_rows(partitioned_block.first.get(),
offset_start,
+ offset_start +
std::get<2>(partitioned_block.second));
+ } while
(local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
+ partitioned_block) &&
+ mutable_block->rows() < state->batch_size());
+ *block = mutable_block->to_block();
+ } else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
if (local_state._shared_state->running_sink_operators == 0) {
source_state = SourceState::FINISHED;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index ddd47ec18cb..b992f454696 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -37,6 +37,7 @@ private:
int _channel_id;
RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
+ RuntimeProfile::Counter* _copy_data_timer = nullptr;
};
class LocalExchangeSourceOperatorX final : public
OperatorX<LocalExchangeSourceLocalState> {
diff --git a/be/src/pipeline/pipeline_x/operator.cpp b/be/src/pipeline/pipeline_x/operator.cpp
index 9377f5f2701..c29e1b8af87 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -175,6 +175,7 @@ void PipelineXLocalStateBase::clear_origin_block() {
Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block*
origin_block,
vectorized::Block* output_block) const {
auto local_state = state->get_local_state(operator_id());
+ SCOPED_TIMER(local_state->profile()->total_time_counter());
SCOPED_TIMER(local_state->_projection_timer);
using namespace vectorized;
vectorized::MutableBlock mutable_block =
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index bfad246d3e6..79762e6ee19 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -155,7 +155,11 @@ public:
}
OperatorXBase(ObjectPool* pool, int node_id, int operator_id)
- : OperatorBase(nullptr), _operator_id(operator_id),
_node_id(node_id), _pool(pool) {};
+ : OperatorBase(nullptr),
+ _operator_id(operator_id),
+ _node_id(node_id),
+ _pool(pool),
+ _limit(-1) {}
virtual Status init(const TPlanNode& tnode, RuntimeState* state);
Status init(const TDataSink& tsink) override {
LOG(FATAL) << "should not reach here!";
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 78a888af9df..5ca5829e1a8 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -577,7 +577,6 @@ Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX
auto shared_state = LocalExchangeSharedState::create_shared();
shared_state->data_queue.resize(_runtime_state->query_parallel_instance_num());
- shared_state->num_partitions =
_runtime_state->query_parallel_instance_num();
_op_id_to_le_state.insert({local_exchange_id, shared_state});
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]