This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new db0a43bad2e [Chore](exchange) change
LocalExchangeSharedState::mem_usage to a signed type (#37981)
db0a43bad2e is described below
commit db0a43bad2ecf46974f90d2c3906d258abab5f43
Author: Pxl <[email protected]>
AuthorDate: Wed Jul 17 13:46:51 2024 +0800
[Chore](exchange) change LocalExchangeSharedState::mem_usage to a signed type
(#37981)
Cherry-picked from #36682.
---
be/src/pipeline/pipeline_x/dependency.h | 17 ++--
.../pipeline_x/local_exchange/local_exchanger.cpp | 95 ++++++++++------------
2 files changed, 47 insertions(+), 65 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index b60b3e9ae3b..c82104f7535 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -365,9 +365,8 @@ public:
std::vector<size_t> make_nullable_keys;
struct MemoryRecord {
- MemoryRecord() : used_in_arena(0), used_in_state(0) {}
- int64_t used_in_arena;
- int64_t used_in_state;
+ int64_t used_in_arena {};
+ int64_t used_in_state {};
};
MemoryRecord mem_usage_record;
bool agg_data_created_without_key = false;
@@ -754,14 +753,9 @@ struct DataDistribution {
DataDistribution(ExchangeType type) : distribution_type(type) {}
DataDistribution(ExchangeType type, const std::vector<TExpr>&
partition_exprs_)
: distribution_type(type), partition_exprs(partition_exprs_) {}
- DataDistribution(const DataDistribution& other)
- : distribution_type(other.distribution_type),
partition_exprs(other.partition_exprs) {}
+ DataDistribution(const DataDistribution& other) = default;
bool need_local_exchange() const { return distribution_type !=
ExchangeType::NOOP; }
- DataDistribution& operator=(const DataDistribution& other) {
- distribution_type = other.distribution_type;
- partition_exprs = other.partition_exprs;
- return *this;
- }
+ DataDistribution& operator=(const DataDistribution& other) = default;
ExchangeType distribution_type;
std::vector<TExpr> partition_exprs;
};
@@ -774,7 +768,8 @@ public:
LocalExchangeSharedState(int num_instances);
std::unique_ptr<ExchangerBase> exchanger {};
std::vector<MemTracker*> mem_trackers;
- std::atomic<size_t> mem_usage = 0;
+ std::atomic<int64_t> mem_usage = 0;
+ // We need to make sure to add mem_usage first and then enqueue, otherwise
sub mem_usage may cause negative mem_usage during concurrent dequeue.
std::mutex le_lock;
void create_source_dependencies(int operator_id, int node_id,
QueryContext* ctx) {
for (size_t i = 0; i < source_deps.size(); i++) {
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index f02fd0e5f04..7a044aaa77f 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -72,20 +72,14 @@ Status ShuffleExchanger::get_block(RuntimeState* state,
vectorized::Block* block
return Status::OK();
};
- if (_running_sink_operators == 0) {
- if
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
- SCOPED_TIMER(local_state._copy_data_timer);
- mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
- block, partitioned_block.first->data_block);
- RETURN_IF_ERROR(get_data(block));
- } else {
- *eos = true;
- }
- } else if
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block =
vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
block, partitioned_block.first->data_block);
RETURN_IF_ERROR(get_data(block));
+ } else if (all_finished) {
+ *eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
@@ -142,6 +136,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
if (data_queue[it.second].enqueue({new_block_wrapper,
{row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(it.second);
} else {
+ local_state._shared_state->sub_mem_usage(
+ it.second,
new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
}
} else {
@@ -160,6 +156,9 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
{new_block_wrapper, {row_idx, start, size}})) {
local_state._shared_state->set_ready_to_read(i %
_num_sources);
} else {
+ local_state._shared_state->sub_mem_usage(
+ i % _num_sources,
new_block_wrapper->data_block.allocated_bytes(),
+ false);
new_block_wrapper->unref(local_state._shared_state);
}
} else {
@@ -179,6 +178,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx,
start, size}})) {
local_state._shared_state->set_ready_to_read(map[i]);
} else {
+ local_state._shared_state->sub_mem_usage(
+ map[i],
new_block_wrapper->data_block.allocated_bytes(), false);
new_block_wrapper->unref(local_state._shared_state);
}
} else {
@@ -198,9 +199,12 @@ Status PassthroughExchanger::sink(RuntimeState* state,
vectorized::Block* in_blo
}
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
- local_state._shared_state->add_mem_usage(channel_id,
new_block.allocated_bytes());
+ size_t memory_usage = new_block.allocated_bytes();
+ local_state._shared_state->add_mem_usage(channel_id, memory_usage);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
+ } else {
+ local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}
return Status::OK();
@@ -218,25 +222,16 @@ void
PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
- if (_running_sink_operators == 0) {
- if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- block->swap(next_block);
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
- block->allocated_bytes());
- if (_free_block_limit == 0 ||
- _free_blocks.size_approx() < _free_block_limit * _num_sources)
{
- _free_blocks.enqueue(std::move(next_block));
- }
- } else {
- *eos = true;
- }
- } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
+ local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
+ } else if (all_finished) {
+ *eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
@@ -262,14 +257,11 @@ Status PassToOneExchanger::get_block(RuntimeState* state,
vectorized::Block* blo
return Status::OK();
}
vectorized::Block next_block;
- if (_running_sink_operators == 0) {
- if (_data_queue[0].try_dequeue(next_block)) {
- *block = std::move(next_block);
- } else {
- *eos = true;
- }
- } else if (_data_queue[0].try_dequeue(next_block)) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[0].try_dequeue(next_block)) {
*block = std::move(next_block);
+ } else if (all_finished) {
+ *eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
@@ -301,14 +293,11 @@ void
BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block*
block, bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
- if (_running_sink_operators == 0) {
- if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- *block = std::move(next_block);
- } else {
- *eos = true;
- }
- } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
*block = std::move(next_block);
+ } else if (all_finished) {
+ *eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
@@ -325,9 +314,12 @@ Status
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
}
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
- local_state._shared_state->add_mem_usage(channel_id,
new_block.allocated_bytes());
+ size_t memory_usage = new_block.allocated_bytes();
+ local_state._shared_state->add_mem_usage(channel_id, memory_usage);
if (_data_queue[channel_id].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(channel_id);
+ } else {
+ local_state._shared_state->sub_mem_usage(channel_id, memory_usage);
}
return Status::OK();
@@ -383,9 +375,13 @@ Status
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
vectorized::MutableBlock::create_unique(block->clone_empty());
RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
auto new_block = mutable_block->to_block();
- local_state._shared_state->add_mem_usage(i,
new_block.allocated_bytes());
+
+ size_t memory_usage = new_block.allocated_bytes();
+ local_state._shared_state->add_mem_usage(i, memory_usage);
if (data_queue[i].enqueue(std::move(new_block))) {
local_state._shared_state->set_ready_to_read(i);
+ } else {
+ local_state._shared_state->sub_mem_usage(i, memory_usage);
}
}
}
@@ -408,25 +404,16 @@ Status
AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
bool* eos,
LocalExchangeSourceLocalState&
local_state) {
vectorized::Block next_block;
- if (_running_sink_operators == 0) {
- if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
- block->swap(next_block);
- if (_free_block_limit == 0 ||
- _free_blocks.size_approx() < _free_block_limit * _num_sources)
{
- _free_blocks.enqueue(std::move(next_block));
- }
- local_state._shared_state->sub_mem_usage(local_state._channel_id,
- block->allocated_bytes());
- } else {
- *eos = true;
- }
- } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
+ bool all_finished = _running_sink_operators == 0;
+ if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
if (_free_block_limit == 0 ||
_free_blocks.size_approx() < _free_block_limit * _num_sources) {
_free_blocks.enqueue(std::move(next_block));
}
local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
+ } else if (all_finished) {
+ *eos = true;
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
local_state._dependency->block();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]