This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 62bb1803f78 [fix](local exchange) Fix EOS processing in local
exchanger (#39031)
62bb1803f78 is described below
commit 62bb1803f785c3ac2f9dd7c3835adf0e3eca60f5
Author: Gabriel <[email protected]>
AuthorDate: Fri Aug 9 15:19:50 2024 +0800
[fix](local exchange) Fix EOS processing in local exchanger (#39031)
---
be/src/pipeline/dependency.cpp | 6 +-
be/src/pipeline/dependency.h | 85 ++++--
be/src/pipeline/exec/operator.cpp | 5 +-
.../local_exchange_sink_operator.cpp | 5 +-
.../local_exchange_source_operator.cpp | 11 +-
be/src/pipeline/local_exchange/local_exchanger.cpp | 297 +++++++++------------
be/src/pipeline/local_exchange/local_exchanger.h | 122 +++++----
be/src/pipeline/pipeline_fragment_context.cpp | 26 +-
8 files changed, 312 insertions(+), 245 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 5e1ce79a1eb..560efec94e1 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -188,10 +188,12 @@ void
LocalExchangeSharedState::sub_running_sink_operators() {
}
}
-void LocalExchangeSharedState::sub_running_source_operators() {
+void LocalExchangeSharedState::sub_running_source_operators(
+ LocalExchangeSourceLocalState& local_state) {
std::unique_lock<std::mutex> lc(le_lock);
if (exchanger->_running_source_operators.fetch_sub(1) == 1) {
_set_always_ready();
+ exchanger->finalize(local_state);
}
}
@@ -397,4 +399,6 @@ Status
AggSharedState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
return Status::OK();
}
+LocalExchangeSharedState::~LocalExchangeSharedState() = default;
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index ee1afaaf55e..36f06b91095 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -49,7 +49,7 @@ class Dependency;
class PipelineTask;
struct BasicSharedState;
using DependencySPtr = std::shared_ptr<Dependency>;
-using DependencyMap = std::map<int, std::vector<DependencySPtr>>;
+class LocalExchangeSourceLocalState;
static constexpr auto SLOW_DEPENDENCY_THRESHOLD = 60 * 1000L * 1000L * 1000L;
static constexpr auto TIME_UNIT_DEPENDENCY_LOG = 30 * 1000L * 1000L * 1000L;
@@ -811,20 +811,21 @@ struct LocalExchangeSharedState : public BasicSharedState
{
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
LocalExchangeSharedState(int num_instances);
+ ~LocalExchangeSharedState() override;
std::unique_ptr<ExchangerBase> exchanger {};
std::vector<MemTracker*> mem_trackers;
std::atomic<int64_t> mem_usage = 0;
// We need to make sure to add mem_usage first and then enqueue, otherwise
sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
- void create_source_dependencies(int operator_id, int node_id) {
+ virtual void create_dependencies(int operator_id, int node_id) {
for (auto& source_dep : source_deps) {
source_dep = std::make_shared<Dependency>(operator_id, node_id,
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
source_dep->set_shared_state(this);
}
- };
+ }
void sub_running_sink_operators();
- void sub_running_source_operators();
+ void sub_running_source_operators(LocalExchangeSourceLocalState&
local_state);
void _set_always_ready() {
for (auto& dep : source_deps) {
DCHECK(dep);
@@ -836,7 +837,10 @@ public:
}
}
- Dependency* get_dep_by_channel_id(int channel_id) { return
source_deps[channel_id].get(); }
+ virtual std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) {
+ return {source_deps[channel_id]};
+ }
+ virtual Dependency* get_sink_dep_by_channel_id(int channel_id) { return
nullptr; }
void set_ready_to_read(int channel_id) {
auto& dep = source_deps[channel_id];
@@ -847,28 +851,79 @@ public:
void add_mem_usage(int channel_id, size_t delta, bool
update_total_mem_usage = true) {
mem_trackers[channel_id]->consume(delta);
if (update_total_mem_usage) {
- add_total_mem_usage(delta);
+ add_total_mem_usage(delta, channel_id);
}
}
- void sub_mem_usage(int channel_id, size_t delta, bool
update_total_mem_usage = true) {
- mem_trackers[channel_id]->release(delta);
- if (update_total_mem_usage) {
- sub_total_mem_usage(delta);
- }
- }
+ void sub_mem_usage(int channel_id, size_t delta) {
mem_trackers[channel_id]->release(delta); }
- void add_total_mem_usage(size_t delta) {
+ virtual void add_total_mem_usage(size_t delta, int channel_id = 0) {
if (mem_usage.fetch_add(delta) + delta >
config::local_exchange_buffer_mem_limit) {
sink_deps.front()->block();
}
}
- void sub_total_mem_usage(size_t delta) {
- if (mem_usage.fetch_sub(delta) - delta <=
config::local_exchange_buffer_mem_limit) {
+ virtual void sub_total_mem_usage(size_t delta, int channel_id = 0) {
+ auto prev_usage = mem_usage.fetch_sub(delta);
+ DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << "
delta: " << delta
+ << " channel_id: " << channel_id;
+ if (prev_usage - delta <= config::local_exchange_buffer_mem_limit) {
sink_deps.front()->set_ready();
}
}
};
+struct LocalMergeExchangeSharedState : public LocalExchangeSharedState {
+ LocalMergeExchangeSharedState(int num_instances)
+ : LocalExchangeSharedState(num_instances),
+ _queues_mem_usage(num_instances),
+ _each_queue_limit(config::local_exchange_buffer_mem_limit /
num_instances) {
+ for (size_t i = 0; i < num_instances; i++) {
+ _queues_mem_usage[i] = 0;
+ }
+ }
+
+ void create_dependencies(int operator_id, int node_id) override {
+ sink_deps.resize(source_deps.size());
+ for (size_t i = 0; i < source_deps.size(); i++) {
+ source_deps[i] = std::make_shared<Dependency>(operator_id, node_id,
+
"LOCAL_EXCHANGE_OPERATOR_DEPENDENCY");
+ source_deps[i]->set_shared_state(this);
+ sink_deps[i] = std::make_shared<Dependency>(
+ operator_id, node_id,
"LOCAL_EXCHANGE_OPERATOR_SINK_DEPENDENCY", true);
+ sink_deps[i]->set_shared_state(this);
+ }
+ }
+
+ void sub_total_mem_usage(size_t delta, int channel_id) override {
+ auto prev_usage = _queues_mem_usage[channel_id].fetch_sub(delta);
+ DCHECK_GE(prev_usage - delta, 0) << "prev_usage: " << prev_usage << "
delta: " << delta
+ << " channel_id: " << channel_id;
+ if (prev_usage - delta <= _each_queue_limit) {
+ sink_deps[channel_id]->set_ready();
+ }
+ if (_queues_mem_usage[channel_id] == 0) {
+ source_deps[channel_id]->block();
+ }
+ }
+ void add_total_mem_usage(size_t delta, int channel_id) override {
+ if (_queues_mem_usage[channel_id].fetch_add(delta) + delta >
_each_queue_limit) {
+ sink_deps[channel_id]->block();
+ }
+ source_deps[channel_id]->set_ready();
+ }
+
+ Dependency* get_sink_dep_by_channel_id(int channel_id) override {
+ return sink_deps[channel_id].get();
+ }
+
+ std::vector<DependencySPtr> get_dep_by_channel_id(int channel_id) override
{
+ return source_deps;
+ }
+
+private:
+ std::vector<std::atomic_int64_t> _queues_mem_usage;
+ const int64_t _each_queue_limit;
+};
+
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index 07e0c3cf640..1e00b9fcbcb 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -452,7 +452,10 @@ Status
PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
DCHECK(info.le_state_map.find(_parent->operator_id()) !=
info.le_state_map.end());
_shared_state =
info.le_state_map.at(_parent->operator_id()).first.get();
- _dependency = _shared_state->get_dep_by_channel_id(info.task_idx);
+ auto deps = _shared_state->get_dep_by_channel_id(info.task_idx);
+ if (deps.size() == 1) {
+ _dependency = deps.front().get();
+ }
_wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
_runtime_profile, "WaitForDependency[" +
_dependency->name() + "]Time", 1);
} else if (info.shared_state) {
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index ba51a2da39b..97acd2a8070 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -27,8 +27,9 @@ LocalExchangeSinkLocalState::~LocalExchangeSinkLocalState() =
default;
std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
auto deps = Base::dependencies();
- auto exchanger_deps = _exchanger->local_sink_state_dependency(_channel_id);
- for (auto* dep : exchanger_deps) {
+
+ auto dep = _shared_state->get_sink_dep_by_channel_id(_channel_id);
+ if (dep != nullptr) {
deps.push_back(dep);
}
return deps;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 6b0cca2d71a..32e93fbc5b2 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -56,7 +56,7 @@ Status LocalExchangeSourceLocalState::close(RuntimeState*
state) {
_exchanger->close(*this);
}
if (_shared_state) {
- _shared_state->sub_running_source_operators();
+ _shared_state->sub_running_source_operators(*this);
}
return Base::close(state);
@@ -64,9 +64,12 @@ Status LocalExchangeSourceLocalState::close(RuntimeState*
state) {
std::vector<Dependency*> LocalExchangeSourceLocalState::dependencies() const {
auto deps = Base::dependencies();
- auto exchanger_deps = _exchanger->local_state_dependency(_channel_id);
- for (auto* dep : exchanger_deps) {
- deps.push_back(dep);
+ auto le_deps = _shared_state->get_dep_by_channel_id(_channel_id);
+ if (le_deps.size() > 1) {
+ // If this is a local merge exchange, we should use all dependencies
here.
+ for (auto& dep : le_deps) {
+ deps.push_back(dep.get());
+ }
}
return deps;
}
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index 647ddcfba2d..e256419688e 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -27,28 +27,74 @@
namespace doris::pipeline {
template <typename BlockType>
-bool Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
+void Exchanger<BlockType>::_enqueue_data_and_set_ready(int channel_id,
LocalExchangeSinkLocalState& local_state,
BlockType&& block) {
+ size_t allocated_bytes = 0;
+ // PartitionedBlock is used by shuffle exchanger.
+ // PartitionedBlock will be pushed into multiple queues with different row
ranges, so it will be
+ // referenced multiple times. Otherwise, we only ref the block once
because it is only pushed into
+ // one queue.
+ if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+ allocated_bytes = block.first->data_block.allocated_bytes();
+ } else {
+ block->ref(1);
+ allocated_bytes = block->data_block.allocated_bytes();
+ }
std::unique_lock l(_m);
+ local_state._shared_state->add_mem_usage(channel_id, allocated_bytes,
+ !std::is_same_v<PartitionedBlock,
BlockType>);
if (_data_queue[channel_id].enqueue(std::move(block))) {
local_state._shared_state->set_ready_to_read(channel_id);
- return true;
+ } else {
+ local_state._shared_state->sub_mem_usage(channel_id, allocated_bytes);
+ // `enqueue(block)` returns false iff this queue's source operator is
already closed, so we
+ // just unref the block.
+ if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+ block.first->unref(local_state._shared_state, allocated_bytes);
+ } else {
+ block->unref(local_state._shared_state, allocated_bytes);
+ }
}
- return false;
}
template <typename BlockType>
bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState&
local_state,
- BlockType& block, bool* eos) {
+ BlockType& block, bool* eos,
+ vectorized::Block* data_block) {
+ return _dequeue_data(local_state, block, eos, data_block,
local_state._channel_id);
+}
+
+template <typename BlockType>
+bool Exchanger<BlockType>::_dequeue_data(LocalExchangeSourceLocalState&
local_state,
+ BlockType& block, bool* eos,
vectorized::Block* data_block,
+ int channel_id) {
bool all_finished = _running_sink_operators == 0;
- if (_data_queue[local_state._channel_id].try_dequeue(block)) {
+ if (_data_queue[channel_id].try_dequeue(block)) {
+ if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+ local_state._shared_state->sub_mem_usage(channel_id,
+
block.first->data_block.allocated_bytes());
+ } else {
+ local_state._shared_state->sub_mem_usage(channel_id,
+
block->data_block.allocated_bytes());
+ data_block->swap(block->data_block);
+ block->unref(local_state._shared_state,
data_block->allocated_bytes());
+ }
return true;
} else if (all_finished) {
*eos = true;
} else {
std::unique_lock l(_m);
- if (_data_queue[local_state._channel_id].try_dequeue(block)) {
+ if (_data_queue[channel_id].try_dequeue(block)) {
+ if constexpr (std::is_same_v<PartitionedBlock, BlockType>) {
+ local_state._shared_state->sub_mem_usage(channel_id,
+
block.first->data_block.allocated_bytes());
+ } else {
+ local_state._shared_state->sub_mem_usage(channel_id,
+
block->data_block.allocated_bytes());
+ data_block->swap(block->data_block);
+ block->unref(local_state._shared_state,
data_block->allocated_bytes());
+ }
return true;
}
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
@@ -76,12 +122,11 @@ Status ShuffleExchanger::sink(RuntimeState* state,
vectorized::Block* in_block,
void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
PartitionedBlock partitioned_block;
+ bool eos;
+ vectorized::Block block;
_data_queue[local_state._channel_id].set_eos();
- while
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
- auto block_wrapper = partitioned_block.first;
- local_state._shared_state->sub_mem_usage(
- local_state._channel_id,
block_wrapper->data_block.allocated_bytes(), false);
- block_wrapper->unref(local_state._shared_state);
+ while (_dequeue_data(local_state, partitioned_block, &eos, &block)) {
+ partitioned_block.first->unref(local_state._shared_state);
}
}
@@ -95,17 +140,15 @@ Status ShuffleExchanger::get_block(RuntimeState* state,
vectorized::Block* block
const auto* offset_start =
partitioned_block.second.row_idxs->data() +
partitioned_block.second.offset_start;
auto block_wrapper = partitioned_block.first;
- local_state._shared_state->sub_mem_usage(
- local_state._channel_id,
block_wrapper->data_block.allocated_bytes(), false);
RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block,
offset_start,
offset_start +
partitioned_block.second.length));
block_wrapper->unref(local_state._shared_state);
- } while (mutable_block.rows() < state->batch_size() &&
-
_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
+ } while (mutable_block.rows() < state->batch_size() && !*eos &&
+ _dequeue_data(local_state, partitioned_block, eos, block));
return Status::OK();
};
- if (_dequeue_data(local_state, partitioned_block, eos)) {
+ if (_dequeue_data(local_state, partitioned_block, eos, block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
block, partitioned_block.first->data_block);
@@ -135,11 +178,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
}
vectorized::Block data_block;
- std::shared_ptr<ShuffleBlockWrapper> new_block_wrapper;
+ std::shared_ptr<BlockWrapper> new_block_wrapper;
if (_free_blocks.try_dequeue(data_block)) {
- new_block_wrapper =
ShuffleBlockWrapper::create_shared(std::move(data_block));
+ new_block_wrapper = BlockWrapper::create_shared(std::move(data_block));
} else {
- new_block_wrapper =
ShuffleBlockWrapper::create_shared(block->clone_empty());
+ new_block_wrapper = BlockWrapper::create_shared(block->clone_empty());
}
new_block_wrapper->data_block.swap(*block);
@@ -157,15 +200,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
uint32_t start = local_state._partition_rows_histogram[it.first];
uint32_t size = local_state._partition_rows_histogram[it.first +
1] - start;
if (size > 0) {
- local_state._shared_state->add_mem_usage(
- it.second,
new_block_wrapper->data_block.allocated_bytes(), false);
-
- if (!_enqueue_data_and_set_ready(it.second, local_state,
- {new_block_wrapper, {row_idx,
start, size}})) {
- local_state._shared_state->sub_mem_usage(
- it.second,
new_block_wrapper->data_block.allocated_bytes(), false);
- new_block_wrapper->unref(local_state._shared_state);
- }
+ _enqueue_data_and_set_ready(it.second, local_state,
+ {new_block_wrapper, {row_idx,
start, size}});
} else {
new_block_wrapper->unref(local_state._shared_state);
}
@@ -176,15 +212,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
uint32_t start = local_state._partition_rows_histogram[i];
uint32_t size = local_state._partition_rows_histogram[i + 1] -
start;
if (size > 0) {
- local_state._shared_state->add_mem_usage(
- i % _num_sources,
new_block_wrapper->data_block.allocated_bytes(), false);
- if (!_enqueue_data_and_set_ready(i % _num_sources, local_state,
- {new_block_wrapper, {row_idx,
start, size}})) {
- local_state._shared_state->sub_mem_usage(
- i % _num_sources,
new_block_wrapper->data_block.allocated_bytes(),
- false);
- new_block_wrapper->unref(local_state._shared_state);
- }
+ _enqueue_data_and_set_ready(i % _num_sources, local_state,
+ {new_block_wrapper, {row_idx,
start, size}});
} else {
new_block_wrapper->unref(local_state._shared_state);
}
@@ -197,14 +226,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
uint32_t start = local_state._partition_rows_histogram[i];
uint32_t size = local_state._partition_rows_histogram[i + 1] -
start;
if (size > 0) {
- local_state._shared_state->add_mem_usage(
- map[i],
new_block_wrapper->data_block.allocated_bytes(), false);
- if (!_enqueue_data_and_set_ready(map[i], local_state,
- {new_block_wrapper, {row_idx,
start, size}})) {
- local_state._shared_state->sub_mem_usage(
- map[i],
new_block_wrapper->data_block.allocated_bytes(), false);
- new_block_wrapper->unref(local_state._shared_state);
- }
+ _enqueue_data_and_set_ready(map[i], local_state,
+ {new_block_wrapper, {row_idx,
start, size}});
} else {
new_block_wrapper->unref(local_state._shared_state);
}
@@ -217,40 +240,32 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block*
in_block, bool eos,
LocalExchangeSinkLocalState& local_state) {
vectorized::Block new_block;
+ BlockWrapperSPtr wrapper;
if (!_free_blocks.try_dequeue(new_block)) {
new_block = {in_block->clone_empty()};
}
new_block.swap(*in_block);
+ wrapper = BlockWrapper::create_shared(std::move(new_block));
auto channel_id = (local_state._channel_id++) % _num_partitions;
- size_t memory_usage = new_block.allocated_bytes();
- local_state._shared_state->add_mem_usage(channel_id, memory_usage);
- if (!_enqueue_data_and_set_ready(channel_id, local_state,
std::move(new_block))) {
- local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
- }
+ _enqueue_data_and_set_ready(channel_id, local_state, std::move(wrapper));
return Status::OK();
}
void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
+ BlockWrapperSPtr wrapper;
+ bool eos;
_data_queue[local_state._channel_id].set_eos();
- while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
- next_block.allocated_bytes());
+ while (_dequeue_data(local_state, wrapper, &eos, &next_block)) {
+ next_block = vectorized::Block();
}
}
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
- vectorized::Block next_block;
- if (_dequeue_data(local_state, next_block, eos)) {
- block->swap(next_block);
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
- if (_free_block_limit == 0 ||
- _free_blocks.size_approx() < _free_block_limit * _num_sources) {
- _free_blocks.enqueue(std::move(next_block));
- }
- }
+ BlockWrapperSPtr next_block;
+ _dequeue_data(local_state, next_block, eos, block);
return Status::OK();
}
@@ -258,7 +273,9 @@ Status PassToOneExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
LocalExchangeSinkLocalState& local_state) {
vectorized::Block new_block(in_block->clone_empty());
new_block.swap(*in_block);
- _enqueue_data_and_set_ready(0, local_state, std::move(new_block));
+
+ BlockWrapperSPtr wrapper =
BlockWrapper::create_shared(std::move(new_block));
+ _enqueue_data_and_set_ready(0, local_state, std::move(wrapper));
return Status::OK();
}
@@ -269,10 +286,8 @@ Status PassToOneExchanger::get_block(RuntimeState* state,
vectorized::Block* blo
*eos = true;
return Status::OK();
}
- vectorized::Block next_block;
- if (_dequeue_data(local_state, next_block, eos)) {
- *block = std::move(next_block);
- }
+ BlockWrapperSPtr next_block;
+ _dequeue_data(local_state, next_block, eos, block);
return Status::OK();
}
@@ -282,21 +297,39 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state,
vectorized::Block* in_
if (!_free_blocks.try_dequeue(new_block)) {
new_block = {in_block->clone_empty()};
}
- new_block.swap(*in_block);
DCHECK_LE(local_state._channel_id, _data_queue.size());
- size_t memory_usage = new_block.allocated_bytes();
- add_mem_usage(local_state, memory_usage);
-
- if (!_enqueue_data_and_set_ready(local_state._channel_id, local_state,
std::move(new_block))) {
- sub_mem_usage(local_state, memory_usage);
- }
+ new_block.swap(*in_block);
+ _enqueue_data_and_set_ready(local_state._channel_id, local_state,
+
BlockWrapper::create_shared(std::move(new_block)));
if (eos) {
- _queue_deps[local_state._channel_id]->set_always_ready();
+
local_state._shared_state->source_deps[local_state._channel_id]->set_always_ready();
}
return Status::OK();
}
+void ExchangerBase::finalize(LocalExchangeSourceLocalState& local_state) {
+ DCHECK(_running_source_operators == 0);
+ vectorized::Block block;
+ while (_free_blocks.try_dequeue(block)) {
+ // do nothing
+ }
+}
+void LocalMergeSortExchanger::finalize(LocalExchangeSourceLocalState&
local_state) {
+ BlockWrapperSPtr next_block;
+ vectorized::Block block;
+ bool eos;
+ int id = 0;
+ for (auto& data_queue : _data_queue) {
+ data_queue.set_eos();
+ while (_dequeue_data(local_state, next_block, &eos, &block, id)) {
+ block = vectorized::Block();
+ }
+ id++;
+ }
+ ExchangerBase::finalize(local_state);
+}
+
Status LocalMergeSortExchanger::build_merger(RuntimeState* state,
LocalExchangeSourceLocalState&
local_state) {
RETURN_IF_ERROR(_sort_source->build_merger(state, _merger,
local_state.profile()));
@@ -304,18 +337,8 @@ Status LocalMergeSortExchanger::build_merger(RuntimeState*
state,
for (int channel_id = 0; channel_id < _num_partitions; channel_id++) {
vectorized::BlockSupplier block_supplier = [&, id =
channel_id](vectorized::Block* block,
bool*
eos) {
- vectorized::Block next_block;
- bool all_finished = _running_sink_operators == 0;
- if (_data_queue[id].try_dequeue(next_block)) {
- block->swap(next_block);
- if (_free_block_limit == 0 ||
- _free_blocks.size_approx() < _free_block_limit *
_num_sources) {
- _free_blocks.enqueue(std::move(next_block));
- }
- sub_mem_usage(local_state, id, block->allocated_bytes());
- } else if (all_finished) {
- *eos = true;
- }
+ BlockWrapperSPtr next_block;
+ _dequeue_data(local_state, next_block, eos, block, id);
return Status::OK();
};
child_block_suppliers.push_back(block_supplier);
@@ -349,63 +372,13 @@ Status LocalMergeSortExchanger::get_block(RuntimeState*
state, vectorized::Block
return Status::OK();
}
-void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSinkLocalState&
local_state,
- int64_t delta) {
- const auto channel_id = local_state._channel_id;
- local_state._shared_state->mem_trackers[channel_id]->release(delta);
- if (_queues_mem_usege[channel_id].fetch_sub(delta) > _each_queue_limit) {
- _sink_deps[channel_id]->set_ready();
- }
- // if queue empty , block this queue
- if (_queues_mem_usege[channel_id] == 0) {
- _queue_deps[channel_id]->block();
- }
-}
-
-void LocalMergeSortExchanger::add_mem_usage(LocalExchangeSinkLocalState&
local_state,
- int64_t delta) {
- const auto channel_id = local_state._channel_id;
- local_state._shared_state->mem_trackers[channel_id]->consume(delta);
- if (_queues_mem_usege[channel_id].fetch_add(delta) > _each_queue_limit) {
- _sink_deps[channel_id]->block();
- }
- _queue_deps[channel_id]->set_ready();
-}
-
-void LocalMergeSortExchanger::sub_mem_usage(LocalExchangeSourceLocalState&
local_state,
- int channel_id, int64_t delta) {
- local_state._shared_state->mem_trackers[channel_id]->release(delta);
- if (_queues_mem_usege[channel_id].fetch_sub(delta) <= _each_queue_limit) {
- _sink_deps[channel_id]->set_ready();
- }
- // if queue empty , block this queue
- if (_queues_mem_usege[channel_id] == 0) {
- _queue_deps[channel_id]->block();
- }
-}
-
-std::vector<Dependency*>
LocalMergeSortExchanger::local_sink_state_dependency(int channel_id) {
- DCHECK(_sink_deps[channel_id]);
- return {_sink_deps[channel_id].get()};
-}
-
-std::vector<Dependency*> LocalMergeSortExchanger::local_state_dependency(int
channel_id) {
- if (channel_id != 0) {
- return {};
- }
- std::vector<Dependency*> deps;
- for (auto depSptr : _queue_deps) {
- deps.push_back(depSptr.get());
- }
- return deps;
-}
-
Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block*
in_block, bool eos,
LocalExchangeSinkLocalState& local_state) {
for (size_t i = 0; i < _num_partitions; i++) {
auto mutable_block =
vectorized::MutableBlock::create_unique(in_block->clone_empty());
RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0,
in_block->rows()));
- _enqueue_data_and_set_ready(i, local_state, mutable_block->to_block());
+ _enqueue_data_and_set_ready(i, local_state,
+
BlockWrapper::create_shared(mutable_block->to_block()));
}
return Status::OK();
@@ -413,18 +386,18 @@ Status BroadcastExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
+ bool eos;
+ BlockWrapperSPtr wrapper;
_data_queue[local_state._channel_id].set_eos();
- while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- // do nothing
+ while (_dequeue_data(local_state, wrapper, &eos, &next_block)) {
+ next_block = vectorized::Block();
}
}
Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
- vectorized::Block next_block;
- if (_dequeue_data(local_state, next_block, eos)) {
- *block = std::move(next_block);
- }
+ BlockWrapperSPtr next_block;
+ _dequeue_data(local_state, next_block, eos, block);
return Status::OK();
}
@@ -437,12 +410,8 @@ Status
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
}
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
- size_t memory_usage = new_block.allocated_bytes();
- local_state._shared_state->add_mem_usage(channel_id, memory_usage);
-
- if (!_enqueue_data_and_set_ready(channel_id, local_state,
std::move(new_block))) {
- local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
- }
+ _enqueue_data_and_set_ready(channel_id, local_state,
+
BlockWrapper::create_shared(std::move(new_block)));
return Status::OK();
}
@@ -497,11 +466,8 @@ Status
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
auto new_block = mutable_block->to_block();
- size_t memory_usage = new_block.allocated_bytes();
- local_state._shared_state->add_mem_usage(i, memory_usage);
- if (!_enqueue_data_and_set_ready(i, local_state,
std::move(new_block))) {
- local_state._shared_state->sub_mem_usage(i, memory_usage);
- }
+ _enqueue_data_and_set_ready(i, local_state,
+
BlockWrapper::create_shared(std::move(new_block)));
}
}
return Status::OK();
@@ -522,16 +488,19 @@ Status AdaptivePassthroughExchanger::sink(RuntimeState*
state, vectorized::Block
Status AdaptivePassthroughExchanger::get_block(RuntimeState* state,
vectorized::Block* block,
bool* eos,
LocalExchangeSourceLocalState&
local_state) {
+ BlockWrapperSPtr next_block;
+ _dequeue_data(local_state, next_block, eos, block);
+ return Status::OK();
+}
+
+void AdaptivePassthroughExchanger::close(LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
- if (_dequeue_data(local_state, next_block, eos)) {
- block->swap(next_block);
- if (_free_block_limit == 0 ||
- _free_blocks.size_approx() < _free_block_limit * _num_sources) {
- _free_blocks.enqueue(std::move(next_block));
- }
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
+ bool eos;
+ BlockWrapperSPtr wrapper;
+ _data_queue[local_state._channel_id].set_eos();
+ while (_dequeue_data(local_state, wrapper, &eos, &next_block)) {
+ next_block = vectorized::Block();
}
- return Status::OK();
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index afdebd21101..dfb5c31fff8 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -24,9 +24,22 @@ namespace doris::pipeline {
class LocalExchangeSourceLocalState;
class LocalExchangeSinkLocalState;
-struct ShuffleBlockWrapper;
+struct BlockWrapper;
class SortSourceOperatorX;
+/**
+ * One exchanger is held by one `LocalExchangeSharedState`, and one
`LocalExchangeSharedState` is
+ * shared by all local exchange sink operators and source operators with the
same id.
+ *
+ * In an exchanger, two block queues are maintained: one is the data block queue and
the other is the free block queue.
+ *
+ * In detail, the data block queue has as many queues as source operators. Each
source operator will get
+ * data blocks from the corresponding queue. Data blocks are pushed into the queue
by sink operators. One
+ * sink operator will push blocks into one or more queues.
+ *
+ * Free blocks are used to reuse the allocated memory. To limit memory
usage, we also use a conf
+ * to limit the size of the free block queue.
+ */
class ExchangerBase {
public:
ExchangerBase(int running_sink_operators, int num_partitions, int
free_block_limit)
@@ -50,16 +63,17 @@ public:
virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool
eos,
LocalExchangeSinkLocalState& local_state) = 0;
virtual ExchangeType get_type() const = 0;
+ // Called if a local exchanger source operator is closed. Free the unused
data blocks in data_queue.
virtual void close(LocalExchangeSourceLocalState& local_state) = 0;
-
- virtual std::vector<Dependency*> local_sink_state_dependency(int
channel_id) { return {}; }
- virtual std::vector<Dependency*> local_state_dependency(int channel_id) {
return {}; }
+ // Called if all local exchanger source operators are closed. We free the
memory in
+ // `_free_blocks` here.
+ virtual void finalize(LocalExchangeSourceLocalState& local_state);
virtual std::string data_queue_debug_string(int i) = 0;
protected:
friend struct LocalExchangeSharedState;
- friend struct ShuffleBlockWrapper;
+ friend struct BlockWrapper;
friend class LocalExchangeSourceLocalState;
friend class LocalExchangeSinkOperatorX;
friend class LocalExchangeSinkLocalState;
@@ -78,7 +92,7 @@ struct PartitionedRowIdxs {
uint32_t length;
};
-using PartitionedBlock = std::pair<std::shared_ptr<ShuffleBlockWrapper>,
PartitionedRowIdxs>;
+using PartitionedBlock = std::pair<std::shared_ptr<BlockWrapper>,
PartitionedRowIdxs>;
template <typename BlockType>
struct BlockQueue {
@@ -108,6 +122,8 @@ struct BlockQueue {
void set_eos() { eos = true; }
};
+using BlockWrapperSPtr = std::shared_ptr<BlockWrapper>;
+
template <typename BlockType>
class Exchanger : public ExchangerBase {
public:
@@ -123,9 +139,13 @@ public:
}
protected:
- bool _enqueue_data_and_set_ready(int channel_id,
LocalExchangeSinkLocalState& local_state,
+ // Enqueue data block and set downstream source operator to ready.
+ void _enqueue_data_and_set_ready(int channel_id,
LocalExchangeSinkLocalState& local_state,
BlockType&& block);
- bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType&
block, bool* eos);
+ bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType&
block, bool* eos,
+ vectorized::Block* data_block);
+ bool _dequeue_data(LocalExchangeSourceLocalState& local_state, BlockType&
block, bool* eos,
+ vectorized::Block* data_block, int channel_id);
std::vector<BlockQueue<BlockType>> _data_queue;
private:
@@ -135,10 +155,33 @@ private:
class LocalExchangeSourceLocalState;
class LocalExchangeSinkLocalState;
-struct ShuffleBlockWrapper {
- ENABLE_FACTORY_CREATOR(ShuffleBlockWrapper);
- ShuffleBlockWrapper(vectorized::Block&& data_block_) :
data_block(std::move(data_block_)) {}
+/**
+ * `BlockWrapper` is used to wrap a data block with a reference count.
+ *
+ * In function `unref()`, if `ref_count` is decremented to 0, this block is no longer needed by any
+ * operator, so we put it into `_free_blocks` to reuse its memory if needed and refresh the memory
+ * usage in the current queue.
+ *
+ * Note: `ref_count` will be larger than 1 only if this block is shared
between multiple queues in
+ * shuffle exchanger.
+ */
+struct BlockWrapper {
+ ENABLE_FACTORY_CREATOR(BlockWrapper);
+ BlockWrapper(vectorized::Block&& data_block_) :
data_block(std::move(data_block_)) {}
+ ~BlockWrapper() { DCHECK_EQ(ref_count.load(), 0); }
void ref(int delta) { ref_count += delta; }
+ void unref(LocalExchangeSharedState* shared_state, size_t allocated_bytes)
{
+ if (ref_count.fetch_sub(1) == 1) {
+ shared_state->sub_total_mem_usage(allocated_bytes);
+ if (shared_state->exchanger->_free_block_limit == 0 ||
+ shared_state->exchanger->_free_blocks.size_approx() <
+ shared_state->exchanger->_free_block_limit *
+ shared_state->exchanger->_num_sources) {
+ data_block.clear_column_data();
+
shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
+ }
+ }
+ }
void unref(LocalExchangeSharedState* shared_state) {
if (ref_count.fetch_sub(1) == 1) {
shared_state->sub_total_mem_usage(data_block.allocated_bytes());
@@ -197,12 +240,12 @@ class BucketShuffleExchanger final : public
ShuffleExchanger {
ExchangeType get_type() const override { return
ExchangeType::BUCKET_HASH_SHUFFLE; }
};
-class PassthroughExchanger final : public Exchanger<vectorized::Block> {
+class PassthroughExchanger final : public Exchanger<BlockWrapperSPtr> {
public:
ENABLE_FACTORY_CREATOR(PassthroughExchanger);
PassthroughExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
- : Exchanger<vectorized::Block>(running_sink_operators,
num_partitions,
- free_block_limit) {
+ : Exchanger<BlockWrapperSPtr>(running_sink_operators,
num_partitions,
+ free_block_limit) {
_data_queue.resize(num_partitions);
}
~PassthroughExchanger() override = default;
@@ -215,12 +258,12 @@ public:
void close(LocalExchangeSourceLocalState& local_state) override;
};
-class PassToOneExchanger final : public Exchanger<vectorized::Block> {
+class PassToOneExchanger final : public Exchanger<BlockWrapperSPtr> {
public:
ENABLE_FACTORY_CREATOR(PassToOneExchanger);
PassToOneExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
- : Exchanger<vectorized::Block>(running_sink_operators,
num_partitions,
- free_block_limit) {
+ : Exchanger<BlockWrapperSPtr>(running_sink_operators,
num_partitions,
+ free_block_limit) {
_data_queue.resize(num_partitions);
}
~PassToOneExchanger() override = default;
@@ -233,25 +276,14 @@ public:
void close(LocalExchangeSourceLocalState& local_state) override {}
};
-class LocalMergeSortExchanger final : public Exchanger<vectorized::Block> {
+class LocalMergeSortExchanger final : public Exchanger<BlockWrapperSPtr> {
public:
ENABLE_FACTORY_CREATOR(LocalMergeSortExchanger);
LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source,
int running_sink_operators, int num_partitions,
int free_block_limit)
- : Exchanger<vectorized::Block>(running_sink_operators,
num_partitions,
- free_block_limit),
- _sort_source(std::move(sort_source)),
- _queues_mem_usege(num_partitions),
- _each_queue_limit(config::local_exchange_buffer_mem_limit /
num_partitions) {
+ : Exchanger<BlockWrapperSPtr>(running_sink_operators,
num_partitions, free_block_limit),
+ _sort_source(std::move(sort_source)) {
_data_queue.resize(num_partitions);
- for (size_t i = 0; i < num_partitions; i++) {
- _queues_mem_usege[i] = 0;
- _sink_deps.push_back(
- std::make_shared<Dependency>(0, 0,
"LOCAL_MERGE_SORT_SINK_DEPENDENCY", true));
- _queue_deps.push_back(
- std::make_shared<Dependency>(0, 0,
"LOCAL_MERGE_SORT_QUEUE_DEPENDENCY"));
- _queue_deps.back()->block();
- }
}
~LocalMergeSortExchanger() override = default;
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -263,33 +295,21 @@ public:
Status build_merger(RuntimeState* statem, LocalExchangeSourceLocalState&
local_state);
- std::vector<Dependency*> local_sink_state_dependency(int channel_id)
override;
-
- std::vector<Dependency*> local_state_dependency(int channel_id) override;
-
- void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t
delta);
- void sub_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t
delta);
- void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int
channel_id, int64_t delta);
void close(LocalExchangeSourceLocalState& local_state) override {}
+ void finalize(LocalExchangeSourceLocalState& local_state) override;
private:
- // only channel_id = 0 , build _merger and use it
-
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
std::shared_ptr<SortSourceOperatorX> _sort_source;
- std::vector<DependencySPtr> _sink_deps;
std::vector<std::atomic_int64_t> _queues_mem_usege;
- // if cur queue is empty, block this queue
- std::vector<DependencySPtr> _queue_deps;
- const int64_t _each_queue_limit;
};
-class BroadcastExchanger final : public Exchanger<vectorized::Block> {
+class BroadcastExchanger final : public Exchanger<BlockWrapperSPtr> {
public:
ENABLE_FACTORY_CREATOR(BroadcastExchanger);
BroadcastExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
- : Exchanger<vectorized::Block>(running_sink_operators,
num_partitions,
- free_block_limit) {
+ : Exchanger<BlockWrapperSPtr>(running_sink_operators,
num_partitions,
+ free_block_limit) {
_data_queue.resize(num_partitions);
}
~BroadcastExchanger() override = default;
@@ -304,13 +324,13 @@ public:
//The code in AdaptivePassthroughExchanger is essentially
// a copy of ShuffleExchanger and PassthroughExchanger.
-class AdaptivePassthroughExchanger : public Exchanger<vectorized::Block> {
+class AdaptivePassthroughExchanger : public Exchanger<BlockWrapperSPtr> {
public:
ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
AdaptivePassthroughExchanger(int running_sink_operators, int
num_partitions,
int free_block_limit)
- : Exchanger<vectorized::Block>(running_sink_operators,
num_partitions,
- free_block_limit) {
+ : Exchanger<BlockWrapperSPtr>(running_sink_operators,
num_partitions,
+ free_block_limit) {
_data_queue.resize(num_partitions);
}
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -320,7 +340,7 @@ public:
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return
ExchangeType::ADAPTIVE_PASSTHROUGH; }
- void close(LocalExchangeSourceLocalState& local_state) override {}
+ void close(LocalExchangeSourceLocalState& local_state) override;
private:
Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block,
bool eos,
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index a2f26ac0a00..6f7a59c0f98 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -697,7 +697,10 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
is_shuffled_hash_join,
shuffle_idx_to_instance_idx));
// 2. Create and initialize LocalExchangeSharedState.
- auto shared_state =
LocalExchangeSharedState::create_shared(_num_instances);
+ std::shared_ptr<LocalExchangeSharedState> shared_state =
+ data_distribution.distribution_type ==
ExchangeType::LOCAL_MERGE_SORT
+ ?
LocalMergeExchangeSharedState::create_shared(_num_instances)
+ : LocalExchangeSharedState::create_shared(_num_instances);
switch (data_distribution.distribution_type) {
case ExchangeType::HASH_SHUFFLE:
shared_state->exchanger = ShuffleExchanger::create_unique(
@@ -730,11 +733,20 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
: 0);
break;
case ExchangeType::PASS_TO_ONE:
- shared_state->exchanger = BroadcastExchanger::create_unique(
- cur_pipe->num_tasks(), _num_instances,
-
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
- ?
_runtime_state->query_options().local_exchange_free_blocks_limit
- : 0);
+ if (_runtime_state->enable_share_hash_table_for_broadcast_join()) {
+ // If the shared hash table is enabled for broadcast join (BJ), the hash table will be built by only one task
+ shared_state->exchanger = PassToOneExchanger::create_unique(
+ cur_pipe->num_tasks(), _num_instances,
+
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+ ?
_runtime_state->query_options().local_exchange_free_blocks_limit
+ : 0);
+ } else {
+ shared_state->exchanger = BroadcastExchanger::create_unique(
+ cur_pipe->num_tasks(), _num_instances,
+
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+ ?
_runtime_state->query_options().local_exchange_free_blocks_limit
+ : 0);
+ }
break;
case ExchangeType::LOCAL_MERGE_SORT: {
auto child_op = cur_pipe->sink_x()->child_x();
@@ -788,7 +800,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
}
operator_xs.insert(operator_xs.begin(), source_op);
- shared_state->create_source_dependencies(source_op->operator_id(),
source_op->node_id());
+ shared_state->create_dependencies(source_op->operator_id(),
source_op->node_id());
// 5. Set children for two pipelines separately.
std::vector<std::shared_ptr<Pipeline>> new_children;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]