This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c0aac043b66 [pipelineX](local shuffle) Use local shuffle to optimize BHJ (#27823)
c0aac043b66 is described below
commit c0aac043b66e6b3f14468a7c77e40f7d7c5c8f64
Author: Gabriel <[email protected]>
AuthorDate: Thu Nov 30 21:08:45 2023 +0800
[pipelineX](local shuffle) Use local shuffle to optimize BHJ (#27823)
---
be/src/pipeline/exec/exchange_source_operator.h | 5 +
be/src/pipeline/pipeline.h | 20 +++
be/src/pipeline/pipeline_x/dependency.cpp | 9 +-
be/src/pipeline/pipeline_x/dependency.h | 15 +--
.../local_exchange_sink_operator.cpp | 59 ++-------
.../local_exchange/local_exchange_sink_operator.h | 40 ++++--
.../local_exchange_source_operator.cpp | 57 +++-----
.../local_exchange_source_operator.h | 17 +--
.../pipeline_x/local_exchange/local_exchanger.cpp | 147 +++++++++++++++++++++
.../pipeline_x/local_exchange/local_exchanger.h | 98 ++++++++++++++
be/src/pipeline/pipeline_x/operator.h | 4 +-
.../pipeline_x/pipeline_x_fragment_context.cpp | 67 ++++++++--
.../pipeline_x/pipeline_x_fragment_context.h | 5 +-
13 files changed, 412 insertions(+), 131 deletions(-)
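
In brief: the row-distribution logic that previously lived inside the local exchange sink and source operators is pulled out into an Exchanger interface (local_exchanger.h/.cpp) with two implementations: ShuffleExchanger, which hash-partitions rows exactly as before, and PassthroughExchanger, which hands whole blocks round-robin to source instances without computing any hashes. For a broadcast hash join (BHJ) the probe side now gets the cheaper PASSTHROUGH exchange, since every instance already holds the full build side; non-broadcast probes still get a SHUFFLE exchange when the pipeline's source does not deliver pre-partitioned data. A per-pipeline _need_to_local_shuffle flag is added; _plan_local_shuffle and the aggregation call sites are still commented out as TODOs.
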
diff --git a/be/src/pipeline/exec/exchange_source_operator.h b/be/src/pipeline/exec/exchange_source_operator.h
index abac33001bb..479a8799058 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -104,6 +104,11 @@ public:
return _sub_plan_query_statistics_recvr;
}
+ bool need_to_local_shuffle() const override {
+ // TODO(gabriel):
+ return false;
+ }
+
private:
friend class ExchangeLocalState;
const int _num_senders;
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index f4b7928887c..a0b6de5c62e 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -126,6 +126,17 @@ public:
return _collect_query_statistics_with_every_batch;
}
+ bool need_to_local_shuffle() const { return _need_to_local_shuffle; }
+ void set_need_to_local_shuffle(bool need_to_local_shuffle) {
+ _need_to_local_shuffle = need_to_local_shuffle;
+ }
+ void init_need_to_local_shuffle_by_source() {
+ set_need_to_local_shuffle(operatorXs.front()->need_to_local_shuffle());
+ }
+
+ std::vector<std::shared_ptr<Pipeline>>& children() { return _children; }
+ void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
+
private:
void _init_profile();
@@ -136,6 +147,8 @@ private:
std::vector<std::pair<int, std::weak_ptr<Pipeline>>> _parents;
std::vector<std::pair<int, std::shared_ptr<Pipeline>>> _dependencies;
+ std::vector<std::shared_ptr<Pipeline>> _children;
+
PipelineId _pipeline_id;
std::weak_ptr<PipelineFragmentContext> _context;
int _previous_schedule_id = -1;
@@ -178,6 +191,13 @@ private:
bool _always_can_write = false;
bool _is_root_pipeline = false;
bool _collect_query_statistics_with_every_batch = false;
+
+ // If the source operator meets one of the conditions below:
+ // 1. it is a scan operator reading hash-bucketed storage
+ // 2. it is an exchange operator with Hash/BucketHash partitioning
+ // then `_need_to_local_shuffle` is set to false, which means we should not do a local
+ // shuffle in this fragment because the data has already been partitioned by
+ // storage/shuffling.
+ bool _need_to_local_shuffle = true;
};
} // namespace doris::pipeline
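
A minimal standalone sketch of how the new flag is meant to flow, with Pipeline and the operator base reduced to stand-ins (only the names taken from this diff are real; everything else is simplified for illustration):

    #include <memory>
    #include <vector>

    // Stand-in for OperatorXBase: mirrors the virtual added to operator.h later in
    // this diff. Operators whose output is already partitioned (a scan on
    // hash-bucketed storage, a hash-partitioned exchange) would return false.
    struct OperatorX {
        virtual bool need_to_local_shuffle() const { return true; }
        virtual ~OperatorX() = default;
    };

    // Stand-in for Pipeline: leaf pipelines copy the answer from their source
    // operator; deriving a parent's flag from its children is still a TODO in
    // _plan_local_shuffle further down.
    struct Pipeline {
        std::vector<std::shared_ptr<OperatorX>> operatorXs;
        bool _need_to_local_shuffle = true;

        void init_need_to_local_shuffle_by_source() {
            _need_to_local_shuffle = operatorXs.front()->need_to_local_shuffle();
        }
    };

    int main() {
        Pipeline p;
        p.operatorXs.push_back(std::make_shared<OperatorX>());
        p.init_need_to_local_shuffle_by_source(); // stays true: source gives no guarantee
    }
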
diff --git a/be/src/pipeline/pipeline_x/dependency.cpp b/be/src/pipeline/pipeline_x/dependency.cpp
index 2fcf0906c1c..dcb149d0784 100644
--- a/be/src/pipeline/pipeline_x/dependency.cpp
+++ b/be/src/pipeline/pipeline_x/dependency.cpp
@@ -22,7 +22,7 @@
#include "common/logging.h"
#include "pipeline/pipeline_fragment_context.h"
-#include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker.h"
@@ -180,4 +180,11 @@ void RuntimeFilterDependency::sub_filters() {
}
}
+void LocalExchangeSharedState::sub_running_sink_operators() {
+ std::unique_lock<std::mutex> lc(le_lock);
+ if (exchanger->running_sink_operators.fetch_sub(1) == 1) {
+ _set_ready_for_read();
+ }
+}
+
} // namespace doris::pipeline
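
Moving sub_running_sink_operators() out of the header is a layering fix: dependency.h (next hunk) now holds the exchanger only behind a forward declaration (class Exchanger;), so code that dereferences exchanger->running_sink_operators must live in a .cpp that includes local_exchanger.h. The locking is unchanged: the decrement happens under le_lock so the last finishing sink cannot race with a source's double-checked block(); see the sketch after the local_exchange_source_operator.cpp hunk below.
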
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 16e98f11b28..9fbb25aaa2f 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -31,6 +31,7 @@
#include "gutil/integral_types.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/multi_cast_data_streamer.h"
+#include "pipeline/exec/operator.h"
#include "vec/common/hash_table/hash_map_context_creator.h"
#include "vec/common/sort/partition_sorter.h"
#include "vec/common/sort/sorter.h"
@@ -579,21 +580,15 @@ public:
}
};
-using PartitionedBlock = std::pair<std::shared_ptr<vectorized::Block>,
- std::tuple<std::shared_ptr<std::vector<int>>, size_t, size_t>>;
+class Exchanger;
+
struct LocalExchangeSharedState : public BasicSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
- std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> data_queue;
+ std::unique_ptr<Exchanger> exchanger {};
std::vector<Dependency*> source_dependencies;
- std::atomic<int> running_sink_operators = 0;
std::mutex le_lock;
- void sub_running_sink_operators() {
- std::unique_lock<std::mutex> lc(le_lock);
- if (running_sink_operators.fetch_sub(1) == 1) {
- _set_ready_for_read();
- }
- }
+ void sub_running_sink_operators();
void _set_ready_for_read() {
for (auto* dep : source_dependencies) {
DCHECK(dep);
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
index 12cc5e042e8..3d1540cdc46 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.cpp
@@ -17,6 +17,8 @@
#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
+
namespace doris::pipeline {
Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
@@ -26,47 +28,12 @@ Status LocalExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo
_compute_hash_value_timer = ADD_TIMER(profile(), "ComputeHashValueTime");
_distribute_timer = ADD_TIMER(profile(), "DistributeDataTime");
- auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
- _num_rows_in_queue.resize(p._num_partitions);
- for (size_t i = 0; i < p._num_partitions; i++) {
- _num_rows_in_queue[i] = ADD_COUNTER_WITH_LEVEL(
- profile(), "NumRowsInQueue" + std::to_string(i), TUnit::UNIT, 1);
- }
- RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
- return Status::OK();
-}
-
-Status LocalExchangeSinkLocalState::split_rows(RuntimeState* state,
- const uint32_t* __restrict channel_ids,
- vectorized::Block* block, SourceState source_state) {
- auto& data_queue = _shared_state->data_queue;
- const auto num_partitions = data_queue.size();
- const auto rows = block->rows();
- auto row_idx = std::make_shared<std::vector<int>>(rows);
- {
- _partition_rows_histogram.assign(num_partitions + 1, 0);
- for (size_t i = 0; i < rows; ++i) {
- _partition_rows_histogram[channel_ids[i]]++;
- }
- for (int32_t i = 1; i <= num_partitions; ++i) {
- _partition_rows_histogram[i] += _partition_rows_histogram[i - 1];
- }
+ _exchanger = _shared_state->exchanger.get();
+ DCHECK(_exchanger != nullptr);
- for (int32_t i = rows - 1; i >= 0; --i) {
- (*row_idx)[_partition_rows_histogram[channel_ids[i]] - 1] = i;
- _partition_rows_histogram[channel_ids[i]]--;
- }
- }
- auto new_block = vectorized::Block::create_shared(block->clone_empty());
- new_block->swap(*block);
- for (size_t i = 0; i < num_partitions; i++) {
- size_t start = _partition_rows_histogram[i];
- size_t size = _partition_rows_histogram[i + 1] - start;
- if (size > 0) {
- data_queue[i].enqueue({new_block, {row_idx, start, size}});
- _shared_state->set_ready_for_read(i);
- COUNTER_UPDATE(_num_rows_in_queue[i], size);
- }
+ if (_exchanger->get_type() == ExchangeType::SHUFFLE) {
+ auto& p = _parent->cast<LocalExchangeSinkOperatorX>();
+ RETURN_IF_ERROR(p._partitioner->clone(state, _partitioner));
}
return Status::OK();
@@ -77,17 +44,7 @@ Status LocalExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
- {
- SCOPED_TIMER(local_state._compute_hash_value_timer);
- RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block,
- local_state.mem_tracker()));
- }
- {
- SCOPED_TIMER(local_state._distribute_timer);
- RETURN_IF_ERROR(local_state.split_rows(
- state, (const uint32_t*)local_state._partitioner->get_channel_ids(), in_block,
- source_state));
- }
+ RETURN_IF_ERROR(local_state._exchanger->sink(state, in_block, source_state, local_state));
if (source_state == SourceState::FINISHED) {
local_state._shared_state->sub_running_sink_operators();
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index 45d61d4ff6b..c5fe9dc7dc4 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -30,6 +30,9 @@ public:
~LocalExchangeSinkDependency() override = default;
};
+class Exchanger;
+class ShuffleExchanger;
+class PassthroughExchanger;
class LocalExchangeSinkOperatorX;
class LocalExchangeSinkLocalState final
: public PipelineXSinkLocalState<LocalExchangeSinkDependency> {
@@ -43,17 +46,21 @@ public:
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
- Status split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
- vectorized::Block* block, SourceState source_state);
-
private:
friend class LocalExchangeSinkOperatorX;
+ friend class ShuffleExchanger;
+ friend class PassthroughExchanger;
+
+ Exchanger* _exchanger = nullptr;
+ // Used by shuffle exchanger
RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
RuntimeProfile::Counter* _distribute_timer = nullptr;
- std::vector<RuntimeProfile::Counter*> _num_rows_in_queue {};
std::unique_ptr<vectorized::PartitionerBase> _partitioner = nullptr;
- std::vector<size_t> _partition_rows_histogram {};
+ std::vector<size_t> _partition_rows_histogram;
+
+ // Used by random passthrough exchanger
+ int _channel_id = 0;
};
// A single 32-bit division on a recent x64 processor has a throughput of one instruction every six cycles with a latency of 26 cycles.
@@ -82,21 +89,31 @@ public:
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
}
- Status init() override {
+ Status init(bool need_partitioner) override {
_name = "LOCAL_EXCHANGE_SINK_OPERATOR";
- _partitioner.reset(
- new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
- RETURN_IF_ERROR(_partitioner->init(_texprs));
+ _need_partitioner = need_partitioner;
+ if (_need_partitioner) {
+ _partitioner.reset(
+ new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
+ RETURN_IF_ERROR(_partitioner->init(_texprs));
+ }
+
return Status::OK();
}
Status prepare(RuntimeState* state) override {
- RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc()));
+ if (_need_partitioner) {
+ RETURN_IF_ERROR(_partitioner->prepare(state, _child_x->row_desc()));
+ }
+
return Status::OK();
}
Status open(RuntimeState* state) override {
- RETURN_IF_ERROR(_partitioner->open(state));
+ if (_need_partitioner) {
+ RETURN_IF_ERROR(_partitioner->open(state));
+ }
+
return Status::OK();
}
@@ -105,6 +122,7 @@ public:
private:
friend class LocalExchangeSinkLocalState;
+ bool _need_partitioner;
const int _num_partitions;
const std::vector<TExpr>& _texprs;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
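
The context comment above about 32-bit division explains why LocalExchangeChannelIds exists at all: mapping a hash to a channel with hash % num_partitions puts a division on the per-row path. Its implementation is not part of this diff, so the following is an assumption for illustration only: the standard division-free alternative is the multiply-shift reduction, which maps a 32-bit value uniformly onto [0, n) with one multiply and one shift:

    // Assumed sketch; LocalExchangeChannelIds' actual body is not shown in this diff.
    #include <cstdint>
    #include <cstdio>

    // Maps a 32-bit hash onto [0, num_channels) without the ~26-cycle division.
    inline uint32_t fast_channel_id(uint32_t hash, uint32_t num_channels) {
        return static_cast<uint32_t>((static_cast<uint64_t>(hash) * num_channels) >> 32);
    }

    int main() {
        for (uint32_t h : {0x12345678u, 0xDEADBEEFu, 0xFFFFFFFFu}) {
            std::printf("%08x -> channel %u of 4\n", h, fast_channel_id(h, 4));
        }
    }
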
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index 83dac5eb8f4..dd64852891f 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -17,8 +17,21 @@
#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h"
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
+
namespace doris::pipeline {
+void LocalExchangeSourceDependency::block() {
+ if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->running_sink_operators == 0) {
+ return;
+ }
+ std::unique_lock<std::mutex> lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock);
+ if (((LocalExchangeSharedState*)_shared_state.get())->exchanger->running_sink_operators == 0) {
+ return;
+ }
+ Dependency::block();
+}
+
Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(Base::init(state, info));
SCOPED_TIMER(exec_time_counter());
@@ -28,9 +41,14 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* state, LocalStateInfo&
DCHECK(_shared_state != nullptr);
_channel_id = info.task_idx;
_shared_state->set_dep_by_channel_id(_dependency, _channel_id);
+ _exchanger = _shared_state->exchanger.get();
+ DCHECK(_exchanger != nullptr);
_get_block_failed_counter =
ADD_COUNTER_WITH_LEVEL(profile(), "GetBlockFailedTime", TUnit::UNIT, 1);
- _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
+ if (_exchanger->get_type() == ExchangeType::SHUFFLE) {
+ _copy_data_timer = ADD_TIMER(profile(), "CopyDataTime");
+ }
+
return Status::OK();
}
@@ -38,42 +56,7 @@ Status LocalExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::
SourceState& source_state) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
- PartitionedBlock partitioned_block;
- std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
-
- auto get_data = [&](vectorized::Block* result_block) {
- do {
- const auto* offset_start = &((
- *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
- mutable_block->add_rows(partitioned_block.first.get(), offset_start,
- offset_start + std::get<2>(partitioned_block.second));
- } while (mutable_block->rows() < state->batch_size() &&
- local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
- partitioned_block));
- *result_block = mutable_block->to_block();
- };
- if (local_state._shared_state->running_sink_operators == 0) {
- if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
- partitioned_block)) {
- SCOPED_TIMER(local_state._copy_data_timer);
- mutable_block =
- vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
- get_data(block);
- } else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- source_state = SourceState::FINISHED;
- }
- } else if (local_state._shared_state->data_queue[local_state._channel_id].try_dequeue(
- partitioned_block)) {
- SCOPED_TIMER(local_state._copy_data_timer);
- mutable_block =
- vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
- get_data(block);
- } else {
- local_state._dependency->block();
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
- }
-
+ RETURN_IF_ERROR(local_state._exchanger->get_block(state, block, source_state, local_state));
local_state.reached_limit(block, source_state);
return Status::OK();
}
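
block() above preserves the double-checked pattern that used to live inline in the header (removed in the next hunk): test running_sink_operators without the lock for the fast path, then re-test under le_lock before actually blocking. Together with sub_running_sink_operators() taking the same lock, this closes the window where the last sink finishes between a source's failed dequeue and its call to block(), which would otherwise leave the source blocked forever. A self-contained sketch of the handshake with simplified stand-in types:

    #include <atomic>
    #include <mutex>

    // Simplified stand-in for LocalExchangeSharedState plus the dependency pair.
    struct Handshake {
        std::atomic<int> running_sink_operators {1};
        std::mutex le_lock;
        bool blocked = false; // stands in for Dependency::block()
        bool ready = false;   // stands in for _set_ready_for_read()

        // Source side: only block while sinks are still running (checked twice).
        void block() {
            if (running_sink_operators == 0) {
                return; // cheap unlocked pre-check
            }
            std::unique_lock<std::mutex> lc(le_lock);
            if (running_sink_operators == 0) {
                return; // re-check under the lock: a sink may just have finished
            }
            blocked = true;
        }

        // Sink side: the last sink flips readiness under the same lock, so it can
        // never interleave between the source's re-check and its actual blocking.
        void sub_running_sink_operators() {
            std::unique_lock<std::mutex> lc(le_lock);
            if (running_sink_operators.fetch_sub(1) == 1) {
                ready = true;
            }
        }
    };

    int main() {
        Handshake h;
        h.sub_running_sink_operators(); // the last sink finishes
        h.block();                      // source no longer blocks: h.blocked stays false
    }
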
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index 3ccc38854f5..d94b9041fce 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -29,18 +29,12 @@ public:
: Dependency(id, node_id, "LocalExchangeSourceDependency", query_ctx) {}
~LocalExchangeSourceDependency() override = default;
- void block() override {
- if (((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) {
- return;
- }
- std::unique_lock<std::mutex> lc(((LocalExchangeSharedState*)_shared_state.get())->le_lock);
- if (((LocalExchangeSharedState*)_shared_state.get())->running_sink_operators == 0) {
- return;
- }
- Dependency::block();
- }
+ void block() override;
};
+class Exchanger;
+class ShuffleExchanger;
+class PassthroughExchanger;
class LocalExchangeSourceOperatorX;
class LocalExchangeSourceLocalState final
: public PipelineXLocalState<LocalExchangeSourceDependency> {
@@ -54,7 +48,10 @@ public:
private:
friend class LocalExchangeSourceOperatorX;
+ friend class ShuffleExchanger;
+ friend class PassthroughExchanger;
+ Exchanger* _exchanger = nullptr;
int _channel_id;
RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
RuntimeProfile::Counter* _copy_data_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
new file mode 100644
index 00000000000..616b469a99f
--- /dev/null
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -0,0 +1,147 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
+
+#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
+#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h"
+
+namespace doris::pipeline {
+
+Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
+ SourceState source_state, LocalExchangeSinkLocalState& local_state) {
+ {
+ SCOPED_TIMER(local_state._compute_hash_value_timer);
+ RETURN_IF_ERROR(local_state._partitioner->do_partitioning(state, in_block,
+ local_state.mem_tracker()));
+ }
+ {
+ SCOPED_TIMER(local_state._distribute_timer);
+ RETURN_IF_ERROR(_split_rows(state,
+ (const uint32_t*)local_state._partitioner->get_channel_ids(),
+ in_block, source_state, local_state));
+ }
+
+ return Status::OK();
+}
+
+Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block,
+ SourceState& source_state,
+ LocalExchangeSourceLocalState& local_state) {
+ PartitionedBlock partitioned_block;
+ std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
+
+ auto get_data = [&](vectorized::Block* result_block) {
+ do {
+ const auto* offset_start = &((
+ *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
+ mutable_block->add_rows(partitioned_block.first.get(), offset_start,
+ offset_start + std::get<2>(partitioned_block.second));
+ } while (mutable_block->rows() < state->batch_size() &&
+ _data_queue[local_state._channel_id].try_dequeue(partitioned_block));
+ *result_block = mutable_block->to_block();
+ };
+ if (running_sink_operators == 0) {
+ if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+ SCOPED_TIMER(local_state._copy_data_timer);
+ mutable_block =
+ vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+ get_data(block);
+ } else {
+ COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+ source_state = SourceState::FINISHED;
+ }
+ } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+ SCOPED_TIMER(local_state._copy_data_timer);
+ mutable_block =
+ vectorized::MutableBlock::create_unique(partitioned_block.first->clone_empty());
+ get_data(block);
+ } else {
+ COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+ local_state._dependency->block();
+ }
+ return Status::OK();
+}
+
+Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
+ vectorized::Block* block, SourceState source_state,
+ LocalExchangeSinkLocalState& local_state) {
+ auto& data_queue = _data_queue;
+ const auto rows = block->rows();
+ auto row_idx = std::make_shared<std::vector<int>>(rows);
+ {
+ local_state._partition_rows_histogram.assign(_num_instances + 1, 0);
+ for (size_t i = 0; i < rows; ++i) {
+ local_state._partition_rows_histogram[channel_ids[i]]++;
+ }
+ for (int32_t i = 1; i <= _num_instances; ++i) {
+ local_state._partition_rows_histogram[i] +=
+ local_state._partition_rows_histogram[i - 1];
+ }
+
+ for (int32_t i = rows - 1; i >= 0; --i) {
+ (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i;
+ local_state._partition_rows_histogram[channel_ids[i]]--;
+ }
+ }
+ auto new_block = vectorized::Block::create_shared(block->clone_empty());
+ new_block->swap(*block);
+ for (size_t i = 0; i < _num_instances; i++) {
+ size_t start = local_state._partition_rows_histogram[i];
+ size_t size = local_state._partition_rows_histogram[i + 1] - start;
+ if (size > 0) {
+ data_queue[i].enqueue({new_block, {row_idx, start, size}});
+ local_state._shared_state->set_ready_for_read(i);
+ }
+ }
+
+ return Status::OK();
+}
+
+Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
+ SourceState source_state,
+ LocalExchangeSinkLocalState& local_state) {
+ auto new_block = vectorized::Block::create_unique(in_block->clone_empty());
+ new_block->swap(*in_block);
+ auto channel_id = (local_state._channel_id++) % _num_instances;
+ _data_queue[channel_id].enqueue(std::move(new_block));
+ local_state._shared_state->set_ready_for_read(channel_id);
+
+ return Status::OK();
+}
+
+Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block,
+ SourceState& source_state,
+ LocalExchangeSourceLocalState& local_state) {
+ std::unique_ptr<vectorized::Block> next_block;
+ if (running_sink_operators == 0) {
+ if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ *block = *next_block.release();
+ } else {
+ COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+ source_state = SourceState::FINISHED;
+ }
+ } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ *block = *next_block.release();
+ } else {
+ COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
+ local_state._dependency->block();
+ }
+ return Status::OK();
+}
+
+} // namespace doris::pipeline
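
_split_rows above is a counting sort on channel ids: one pass builds a histogram, an in-place prefix sum turns counts into end offsets, and a reverse scatter fills row_idx so each partition owns a contiguous [start, size) slice. The sunk block itself is shared (one shared_ptr plus an index range per partition), so no rows are copied at sink time; the per-row copy is deferred to get_block via add_rows. A standalone illustration of the same indexing scheme on plain integers:

    // Standalone illustration of the histogram / prefix-sum / reverse-scatter
    // indexing used by ShuffleExchanger::_split_rows (plain ints instead of Blocks).
    #include <cstdint>
    #include <cstdio>
    #include <vector>

    int main() {
        const std::vector<uint32_t> channel_ids = {1, 0, 2, 1, 0, 2, 1}; // one id per row
        const int num_instances = 3;
        const int rows = static_cast<int>(channel_ids.size());

        // 1. Count rows per channel, then prefix-sum into end offsets.
        std::vector<size_t> histogram(num_instances + 1, 0);
        for (int i = 0; i < rows; ++i) {
            histogram[channel_ids[i]]++;
        }
        for (int i = 1; i <= num_instances; ++i) {
            histogram[i] += histogram[i - 1];
        }

        // 2. Reverse scatter: walking rows backwards keeps each partition's rows
        //    in their original order, exactly as in _split_rows.
        std::vector<int> row_idx(rows);
        for (int i = rows - 1; i >= 0; --i) {
            row_idx[--histogram[channel_ids[i]]] = i;
        }

        // 3. histogram[p] is now the start offset of partition p; this is the
        //    {row_idx, start, size} view each PartitionedBlock queue entry carries.
        for (int p = 0; p < num_instances; ++p) {
            std::printf("partition %d gets rows:", p);
            for (size_t j = histogram[p]; j < histogram[p + 1]; ++j) {
                std::printf(" %d", row_idx[j]);
            }
            std::printf("\n");
        }
        return 0;
    }
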
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
new file mode 100644
index 00000000000..13e3fe931e7
--- /dev/null
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "pipeline/pipeline_x/dependency.h"
+#include "pipeline/pipeline_x/operator.h"
+
+namespace doris::pipeline {
+
+enum class ExchangeType : uint8_t {
+ SHUFFLE = 0,
+ PASSTHROUGH = 1,
+};
+
+class LocalExchangeSourceLocalState;
+class LocalExchangeSinkLocalState;
+
+class Exchanger {
+public:
+ Exchanger(int num_instances)
+ : running_sink_operators(num_instances), _num_instances(num_instances) {}
+ virtual ~Exchanger() = default;
+ virtual Status get_block(RuntimeState* state, vectorized::Block* block,
+ SourceState& source_state,
+ LocalExchangeSourceLocalState& local_state) = 0;
+ virtual Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state,
+ LocalExchangeSinkLocalState& local_state) = 0;
+ virtual ExchangeType get_type() const = 0;
+
+ std::atomic<int> running_sink_operators = 0;
+
+protected:
+ const int _num_instances;
+};
+
+class LocalExchangeSourceLocalState;
+class LocalExchangeSinkLocalState;
+
+class ShuffleExchanger final : public Exchanger {
+ using PartitionedBlock =
+ std::pair<std::shared_ptr<vectorized::Block>,
+ std::tuple<std::shared_ptr<std::vector<int>>, size_t, size_t>>;
+
+public:
+ ENABLE_FACTORY_CREATOR(ShuffleExchanger);
+ ShuffleExchanger(int num_instances) : Exchanger(num_instances) {
+ _data_queue.resize(num_instances);
+ }
+ ~ShuffleExchanger() override = default;
+ Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state,
+ LocalExchangeSinkLocalState& local_state) override;
+
+ Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state,
+ LocalExchangeSourceLocalState& local_state) override;
+ ExchangeType get_type() const override { return ExchangeType::SHUFFLE; }
+
+private:
+ Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
+ vectorized::Block* block, SourceState source_state,
+ LocalExchangeSinkLocalState& local_state);
+
+ std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> _data_queue;
+};
+
+class PassthroughExchanger final : public Exchanger {
+public:
+ ENABLE_FACTORY_CREATOR(PassthroughExchanger);
+ PassthroughExchanger(int num_instances) : Exchanger(num_instances) {
+ _data_queue.resize(num_instances);
+ }
+ ~PassthroughExchanger() override = default;
+ Status sink(RuntimeState* state, vectorized::Block* in_block, SourceState source_state,
+ LocalExchangeSinkLocalState& local_state) override;
+
+ Status get_block(RuntimeState* state, vectorized::Block* block, SourceState& source_state,
+ LocalExchangeSourceLocalState& local_state) override;
+ ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; }
+
+private:
+ std::vector<moodycamel::ConcurrentQueue<std::unique_ptr<vectorized::Block>>> _data_queue;
+};
+
+} // namespace doris::pipeline
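
Note how the two queue element types encode the cost model: ShuffleExchanger enqueues PartitionedBlock entries, where several partitions share one shared_ptr<vectorized::Block> plus a (row-index vector, start, size) view, so the per-row copy is deferred to the source side; that is also why the source's init() registers CopyDataTime only for ExchangeType::SHUFFLE. PassthroughExchanger moves whole blocks through the queue as unique_ptrs, so neither side copies rows at all.
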
diff --git a/be/src/pipeline/pipeline_x/operator.h b/be/src/pipeline/pipeline_x/operator.h
index 5fa6785435b..294eb962ee1 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -179,6 +179,8 @@ public:
[[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; }
+ [[nodiscard]] virtual bool need_to_local_shuffle() const { return true; }
+
bool can_read() override {
LOG(FATAL) << "should not reach here!";
return false;
@@ -423,7 +425,7 @@ public:
virtual Status init(const TPlanNode& tnode, RuntimeState* state);
Status init(const TDataSink& tsink) override;
- virtual Status init() {
+ virtual Status init(bool need_partitioner) {
return Status::InternalError("init() is only implemented in local exchange!");
}
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index d49e290c044..636d4e235d7 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -231,6 +231,8 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
static_cast<void>(root_pipeline->set_sink(_sink));
+ // RETURN_IF_ERROR(_plan_local_shuffle());
+
// 4. Initialize global states in pipelines.
for (PipelinePtr& pipeline : _pipelines) {
DCHECK(pipeline->sink_x() != nullptr) << pipeline->operator_xs().size();
@@ -247,6 +249,17 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r
return Status::OK();
}
+Status PipelineXFragmentContext::_plan_local_shuffle() {
+ for (int pip_idx = _pipelines.size() - 1; pip_idx >= 0; pip_idx--) {
+ auto& children = _pipelines[pip_idx]->children();
+ if (children.empty()) {
+ _pipelines[pip_idx]->init_need_to_local_shuffle_by_source();
+ } else {
+ }
+ }
+ return Status::OK();
+}
+
Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
const std::vector<TExpr>& output_exprs,
const TPipelineFragmentParams& params,
@@ -595,7 +608,8 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorXPtr& op,
PipelinePtr& cur_pipe,
const TPlanNode& tnode,
- const std::vector<TExpr>& texprs) {
+ const std::vector<TExpr>& texprs,
+ ExchangeType exchange_type) {
if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) {
return Status::OK();
}
@@ -617,12 +631,23 @@ Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, OperatorX
sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), local_exchange_id,
_num_instances, texprs));
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
- RETURN_IF_ERROR(cur_pipe->sink_x()->init());
+ bool need_partitioner = false;
auto shared_state = LocalExchangeSharedState::create_shared();
- shared_state->data_queue.resize(_num_instances);
shared_state->source_dependencies.resize(_num_instances, nullptr);
- shared_state->running_sink_operators = _num_instances;
+ switch (exchange_type) {
+ case ExchangeType::SHUFFLE:
+ shared_state->exchanger = ShuffleExchanger::create_unique(_num_instances);
+ need_partitioner = true;
+ break;
+ case ExchangeType::PASSTHROUGH:
+ shared_state->exchanger = PassthroughExchanger::create_unique(_num_instances);
+ break;
+ default:
+ return Status::InternalError("Unsupported local exchange type : " +
+ std::to_string((int)exchange_type));
+ }
+ RETURN_IF_ERROR(cur_pipe->sink_x()->init(need_partitioner));
_op_id_to_le_state.insert({local_exchange_id, shared_state});
return Status::OK();
}
@@ -687,6 +712,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
+
+ // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
+ // tnode.agg_node.grouping_exprs,
+ // ExchangeType::PASSTHROUGH));
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
tnode.agg_node.use_streaming_preaggregation) {
op.reset(new StreamingAggSourceOperatorX(pool, tnode, next_operator_id(), descs));
@@ -703,6 +732,10 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
+
+ // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
+ // tnode.agg_node.grouping_exprs,
+ // ExchangeType::PASSTHROUGH));
} else {
op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -720,10 +753,19 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
RETURN_IF_ERROR(cur_pipe->set_sink(sink));
RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, _runtime_state.get()));
- if (!tnode.agg_node.need_finalize) {
- RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
- tnode.agg_node.grouping_exprs));
- }
+ // if (tnode.agg_node.grouping_exprs.empty()) {
+ // if (tnode.agg_node.need_finalize) {
+ // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
+ // tnode.agg_node.grouping_exprs,
+ // ExchangeType::PASSTHROUGH));
+ // } else {
+ // // TODO(gabriel): maybe use local shuffle
+ // }
+ // } else if (cur_pipe->need_to_local_shuffle()) {
+ // RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode,
+ // tnode.agg_node.grouping_exprs,
+ // ExchangeType::SHUFFLE));
+ // }
}
break;
}
@@ -750,7 +792,14 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
probe_exprs.push_back(eq_join_conjunct.left);
}
- RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs));
+ if (tnode.hash_join_node.__isset.is_broadcast_join &&
+ tnode.hash_join_node.is_broadcast_join) {
+ RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs,
+ ExchangeType::PASSTHROUGH));
+ } else if (cur_pipe->need_to_local_shuffle()) {
+ RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, probe_exprs,
+ ExchangeType::SHUFFLE));
+ }
_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
break;
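
The hash-join hunk above is the change the commit title refers to: in a broadcast join every instance receives the complete build side, so probe rows do not need to be co-partitioned on the join keys, and the cheaper PASSTHROUGH exchange is enough to spread them evenly across instances; non-broadcast probes still get a SHUFFLE exchange, but only when the pipeline's source does not already deliver partitioned data. A condensed, self-contained restatement of that choice (types simplified; only ExchangeType and the two conditions come from the diff):

    #include <cstdint>
    #include <optional>

    enum class ExchangeType : uint8_t { SHUFFLE = 0, PASSTHROUGH = 1 };

    // nullopt means "add no local exchange": upstream data is already partitioned.
    std::optional<ExchangeType> choose_probe_exchange(bool is_broadcast_join,
                                                      bool need_to_local_shuffle) {
        if (is_broadcast_join) {
            // Every instance holds the full build side; probe rows only need to be
            // spread evenly, not lined up with a particular build partition.
            return ExchangeType::PASSTHROUGH;
        }
        if (need_to_local_shuffle) {
            // Probe rows must be hash-partitioned to meet their build-side rows.
            return ExchangeType::SHUFFLE;
        }
        return std::nullopt;
    }

    int main() {
        auto bhj = choose_probe_exchange(/*is_broadcast_join=*/true,
                                         /*need_to_local_shuffle=*/true);
        // bhj == ExchangeType::PASSTHROUGH, regardless of the shuffle flag.
        return bhj == ExchangeType::PASSTHROUGH ? 0 : 1;
    }
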
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 7f47052296e..1390f2a9544 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -35,6 +35,7 @@
#include "pipeline/pipeline.h"
#include "pipeline/pipeline_fragment_context.h"
#include "pipeline/pipeline_task.h"
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
#include "pipeline/pipeline_x/pipeline_x_task.h"
#include "runtime/query_context.h"
#include "runtime/runtime_state.h"
@@ -125,7 +126,8 @@ private:
void _close_fragment_instance() override;
Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request) override;
Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, PipelinePtr& cur_pipe,
- const TPlanNode& tnode, const std::vector<TExpr>& texprs);
+ const TPlanNode& tnode, const std::vector<TExpr>& texprs,
+ ExchangeType exchange_type);
[[nodiscard]] Status _build_pipelines(ObjectPool* pool,
const doris::TPipelineFragmentParams& request,
@@ -151,6 +153,7 @@ private:
const TPipelineFragmentParams& params, const RowDescriptor& row_desc,
RuntimeState* state, DescriptorTbl& desc_tbl,
PipelineId cur_pipeline_id);
+ Status _plan_local_shuffle();
bool _has_inverted_index_or_partial_update(TOlapTableSink sink);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]