This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new a5189156266 [fix](pipeline) Do not push data in local exchange if eos (#35972) (#36010)
a5189156266 is described below
commit a5189156266b8cb1dfccdec29000eb834276ed9a
Author: Gabriel <[email protected]>
AuthorDate: Fri Jun 7 15:40:55 2024 +0800
[fix](pipeline) Do not push data in local exchange if eos (#35972) (#36010)
pick #35972 and #34536
---
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 9 ++
be/src/pipeline/exec/hashjoin_probe_operator.h | 1 +
.../exec/partitioned_hash_join_probe_operator.cpp | 20 +++
.../exec/partitioned_hash_join_probe_operator.h | 2 +
be/src/pipeline/pipeline_x/dependency.h | 4 +-
.../local_exchange/local_exchange_sink_operator.h | 4 +-
.../local_exchange_source_operator.cpp | 6 +-
.../local_exchange_source_operator.h | 5 +-
.../pipeline_x/local_exchange/local_exchanger.cpp | 115 +++++++++-------
.../pipeline_x/local_exchange/local_exchanger.h | 147 ++++++++++++++-------
.../pipeline_x/pipeline_x_fragment_context.cpp | 38 ++++--
.../java/org/apache/doris/qe/SessionVariable.java | 5 +
gensrc/thrift/PaloInternalService.thrift | 1 +
13 files changed, 249 insertions(+), 108 deletions(-)
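The common thread through this patch: every local exchange queue now carries an eos flag, enqueue() reports whether the item was actually accepted, and sinks only call set_ready_to_read() on success. A minimal standalone sketch of that queue contract (plain std types and hypothetical names; the real BlockQueue in local_exchanger.h below wraps moodycamel::ConcurrentQueue):

    #include <atomic>
    #include <mutex>
    #include <optional>
    #include <queue>

    // Sketch only: a queue that refuses new items once the consumer is done,
    // so a source that reached eos can never accumulate blocks nobody drains.
    template <typename T>
    class EosQueue {
    public:
        // Returns false (and drops the item) after set_eos() was called.
        bool enqueue(T item) {
            if (_eos.load(std::memory_order_acquire)) {
                return false;
            }
            std::lock_guard<std::mutex> lock(_mu);
            _q.push(std::move(item));
            return true;
        }

        std::optional<T> try_dequeue() {
            std::lock_guard<std::mutex> lock(_mu);
            if (_q.empty()) {
                return std::nullopt;
            }
            T item = std::move(_q.front());
            _q.pop();
            return item;
        }

        // Called by the source when it closes.
        void set_eos() { _eos.store(true, std::memory_order_release); }

    private:
        std::atomic<bool> _eos{false};
        std::mutex _mu;
        std::queue<T> _q;
    };

The caller-side rule is the point: a sink must treat a false return as "receiver already closed" and release its reference to the block instead of signalling readiness.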
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index a58ad62211c..00cf6a65eb0 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -373,6 +373,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState* state, vectorized::Bloc
    return Status::OK();
}
+std::string HashJoinProbeLocalState::debug_string(int indentation_level) const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}",
+                   JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState>::debug_string(
+                           indentation_level),
+                   _shared_state ? std::to_string(_shared_state->short_circuit_for_probe) : "NULL");
+    return fmt::to_string(debug_string_buffer);
+}
+
Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
                                                     vectorized::ColumnUInt8::MutablePtr& null_map,
                                                     vectorized::ColumnRawPtrs& raw_ptrs,
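The new debug_string override above composes the base-class report with operator-local state, and guards _shared_state because a dump can be taken before it is initialized. The shape of the pattern, reduced to a self-contained sketch (uses {fmt}; class names are hypothetical):

    #include <fmt/format.h>
    #include <iterator>
    #include <string>

    struct BaseState {
        virtual ~BaseState() = default;
        virtual std::string debug_string(int indentation_level) const {
            return fmt::format("{}BaseState", std::string(indentation_level * 2, ' '));
        }
    };

    struct DerivedState : BaseState {
        const bool* shared_flag = nullptr; // may be null before prepare()

        std::string debug_string(int indentation_level) const override {
            fmt::memory_buffer buf;
            // Chain the base output, then append local fields; never assume the
            // shared state exists, since dumps can run mid-initialization.
            fmt::format_to(std::back_inserter(buf), "{}, shared_flag: {}",
                           BaseState::debug_string(indentation_level),
                           shared_flag ? (*shared_flag ? "true" : "false") : "NULL");
            return fmt::to_string(buf);
        }
    };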
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index b4930307bcc..5cdfe9feeb7 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -94,6 +94,7 @@ public:
    const std::shared_ptr<vectorized::Block>& build_block() const {
        return _shared_state->build_block;
    }
+    std::string debug_string(int indentation_level) const override;
private:
    void _prepare_probe_block();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 10fa2effcc0..0576ae91dd6 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -437,6 +437,16 @@ Status PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
    return st;
}
+std::string PartitionedHashJoinProbeOperatorX::debug_string(RuntimeState* state,
+                                                            int indentation_level) const {
+    fmt::memory_buffer debug_string_buffer;
+    fmt::format_to(debug_string_buffer, "{}, in mem join probe: {}",
+                   JoinProbeOperatorX<PartitionedHashJoinProbeLocalState>::debug_string(
+                           state, indentation_level),
+                   _inner_probe_operator ? _inner_probe_operator->debug_string(state, 0) : "NULL");
+    return fmt::to_string(debug_string_buffer);
+}
+
Status PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState* state,
                                                                           uint32_t partition_index,
                                                                           bool& has_data) {
@@ -872,6 +882,16 @@ Status PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
    auto& local_state = get_local_state(state);
    SCOPED_TIMER(local_state.exec_time_counter());
    const auto need_to_spill = local_state._shared_state->need_to_spill;
+#ifndef NDEBUG
+    Defer eos_check_defer([&] {
+        if (*eos) {
+            LOG(INFO) << "query: " << print_id(state->query_id())
+                      << ", hash probe node: " << node_id() << ", task: " << state->task_id()
+                      << ", eos with child eos: " << local_state._child_eos
+                      << ", need spill: " << need_to_spill;
+        }
+    });
+#endif
    if (need_more_input_data(state)) {
        if (need_to_spill && _should_revoke_memory(state)) {
            bool wait_for_io = false;
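The #ifndef NDEBUG block above registers a Defer callback so the eos diagnostics fire on every return path out of get_block, not just one. For readers unfamiliar with the idiom, a hand-rolled scope-exit guard of the same shape (Doris ships its own Defer; this stand-in only illustrates the mechanics):

    #include <iostream>
    #include <utility>

    // Runs the callable when the enclosing scope exits, whichever branch returns.
    template <typename F>
    class ScopeExit {
    public:
        explicit ScopeExit(F f) : _f(std::move(f)) {}
        ~ScopeExit() { _f(); }
        ScopeExit(const ScopeExit&) = delete;
        ScopeExit& operator=(const ScopeExit&) = delete;

    private:
        F _f;
    };

    bool get_block_stub(bool* eos) {
        ScopeExit log_on_exit([&] {
            if (*eos) {
                std::cerr << "reached eos\n"; // debug-only tracing
            }
        });
        *eos = true;
        return true; // the guard still logs on this return
    }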
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 3b942d15755..d56a57ae428 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -155,6 +155,8 @@ public:
    Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
                bool* eos) const override;
+    std::string debug_string(RuntimeState* state, int indentation_level = 0) const override;
+
    bool need_more_input_data(RuntimeState* state) const override;
    DataDistribution required_data_distribution() const override {
        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 72763d03c8f..b60b3e9ae3b 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -766,13 +766,13 @@ struct DataDistribution {
std::vector<TExpr> partition_exprs;
};
-class Exchanger;
+class ExchangerBase;
struct LocalExchangeSharedState : public BasicSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
LocalExchangeSharedState(int num_instances);
- std::unique_ptr<Exchanger> exchanger {};
+ std::unique_ptr<ExchangerBase> exchanger {};
std::vector<MemTracker*> mem_trackers;
std::atomic<size_t> mem_usage = 0;
std::mutex le_lock;
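One subtlety behind the rename above: LocalExchangeSharedState keeps a std::unique_ptr<ExchangerBase> through a forward declaration alone. That is legal only because the destructor is instantiated in a translation unit where the type is complete, the usual out-of-line destructor trick, sketched here with hypothetical names:

    // holder.h -- the member type is only forward-declared
    #include <memory>

    class Exchanger; // incomplete here

    class Holder {
    public:
        Holder();
        ~Holder(); // must be defined where Exchanger is complete

    private:
        std::unique_ptr<Exchanger> _exchanger;
    };

    // holder.cpp -- include the full definition, then default the destructor:
    //   #include "exchanger.h"
    //   Holder::~Holder() = default;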
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index db6662a221a..99b88747a98 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -21,7 +21,7 @@
namespace doris::pipeline {
-class Exchanger;
+class ExchangerBase;
class ShuffleExchanger;
class PassthroughExchanger;
class BroadcastExchanger;
@@ -50,7 +50,7 @@ private:
friend class PassToOneExchanger;
friend class AdaptivePassthroughExchanger;
- Exchanger* _exchanger = nullptr;
+ ExchangerBase* _exchanger = nullptr;
// Used by shuffle exchanger
RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index dbc4e37bbab..086a3b551fd 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -66,14 +66,16 @@ std::string LocalExchangeSourceLocalState::debug_string(int indentation_level) c
    fmt::memory_buffer debug_string_buffer;
    fmt::format_to(debug_string_buffer,
                   "{}, _channel_id: {}, _num_partitions: {}, _num_senders: {}, _num_sources: {}, "
-                   "_running_sink_operators: {}, _running_source_operators: {}",
+                   "_running_sink_operators: {}, _running_source_operators: {}, mem_usage: {}",
                   Base::debug_string(indentation_level), _channel_id, _exchanger->_num_partitions,
                   _exchanger->_num_senders, _exchanger->_num_sources,
-                   _exchanger->_running_sink_operators, _exchanger->_running_source_operators);
+                   _exchanger->_running_sink_operators, _exchanger->_running_source_operators,
+                   _shared_state->mem_usage.load());
    size_t i = 0;
    fmt::format_to(debug_string_buffer, ", MemTrackers: ");
    for (auto* mem_tracker : _shared_state->mem_trackers) {
        fmt::format_to(debug_string_buffer, "{}: {}, ", i, mem_tracker->consumption());
+        i++;
    }
    return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index c7583d1351c..7cefc1ca900 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -21,7 +21,7 @@
namespace doris::pipeline {
-class Exchanger;
+class ExchangerBase;
class ShuffleExchanger;
class PassthroughExchanger;
class BroadcastExchanger;
@@ -41,13 +41,14 @@ public:
private:
friend class LocalExchangeSourceOperatorX;
+ friend class ExchangerBase;
friend class ShuffleExchanger;
friend class PassthroughExchanger;
friend class BroadcastExchanger;
friend class PassToOneExchanger;
friend class AdaptivePassthroughExchanger;
- Exchanger* _exchanger = nullptr;
+ ExchangerBase* _exchanger = nullptr;
int _channel_id;
RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
RuntimeProfile::Counter* _copy_data_timer = nullptr;
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index abcc0161fd7..f02fd0e5f04 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -17,8 +17,10 @@
#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
+#include "common/status.h"
#include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h"
+#include "vec/runtime/partitioner.h"
namespace doris::pipeline {
@@ -41,6 +43,7 @@ Status ShuffleExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
PartitionedBlock partitioned_block;
+ _data_queue[local_state._channel_id].set_eos();
    while (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
auto block_wrapper = partitioned_block.first;
local_state._shared_state->sub_mem_usage(
@@ -52,38 +55,36 @@ void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
Status ShuffleExchanger::get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                                   LocalExchangeSourceLocalState& local_state) {
    PartitionedBlock partitioned_block;
-    std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;
+    vectorized::MutableBlock mutable_block;
    auto get_data = [&](vectorized::Block* result_block) -> Status {
        do {
-            const auto* offset_start = &((
-                    *std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
+            const auto* offset_start = partitioned_block.second.row_idxs->data() +
+                                       partitioned_block.second.offset_start;
            auto block_wrapper = partitioned_block.first;
            local_state._shared_state->sub_mem_usage(
                    local_state._channel_id, block_wrapper->data_block.allocated_bytes(), false);
-            RETURN_IF_ERROR(
-                    mutable_block->add_rows(&block_wrapper->data_block, offset_start,
-                            offset_start + std::get<2>(partitioned_block.second)));
+            RETURN_IF_ERROR(mutable_block.add_rows(&block_wrapper->data_block, offset_start,
+                                                   offset_start + partitioned_block.second.length));
            block_wrapper->unref(local_state._shared_state);
-        } while (mutable_block->rows() < state->batch_size() &&
+        } while (mutable_block.rows() < state->batch_size() &&
                 _data_queue[local_state._channel_id].try_dequeue(partitioned_block));
-        *result_block = mutable_block->to_block();
        return Status::OK();
    };
+
    if (_running_sink_operators == 0) {
        if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
            SCOPED_TIMER(local_state._copy_data_timer);
-            mutable_block = vectorized::MutableBlock::create_unique(
-                    partitioned_block.first->data_block.clone_empty());
+            mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+                    block, partitioned_block.first->data_block);
            RETURN_IF_ERROR(get_data(block));
        } else {
-            COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
            *eos = true;
        }
    } else if (_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
        SCOPED_TIMER(local_state._copy_data_timer);
-        mutable_block = vectorized::MutableBlock::create_unique(
-                partitioned_block.first->data_block.clone_empty());
+        mutable_block = vectorized::VectorizedUtils::build_mutable_mem_reuse_block(
+                block, partitioned_block.first->data_block);
        RETURN_IF_ERROR(get_data(block));
    } else {
        COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
@@ -97,7 +98,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
                                     LocalExchangeSinkLocalState& local_state) {
    auto& data_queue = _data_queue;
    const auto rows = block->rows();
-    auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
+    auto row_idx = std::make_shared<vectorized::PODArray<uint32_t>>(rows);
    {
        local_state._partition_rows_histogram.assign(_num_partitions + 1, 0);
        for (size_t i = 0; i < rows; ++i) {
@@ -107,7 +108,6 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
            local_state._partition_rows_histogram[i] +=
                    local_state._partition_rows_histogram[i - 1];
        }
-
        for (int32_t i = rows - 1; i >= 0; --i) {
            (*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i;
            local_state._partition_rows_histogram[channel_ids[i]]--;
@@ -134,13 +134,16 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
        for (const auto& it : map) {
            DCHECK(it.second >= 0 && it.second < _num_partitions)
                    << it.first << " : " << it.second << " " << _num_partitions;
-            size_t start = local_state._partition_rows_histogram[it.first];
-            size_t size = local_state._partition_rows_histogram[it.first + 1] - start;
+            uint32_t start = local_state._partition_rows_histogram[it.first];
+            uint32_t size = local_state._partition_rows_histogram[it.first + 1] - start;
            if (size > 0) {
                local_state._shared_state->add_mem_usage(
                        it.second, new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}});
-                local_state._shared_state->set_ready_to_read(it.second);
+                if (data_queue[it.second].enqueue({new_block_wrapper, {row_idx, start, size}})) {
+                    local_state._shared_state->set_ready_to_read(it.second);
+                } else {
+                    new_block_wrapper->unref(local_state._shared_state);
+                }
            } else {
                new_block_wrapper->unref(local_state._shared_state);
            }
@@ -148,13 +151,17 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
    } else if (_num_senders != _num_sources || _ignore_source_data_distribution) {
        new_block_wrapper->ref(_num_partitions);
        for (size_t i = 0; i < _num_partitions; i++) {
-            size_t start = local_state._partition_rows_histogram[i];
-            size_t size = local_state._partition_rows_histogram[i + 1] - start;
+            uint32_t start = local_state._partition_rows_histogram[i];
+            uint32_t size = local_state._partition_rows_histogram[i + 1] - start;
            if (size > 0) {
                local_state._shared_state->add_mem_usage(
                        i % _num_sources, new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[i % _num_sources].enqueue({new_block_wrapper, {row_idx, start, size}});
-                local_state._shared_state->set_ready_to_read(i % _num_sources);
+                if (data_queue[i % _num_sources].enqueue(
+                        {new_block_wrapper, {row_idx, start, size}})) {
+                    local_state._shared_state->set_ready_to_read(i % _num_sources);
+                } else {
+                    new_block_wrapper->unref(local_state._shared_state);
+                }
            } else {
                new_block_wrapper->unref(local_state._shared_state);
            }
@@ -164,13 +171,16 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, const uint32_t* __rest
        auto map =
                local_state._parent->cast<LocalExchangeSinkOperatorX>()._bucket_seq_to_instance_idx;
        for (size_t i = 0; i < _num_partitions; i++) {
-            size_t start = local_state._partition_rows_histogram[i];
-            size_t size = local_state._partition_rows_histogram[i + 1] - start;
+            uint32_t start = local_state._partition_rows_histogram[i];
+            uint32_t size = local_state._partition_rows_histogram[i + 1] - start;
            if (size > 0) {
                local_state._shared_state->add_mem_usage(
                        map[i], new_block_wrapper->data_block.allocated_bytes(), false);
-                data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}});
-                local_state._shared_state->set_ready_to_read(map[i]);
+                if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx, start, size}})) {
+                    local_state._shared_state->set_ready_to_read(map[i]);
+                } else {
+                    new_block_wrapper->unref(local_state._shared_state);
+                }
            } else {
                new_block_wrapper->unref(local_state._shared_state);
            }
@@ -189,14 +199,16 @@ Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_blo
    new_block.swap(*in_block);
    auto channel_id = (local_state._channel_id++) % _num_partitions;
    local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
- _data_queue[channel_id].enqueue(std::move(new_block));
- local_state._shared_state->set_ready_to_read(channel_id);
+ if (_data_queue[channel_id].enqueue(std::move(new_block))) {
+ local_state._shared_state->set_ready_to_read(channel_id);
+ }
return Status::OK();
}
void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
+ _data_queue[local_state._channel_id].set_eos();
while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
        local_state._shared_state->sub_mem_usage(local_state._channel_id,
                                                 next_block.allocated_bytes());
@@ -209,16 +221,21 @@ Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* b
if (_running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
- _free_blocks.enqueue(std::move(next_block));
            local_state._shared_state->sub_mem_usage(local_state._channel_id,
                                                     block->allocated_bytes());
+            if (_free_block_limit == 0 ||
+                _free_blocks.size_approx() < _free_block_limit * _num_sources) {
+ _free_blocks.enqueue(std::move(next_block));
+ }
} else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
- _free_blocks.enqueue(std::move(next_block));
+ if (_free_block_limit == 0 ||
+ _free_blocks.size_approx() < _free_block_limit * _num_sources) {
+ _free_blocks.enqueue(std::move(next_block));
+ }
        local_state._shared_state->sub_mem_usage(local_state._channel_id,
                                                 block->allocated_bytes());
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
@@ -231,8 +248,9 @@ Status PassToOneExchanger::sink(RuntimeState* state, vectorized::Block* in_block
                                LocalExchangeSinkLocalState& local_state) {
vectorized::Block new_block(in_block->clone_empty());
new_block.swap(*in_block);
- _data_queue[0].enqueue(std::move(new_block));
- local_state._shared_state->set_ready_to_read(0);
+ if (_data_queue[0].enqueue(std::move(new_block))) {
+ local_state._shared_state->set_ready_to_read(0);
+ }
return Status::OK();
}
@@ -248,7 +266,6 @@ Status PassToOneExchanger::get_block(RuntimeState* state, vectorized::Block* blo
if (_data_queue[0].try_dequeue(next_block)) {
*block = std::move(next_block);
} else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
*eos = true;
}
} else if (_data_queue[0].try_dequeue(next_block)) {
@@ -265,8 +282,9 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
    for (size_t i = 0; i < _num_partitions; i++) {
        auto mutable_block = vectorized::MutableBlock::create_unique(in_block->clone_empty());
        RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0, in_block->rows()));
- _data_queue[i].enqueue(mutable_block->to_block());
- local_state._shared_state->set_ready_to_read(i);
+ if (_data_queue[i].enqueue(mutable_block->to_block())) {
+ local_state._shared_state->set_ready_to_read(i);
+ }
}
return Status::OK();
@@ -274,6 +292,7 @@ Status BroadcastExchanger::sink(RuntimeState* state, vectorized::Block* in_block
void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
+ _data_queue[local_state._channel_id].set_eos();
while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
// do nothing
}
@@ -286,7 +305,6 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
*block = std::move(next_block);
} else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
@@ -308,8 +326,9 @@ Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
    new_block.swap(*in_block);
    auto channel_id = (local_state._channel_id++) % _num_partitions;
    local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
- _data_queue[channel_id].enqueue(std::move(new_block));
- local_state._shared_state->set_ready_to_read(channel_id);
+ if (_data_queue[channel_id].enqueue(std::move(new_block))) {
+ local_state._shared_state->set_ready_to_read(channel_id);
+ }
return Status::OK();
}
@@ -365,9 +384,10 @@ Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
            RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
            auto new_block = mutable_block->to_block();
            local_state._shared_state->add_mem_usage(i, new_block.allocated_bytes());
- data_queue[i].enqueue(std::move(new_block));
+ if (data_queue[i].enqueue(std::move(new_block))) {
+ local_state._shared_state->set_ready_to_read(i);
+ }
}
- local_state._shared_state->set_ready_to_read(i);
}
return Status::OK();
}
@@ -391,16 +411,21 @@ Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::
if (_running_sink_operators == 0) {
if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
- _free_blocks.enqueue(std::move(next_block));
+ if (_free_block_limit == 0 ||
+                _free_blocks.size_approx() < _free_block_limit * _num_sources) {
+ _free_blocks.enqueue(std::move(next_block));
+ }
            local_state._shared_state->sub_mem_usage(local_state._channel_id,
                                                     block->allocated_bytes());
} else {
- COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
*eos = true;
}
} else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
- _free_blocks.enqueue(std::move(next_block));
+ if (_free_block_limit == 0 ||
+ _free_blocks.size_approx() < _free_block_limit * _num_sources) {
+ _free_blocks.enqueue(std::move(next_block));
+ }
        local_state._shared_state->sub_mem_usage(local_state._channel_id,
                                                 block->allocated_bytes());
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
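Worth spelling out from the _split_rows hunks above: the row shuffle is a classic counting sort. One pass builds a per-channel histogram, a prefix sum turns counts into end offsets, and a reverse scan scatters row indices so each channel's rows end up contiguous in row_idx. A compact sketch of the same algorithm (std::vector standing in for PODArray):

    #include <cstdint>
    #include <vector>

    // After the call, histogram[c] is the start offset of channel c in the
    // returned index array and histogram[c + 1] is its end offset.
    std::vector<uint32_t> split_rows_by_channel(const std::vector<uint32_t>& channel_ids,
                                                uint32_t num_partitions,
                                                std::vector<uint32_t>& histogram) {
        const size_t rows = channel_ids.size();
        std::vector<uint32_t> row_idx(rows);
        histogram.assign(num_partitions + 1, 0);
        for (size_t i = 0; i < rows; ++i) {
            histogram[channel_ids[i]]++; // count rows per channel
        }
        for (uint32_t c = 1; c <= num_partitions; ++c) {
            histogram[c] += histogram[c - 1]; // prefix sum: counts -> end offsets
        }
        // Reverse scan keeps rows of one channel in their original order.
        for (int64_t i = static_cast<int64_t>(rows) - 1; i >= 0; --i) {
            row_idx[--histogram[channel_ids[i]]] = static_cast<uint32_t>(i);
        }
        return row_idx;
    }

Each {row_idx, start, size} triple enqueued above is then just a view over one such contiguous range, which is why a single shared row_idx array can back every partition's slice.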
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index f3b210b11f3..ee0b5e286de 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -17,6 +17,7 @@
#pragma once
+#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/pipeline_x/operator.h"
namespace doris::pipeline {
@@ -24,28 +25,34 @@ namespace doris::pipeline {
class LocalExchangeSourceLocalState;
class LocalExchangeSinkLocalState;
struct ShuffleBlockWrapper;
+class SortSourceOperatorX;
-class Exchanger {
+class ExchangerBase {
public:
- Exchanger(int running_sink_operators, int num_partitions)
+    ExchangerBase(int running_sink_operators, int num_partitions, int free_block_limit)
: _running_sink_operators(running_sink_operators),
_running_source_operators(num_partitions),
_num_partitions(num_partitions),
_num_senders(running_sink_operators),
- _num_sources(num_partitions) {}
- Exchanger(int running_sink_operators, int num_sources, int num_partitions)
+ _num_sources(num_partitions),
+ _free_block_limit(free_block_limit) {}
+    ExchangerBase(int running_sink_operators, int num_sources, int num_partitions,
+ int free_block_limit)
: _running_sink_operators(running_sink_operators),
_running_source_operators(num_partitions),
_num_partitions(num_partitions),
_num_senders(running_sink_operators),
- _num_sources(num_sources) {}
- virtual ~Exchanger() = default;
+ _num_sources(num_sources),
+ _free_block_limit(free_block_limit) {}
+ virtual ~ExchangerBase() = default;
    virtual Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
                             LocalExchangeSourceLocalState& local_state) = 0;
    virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
                        LocalExchangeSinkLocalState& local_state) = 0;
virtual ExchangeType get_type() const = 0;
- virtual void close(LocalExchangeSourceLocalState& local_state) {}
+ virtual void close(LocalExchangeSourceLocalState& local_state) = 0;
+
+    virtual DependencySPtr get_local_state_dependency(int _channel_id) { return nullptr; }
protected:
friend struct LocalExchangeSharedState;
@@ -58,9 +65,60 @@ protected:
const int _num_partitions;
const int _num_senders;
const int _num_sources;
+ const int _free_block_limit = 0;
moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks;
};
+struct PartitionedRowIdxs {
+ std::shared_ptr<vectorized::PODArray<uint32_t>> row_idxs;
+ uint32_t offset_start;
+ uint32_t length;
+};
+
+using PartitionedBlock = std::pair<std::shared_ptr<ShuffleBlockWrapper>, PartitionedRowIdxs>;
+
+template <typename BlockType>
+struct BlockQueue {
+ std::atomic<bool> eos = false;
+ moodycamel::ConcurrentQueue<BlockType> data_queue;
+    BlockQueue() : eos(false), data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
+ BlockQueue(BlockQueue<BlockType>&& other)
+ : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
+ inline bool enqueue(BlockType const& item) {
+ if (!eos) {
+ data_queue.enqueue(item);
+ return true;
+ }
+ return false;
+ }
+
+ inline bool enqueue(BlockType&& item) {
+ if (!eos) {
+ data_queue.enqueue(std::move(item));
+ return true;
+ }
+ return false;
+ }
+
+ bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
+
+ void set_eos() { eos = true; }
+};
+
+template <typename BlockType>
+class Exchanger : public ExchangerBase {
+public:
+    Exchanger(int running_sink_operators, int num_partitions, int free_block_limit)
+            : ExchangerBase(running_sink_operators, num_partitions, free_block_limit) {}
+    Exchanger(int running_sink_operators, int num_sources, int num_partitions, int free_block_limit)
+            : ExchangerBase(running_sink_operators, num_sources, num_partitions, free_block_limit) {
+    }
+ ~Exchanger() override = default;
+
+protected:
+ std::vector<BlockQueue<BlockType>> _data_queue;
+};
+
class LocalExchangeSourceLocalState;
class LocalExchangeSinkLocalState;
@@ -71,23 +129,25 @@ struct ShuffleBlockWrapper {
void unref(LocalExchangeSharedState* shared_state) {
if (ref_count.fetch_sub(1) == 1) {
shared_state->sub_total_mem_usage(data_block.allocated_bytes());
-            data_block.clear_column_data();
-            shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
+            if (shared_state->exchanger->_free_block_limit == 0 ||
+                shared_state->exchanger->_free_blocks.size_approx() <
+                        shared_state->exchanger->_free_block_limit *
+                                shared_state->exchanger->_num_sources) {
+                data_block.clear_column_data();
+                shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
+ }
}
}
std::atomic<int> ref_count = 0;
vectorized::Block data_block;
};
-class ShuffleExchanger : public Exchanger {
-    using PartitionedBlock =
-            std::pair<std::shared_ptr<ShuffleBlockWrapper>,
-                      std::tuple<std::shared_ptr<std::vector<uint32_t>>, size_t, size_t>>;
-
+class ShuffleExchanger : public Exchanger<PartitionedBlock> {
public:
ENABLE_FACTORY_CREATOR(ShuffleExchanger);
- ShuffleExchanger(int running_sink_operators, int num_partitions)
- : Exchanger(running_sink_operators, num_partitions) {
+    ShuffleExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
+            : Exchanger<PartitionedBlock>(running_sink_operators, num_partitions,
+                                          free_block_limit) {
_data_queue.resize(num_partitions);
}
~ShuffleExchanger() override = default;
@@ -101,8 +161,9 @@ public:
protected:
    ShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
-                     bool ignore_source_data_distribution)
-            : Exchanger(running_sink_operators, num_sources, num_partitions),
+                     bool ignore_source_data_distribution, int free_block_limit)
+            : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, num_partitions,
+                                          free_block_limit),
              _ignore_source_data_distribution(ignore_source_data_distribution) {
        _data_queue.resize(num_partitions);
    }
@@ -110,26 +171,25 @@ protected:
                       vectorized::Block* block, bool eos,
                       LocalExchangeSinkLocalState& local_state);
- std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> _data_queue;
-
const bool _ignore_source_data_distribution = false;
};
-class BucketShuffleExchanger : public ShuffleExchanger {
+class BucketShuffleExchanger final : public ShuffleExchanger {
ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
    BucketShuffleExchanger(int running_sink_operators, int num_sources, int num_partitions,
-                           bool ignore_source_data_distribution)
+                           bool ignore_source_data_distribution, int free_block_limit)
            : ShuffleExchanger(running_sink_operators, num_sources, num_partitions,
-                               ignore_source_data_distribution) {}
+                               ignore_source_data_distribution, free_block_limit) {}
~BucketShuffleExchanger() override = default;
    ExchangeType get_type() const override { return ExchangeType::BUCKET_HASH_SHUFFLE; }
};
-class PassthroughExchanger final : public Exchanger {
+class PassthroughExchanger final : public Exchanger<vectorized::Block> {
public:
ENABLE_FACTORY_CREATOR(PassthroughExchanger);
- PassthroughExchanger(int running_sink_operators, int num_partitions)
- : Exchanger(running_sink_operators, num_partitions) {
+    PassthroughExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
+            : Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
+                                           free_block_limit) {
_data_queue.resize(num_partitions);
}
~PassthroughExchanger() override = default;
@@ -140,16 +200,14 @@ public:
LocalExchangeSourceLocalState& local_state) override;
    ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; }
void close(LocalExchangeSourceLocalState& local_state) override;
-
-private:
- std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
};
-class PassToOneExchanger final : public Exchanger {
+class PassToOneExchanger final : public Exchanger<vectorized::Block> {
public:
ENABLE_FACTORY_CREATOR(PassToOneExchanger);
- PassToOneExchanger(int running_sink_operators, int num_partitions)
- : Exchanger(running_sink_operators, num_partitions) {
+    PassToOneExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
+            : Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
+                                           free_block_limit) {
_data_queue.resize(num_partitions);
}
~PassToOneExchanger() override = default;
@@ -159,16 +217,15 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) override;
    ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE; }
-
-private:
- std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
+ void close(LocalExchangeSourceLocalState& local_state) override {}
};
-class BroadcastExchanger final : public Exchanger {
+class BroadcastExchanger final : public Exchanger<vectorized::Block> {
public:
ENABLE_FACTORY_CREATOR(BroadcastExchanger);
- BroadcastExchanger(int running_sink_operators, int num_partitions)
- : Exchanger(running_sink_operators, num_partitions) {
+    BroadcastExchanger(int running_sink_operators, int num_partitions, int free_block_limit)
+            : Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
+                                           free_block_limit) {
_data_queue.resize(num_partitions);
}
~BroadcastExchanger() override = default;
@@ -179,18 +236,17 @@ public:
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
void close(LocalExchangeSourceLocalState& local_state) override;
-
-private:
- std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
};
//The code in AdaptivePassthroughExchanger is essentially
// a copy of ShuffleExchanger and PassthroughExchanger.
-class AdaptivePassthroughExchanger : public Exchanger {
+class AdaptivePassthroughExchanger : public Exchanger<vectorized::Block> {
public:
ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
-    AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions)
-            : Exchanger(running_sink_operators, num_partitions) {
+    AdaptivePassthroughExchanger(int running_sink_operators, int num_partitions,
+                                 int free_block_limit)
+            : Exchanger<vectorized::Block>(running_sink_operators, num_partitions,
+                                           free_block_limit) {
_data_queue.resize(num_partitions);
}
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -200,6 +256,8 @@ public:
LocalExchangeSourceLocalState& local_state) override;
    ExchangeType get_type() const override { return ExchangeType::ADAPTIVE_PASSTHROUGH; }
+ void close(LocalExchangeSourceLocalState& local_state) override {}
+
private:
    Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
                             LocalExchangeSinkLocalState& local_state);
@@ -208,7 +266,6 @@ private:
    Status _split_rows(RuntimeState* state, const uint32_t* __restrict channel_ids,
                       vectorized::Block* block, bool eos,
                       LocalExchangeSinkLocalState& local_state);
- std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
std::atomic_bool _is_pass_through = false;
std::atomic_int32_t _total_block = 0;
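The _free_block_limit threading above changes block recycling from unconditional to capped: a block is returned to _free_blocks only while size_approx() stays under free_block_limit * num_sources, otherwise it simply frees its memory. Since moodycamel's size_approx() is deliberately imprecise, the cap is a soft bound. A sketch of the same policy over a plain pool (Block is a stand-in type, not the Doris class):

    #include <cstddef>
    #include <deque>
    #include <mutex>
    #include <vector>

    using Block = std::vector<char>; // stand-in for vectorized::Block

    class BoundedBlockPool {
    public:
        // limit == 0 means unbounded, mirroring the query option's default.
        explicit BoundedBlockPool(size_t limit) : _limit(limit) {}

        // Recycle unless the pool is at capacity; past the cap the block is
        // destroyed and its memory goes back to the allocator.
        void release(Block&& b) {
            std::lock_guard<std::mutex> lock(_mu);
            if (_limit == 0 || _pool.size() < _limit) {
                b.clear(); // drop contents, keep capacity for reuse
                _pool.push_back(std::move(b));
            }
        }

        Block acquire() {
            std::lock_guard<std::mutex> lock(_mu);
            if (_pool.empty()) {
                return Block{};
            }
            Block b = std::move(_pool.back());
            _pool.pop_back();
            return b;
        }

    private:
        const size_t _limit;
        std::mutex _mu;
        std::deque<Block> _pool;
    };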
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index ccc51faa777..bcb3c53b532 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -801,28 +801,46 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
    case ExchangeType::HASH_SHUFFLE:
        shared_state->exchanger = ShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances),
-                is_shuffled_hash_join ? _total_instances : _num_instances);
+                is_shuffled_hash_join ? _total_instances : _num_instances,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
        break;
    case ExchangeType::BUCKET_HASH_SHUFFLE:
        shared_state->exchanger = BucketShuffleExchanger::create_unique(
                std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets,
-                ignore_data_hash_distribution);
+                ignore_data_hash_distribution,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
        break;
    case ExchangeType::PASSTHROUGH:
-        shared_state->exchanger =
-                PassthroughExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        shared_state->exchanger = PassthroughExchanger::create_unique(
+                cur_pipe->num_tasks(), _num_instances,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
        break;
    case ExchangeType::BROADCAST:
-        shared_state->exchanger =
-                BroadcastExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        shared_state->exchanger = BroadcastExchanger::create_unique(
+                cur_pipe->num_tasks(), _num_instances,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
        break;
    case ExchangeType::PASS_TO_ONE:
-        shared_state->exchanger =
-                BroadcastExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        shared_state->exchanger = BroadcastExchanger::create_unique(
+                cur_pipe->num_tasks(), _num_instances,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
        break;
    case ExchangeType::ADAPTIVE_PASSTHROUGH:
-        shared_state->exchanger =
-                AdaptivePassthroughExchanger::create_unique(cur_pipe->num_tasks(), _num_instances);
+        shared_state->exchanger = AdaptivePassthroughExchanger::create_unique(
+                cur_pipe->num_tasks(), _num_instances,
+                _runtime_state->query_options().__isset.local_exchange_free_blocks_limit
+                        ? _runtime_state->query_options().local_exchange_free_blocks_limit
+                        : 0);
        break;
default:
return Status::InternalError("Unsupported local exchange type : " +
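Each case above reads the new option through the same guard: an optional Thrift field must be checked via __isset before use, because an older FE never sets it and the raw member would just be its default. The pattern in isolation (struct shapes mimic Thrift-generated code; names are illustrative):

    #include <cstdint>

    // Roughly what the Thrift compiler generates for an optional field.
    struct TQueryOptionsIsset {
        bool local_exchange_free_blocks_limit = false;
    };
    struct TQueryOptions {
        int64_t local_exchange_free_blocks_limit = 0;
        TQueryOptionsIsset __isset;
    };

    // Fall back to 0 (uncapped recycling, the pre-patch behavior) when the
    // frontend predates the field.
    int64_t free_blocks_limit(const TQueryOptions& opts) {
        return opts.__isset.local_exchange_free_blocks_limit
                       ? opts.local_exchange_free_blocks_limit
                       : 0;
    }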
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 03479643522..01cbfa82c36 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -76,6 +76,7 @@ public class SessionVariable implements Serializable, Writable {
    public static final Logger LOG = LogManager.getLogger(SessionVariable.class);
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
+    public static final String LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT = "local_exchange_free_blocks_limit";
public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
public static final String NUM_SCANNER_THREADS = "num_scanner_threads";
    public static final String SCANNER_SCALE_UP_RATIO = "scanner_scale_up_ratio";
@@ -615,6 +616,9 @@ public class SessionVariable implements Serializable, Writable {
})
public int numScannerThreads = 0;
+ @VariableMgr.VarAttr(name = LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT)
+ public int localExchangeFreeBlocksLimit = 4;
+
    @VariableMgr.VarAttr(name = SCANNER_SCALE_UP_RATIO, needForward = true, description = {
            "ScanNode自适应的增加扫描并发,最大允许增长的并发倍率,默认为0,关闭该功能",
            "The max multiple of increasing the concurrency of scanners adaptively, "
@@ -3157,6 +3161,7 @@ public class SessionVariable implements Serializable, Writable {
public TQueryOptions toThrift() {
TQueryOptions tResult = new TQueryOptions();
tResult.setMemLimit(maxExecMemByte);
+ tResult.setLocalExchangeFreeBlocksLimit(localExchangeFreeBlocksLimit);
tResult.setScanQueueMemLimit(maxScanQueueMemByte);
tResult.setNumScannerThreads(numScannerThreads);
tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index 54c6c57bdd5..ad8836fefbe 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -288,6 +288,7 @@ struct TQueryOptions {
// max rows of each sub-queue in DataQueue.
106: optional i64 data_queue_max_blocks = 0;
+ 108: optional i64 local_exchange_free_blocks_limit;
110: optional bool enable_parquet_filter_by_min_max = true
111: optional bool enable_orc_filter_by_min_max = true
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]