This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ee45ed206a3 [fix](pipeline) Do not push data in local exchange if eos
(#35972)
ee45ed206a3 is described below
commit ee45ed206a3cffcd79671b3ec3976fe95ea8d789
Author: Gabriel <[email protected]>
AuthorDate: Fri Jun 7 10:01:29 2024 +0800
[fix](pipeline) Do not push data in local exchange if eos (#35972)
---
be/src/pipeline/dependency.h | 4 +-
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 9 ++
be/src/pipeline/exec/hashjoin_probe_operator.h | 1 +
.../exec/partitioned_hash_join_probe_operator.cpp | 20 ++++
.../exec/partitioned_hash_join_probe_operator.h | 2 +
.../local_exchange/local_exchange_sink_operator.h | 4 +-
.../local_exchange_source_operator.cpp | 6 +-
.../local_exchange_source_operator.h | 6 +-
be/src/pipeline/local_exchange/local_exchanger.cpp | 55 ++++++----
be/src/pipeline/local_exchange/local_exchanger.h | 120 ++++++++++++++-------
10 files changed, 160 insertions(+), 67 deletions(-)
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index e5a019b4fa0..0f9c698a82e 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -835,13 +835,13 @@ struct DataDistribution {
std::vector<TExpr> partition_exprs;
};
-class Exchanger;
+class ExchangerBase;
struct LocalExchangeSharedState : public BasicSharedState {
public:
ENABLE_FACTORY_CREATOR(LocalExchangeSharedState);
LocalExchangeSharedState(int num_instances);
- std::unique_ptr<Exchanger> exchanger {};
+ std::unique_ptr<ExchangerBase> exchanger {};
std::vector<MemTracker*> mem_trackers;
std::atomic<size_t> mem_usage = 0;
std::mutex le_lock;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 002ead551f6..dc2df872bd5 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -363,6 +363,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
return Status::OK();
}
+std::string HashJoinProbeLocalState::debug_string(int indentation_level) const
{
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "{}, short_circuit_for_probe: {}",
+ JoinProbeLocalState<HashJoinSharedState,
HashJoinProbeLocalState>::debug_string(
+ indentation_level),
+ _shared_state ?
std::to_string(_shared_state->short_circuit_for_probe) : "NULL");
+ return fmt::to_string(debug_string_buffer);
+}
+
Status HashJoinProbeLocalState::_extract_join_column(vectorized::Block& block,
const std::vector<int>&
res_col_ids) {
if (empty_right_table_shortcut()) {
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 028b0583167..b8bc892ef31 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -77,6 +77,7 @@ public:
// !Base::_projections.empty() means nereids planner
return _shared_state->empty_right_table_need_probe_dispose &&
!Base::_projections.empty();
}
+ std::string debug_string(int indentation_level) const override;
private:
void _prepare_probe_block();
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
index 17dc04b7828..7ee06f5ab4d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
@@ -370,6 +370,16 @@ Status
PartitionedHashJoinProbeLocalState::recovery_build_blocks_from_disk(Runti
return spill_io_pool->submit_func(exception_catch_func);
}
+std::string PartitionedHashJoinProbeOperatorX::debug_string(RuntimeState*
state,
+ int
indentation_level) const {
+ fmt::memory_buffer debug_string_buffer;
+ fmt::format_to(debug_string_buffer, "{}, in mem join probe: {}",
+
JoinProbeOperatorX<PartitionedHashJoinProbeLocalState>::debug_string(
+ state, indentation_level),
+ _inner_probe_operator ?
_inner_probe_operator->debug_string(state, 0) : "NULL");
+ return fmt::to_string(debug_string_buffer);
+}
+
Status
PartitionedHashJoinProbeLocalState::recovery_probe_blocks_from_disk(RuntimeState*
state,
uint32_t partition_index,
bool& has_data) {
@@ -763,6 +773,16 @@ Status
PartitionedHashJoinProbeOperatorX::get_block(RuntimeState* state, vectori
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
const auto need_to_spill = local_state._shared_state->need_to_spill;
+#ifndef NDEBUG
+ Defer eos_check_defer([&] {
+ if (*eos) {
+ LOG(INFO) << "query: " << print_id(state->query_id())
+ << ", hash probe node: " << node_id() << ", task: " <<
state->task_id()
+ << ", eos with child eos: " << local_state._child_eos
+ << ", need spill: " << need_to_spill;
+ }
+ });
+#endif
if (need_more_input_data(state)) {
if (need_to_spill && _should_revoke_memory(state)) {
return _revoke_memory(state);
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index aecd8a22f91..5dced043214 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -153,6 +153,8 @@ public:
Status pull(doris::RuntimeState* state, vectorized::Block* output_block,
bool* eos) const override;
+ std::string debug_string(RuntimeState* state, int indentation_level = 0)
const override;
+
bool need_more_input_data(RuntimeState* state) const override;
DataDistribution required_data_distribution() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 36530bc8ef1..c29d6a7ec90 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -25,7 +25,7 @@ class PartitionerBase;
namespace doris::pipeline {
-class Exchanger;
+class ExchangerBase;
class ShuffleExchanger;
class PassthroughExchanger;
class BroadcastExchanger;
@@ -63,7 +63,7 @@ private:
friend class LocalMergeSortExchanger;
friend class AdaptivePassthroughExchanger;
- Exchanger* _exchanger = nullptr;
+ ExchangerBase* _exchanger = nullptr;
// Used by shuffle exchanger
RuntimeProfile::Counter* _compute_hash_value_timer = nullptr;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
index 1c8dff51a29..79aba88fbaa 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.cpp
@@ -66,14 +66,16 @@ std::string LocalExchangeSourceLocalState::debug_string(int
indentation_level) c
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer,
"{}, _channel_id: {}, _num_partitions: {}, _num_senders:
{}, _num_sources: {}, "
- "_running_sink_operators: {}, _running_source_operators:
{}",
+ "_running_sink_operators: {}, _running_source_operators:
{}, mem_usage: {}",
Base::debug_string(indentation_level), _channel_id,
_exchanger->_num_partitions,
_exchanger->_num_senders, _exchanger->_num_sources,
- _exchanger->_running_sink_operators,
_exchanger->_running_source_operators);
+ _exchanger->_running_sink_operators,
_exchanger->_running_source_operators,
+ _shared_state->mem_usage.load());
size_t i = 0;
fmt::format_to(debug_string_buffer, ", MemTrackers: ");
for (auto* mem_tracker : _shared_state->mem_trackers) {
fmt::format_to(debug_string_buffer, "{}: {}, ", i,
mem_tracker->consumption());
+ i++;
}
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index f32261cd574..58086097d6d 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -21,7 +21,7 @@
namespace doris::pipeline {
-class Exchanger;
+class ExchangerBase;
class ShuffleExchanger;
class PassthroughExchanger;
class BroadcastExchanger;
@@ -42,7 +42,7 @@ public:
private:
friend class LocalExchangeSourceOperatorX;
- friend class Exchanger;
+ friend class ExchangerBase;
friend class ShuffleExchanger;
friend class PassthroughExchanger;
friend class BroadcastExchanger;
@@ -50,7 +50,7 @@ private:
friend class LocalMergeSortExchanger;
friend class AdaptivePassthroughExchanger;
- Exchanger* _exchanger = nullptr;
+ ExchangerBase* _exchanger = nullptr;
int _channel_id;
RuntimeProfile::Counter* _get_block_failed_counter = nullptr;
RuntimeProfile::Counter* _copy_data_timer = nullptr;
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index a7c6446be43..7b9186cb7c8 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -45,6 +45,7 @@ Status ShuffleExchanger::sink(RuntimeState* state,
vectorized::Block* in_block,
void ShuffleExchanger::close(LocalExchangeSourceLocalState& local_state) {
PartitionedBlock partitioned_block;
+ _data_queue[local_state._channel_id].set_eos();
while
(_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
auto block_wrapper = partitioned_block.first;
local_state._shared_state->sub_mem_usage(
@@ -140,8 +141,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
it.second,
new_block_wrapper->data_block.allocated_bytes(), false);
- data_queue[it.second].enqueue({new_block_wrapper, {row_idx,
start, size}});
- local_state._shared_state->set_ready_to_read(it.second);
+ if (data_queue[it.second].enqueue({new_block_wrapper,
{row_idx, start, size}})) {
+ local_state._shared_state->set_ready_to_read(it.second);
+ } else {
+ new_block_wrapper->unref(local_state._shared_state);
+ }
} else {
new_block_wrapper->unref(local_state._shared_state);
}
@@ -154,8 +158,12 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
i % _num_sources,
new_block_wrapper->data_block.allocated_bytes(), false);
- data_queue[i % _num_sources].enqueue({new_block_wrapper,
{row_idx, start, size}});
- local_state._shared_state->set_ready_to_read(i % _num_sources);
+ if (data_queue[i % _num_sources].enqueue(
+ {new_block_wrapper, {row_idx, start, size}})) {
+ local_state._shared_state->set_ready_to_read(i %
_num_sources);
+ } else {
+ new_block_wrapper->unref(local_state._shared_state);
+ }
} else {
new_block_wrapper->unref(local_state._shared_state);
}
@@ -170,8 +178,11 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state,
const uint32_t* __rest
if (size > 0) {
local_state._shared_state->add_mem_usage(
map[i],
new_block_wrapper->data_block.allocated_bytes(), false);
- data_queue[map[i]].enqueue({new_block_wrapper, {row_idx,
start, size}});
- local_state._shared_state->set_ready_to_read(map[i]);
+ if (data_queue[map[i]].enqueue({new_block_wrapper, {row_idx,
start, size}})) {
+ local_state._shared_state->set_ready_to_read(map[i]);
+ } else {
+ new_block_wrapper->unref(local_state._shared_state);
+ }
} else {
new_block_wrapper->unref(local_state._shared_state);
}
@@ -190,14 +201,16 @@ Status PassthroughExchanger::sink(RuntimeState* state,
vectorized::Block* in_blo
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
local_state._shared_state->add_mem_usage(channel_id,
new_block.allocated_bytes());
- _data_queue[channel_id].enqueue(std::move(new_block));
- local_state._shared_state->set_ready_to_read(channel_id);
+ if (_data_queue[channel_id].enqueue(std::move(new_block))) {
+ local_state._shared_state->set_ready_to_read(channel_id);
+ }
return Status::OK();
}
void PassthroughExchanger::close(LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
+ _data_queue[local_state._channel_id].set_eos();
while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
local_state._shared_state->sub_mem_usage(local_state._channel_id,
next_block.allocated_bytes());
@@ -237,8 +250,9 @@ Status PassToOneExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
LocalExchangeSinkLocalState& local_state) {
vectorized::Block new_block(in_block->clone_empty());
new_block.swap(*in_block);
- _data_queue[0].enqueue(std::move(new_block));
- local_state._shared_state->set_ready_to_read(0);
+ if (_data_queue[0].enqueue(std::move(new_block))) {
+ local_state._shared_state->set_ready_to_read(0);
+ }
return Status::OK();
}
@@ -273,9 +287,10 @@ Status LocalMergeSortExchanger::sink(RuntimeState* state,
vectorized::Block* in_
}
new_block.swap(*in_block);
DCHECK_LE(local_state._channel_id, _data_queue.size());
- _data_queue[local_state._channel_id].enqueue(std::move(new_block));
add_mem_usage(local_state, new_block.allocated_bytes());
- local_state._shared_state->set_ready_to_read(0);
+ if (_data_queue[local_state._channel_id].enqueue(std::move(new_block))) {
+ local_state._shared_state->set_ready_to_read(0);
+ }
return Status::OK();
}
@@ -361,8 +376,9 @@ Status BroadcastExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
for (size_t i = 0; i < _num_partitions; i++) {
auto mutable_block =
vectorized::MutableBlock::create_unique(in_block->clone_empty());
RETURN_IF_ERROR(mutable_block->add_rows(in_block, 0,
in_block->rows()));
- _data_queue[i].enqueue(mutable_block->to_block());
- local_state._shared_state->set_ready_to_read(i);
+ if (_data_queue[i].enqueue(mutable_block->to_block())) {
+ local_state._shared_state->set_ready_to_read(i);
+ }
}
return Status::OK();
@@ -370,6 +386,7 @@ Status BroadcastExchanger::sink(RuntimeState* state,
vectorized::Block* in_block
void BroadcastExchanger::close(LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
+ _data_queue[local_state._channel_id].set_eos();
while (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
// do nothing
}
@@ -403,8 +420,9 @@ Status
AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
local_state._shared_state->add_mem_usage(channel_id,
new_block.allocated_bytes());
- _data_queue[channel_id].enqueue(std::move(new_block));
- local_state._shared_state->set_ready_to_read(channel_id);
+ if (_data_queue[channel_id].enqueue(std::move(new_block))) {
+ local_state._shared_state->set_ready_to_read(channel_id);
+ }
return Status::OK();
}
@@ -460,9 +478,10 @@ Status
AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
RETURN_IF_ERROR(mutable_block->add_rows(block, start, size));
auto new_block = mutable_block->to_block();
local_state._shared_state->add_mem_usage(i,
new_block.allocated_bytes());
- data_queue[i].enqueue(std::move(new_block));
+ if (data_queue[i].enqueue(std::move(new_block))) {
+ local_state._shared_state->set_ready_to_read(i);
+ }
}
- local_state._shared_state->set_ready_to_read(i);
}
return Status::OK();
}
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h
b/be/src/pipeline/local_exchange/local_exchanger.h
index 806ac8b9131..113f4906ff8 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -27,29 +27,30 @@ class LocalExchangeSinkLocalState;
struct ShuffleBlockWrapper;
class SortSourceOperatorX;
-class Exchanger {
+class ExchangerBase {
public:
- Exchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
+ ExchangerBase(int running_sink_operators, int num_partitions, int
free_block_limit)
: _running_sink_operators(running_sink_operators),
_running_source_operators(num_partitions),
_num_partitions(num_partitions),
_num_senders(running_sink_operators),
_num_sources(num_partitions),
_free_block_limit(free_block_limit) {}
- Exchanger(int running_sink_operators, int num_sources, int num_partitions,
int free_block_limit)
+ ExchangerBase(int running_sink_operators, int num_sources, int
num_partitions,
+ int free_block_limit)
: _running_sink_operators(running_sink_operators),
_running_source_operators(num_partitions),
_num_partitions(num_partitions),
_num_senders(running_sink_operators),
_num_sources(num_sources),
_free_block_limit(free_block_limit) {}
- virtual ~Exchanger() = default;
+ virtual ~ExchangerBase() = default;
virtual Status get_block(RuntimeState* state, vectorized::Block* block,
bool* eos,
LocalExchangeSourceLocalState& local_state) = 0;
virtual Status sink(RuntimeState* state, vectorized::Block* in_block, bool
eos,
LocalExchangeSinkLocalState& local_state) = 0;
virtual ExchangeType get_type() const = 0;
- virtual void close(LocalExchangeSourceLocalState& local_state) {}
+ virtual void close(LocalExchangeSourceLocalState& local_state) = 0;
virtual DependencySPtr get_local_state_dependency(int _channel_id) {
return nullptr; }
@@ -68,6 +69,56 @@ protected:
moodycamel::ConcurrentQueue<vectorized::Block> _free_blocks;
};
+struct PartitionedRowIdxs {
+ std::shared_ptr<vectorized::PODArray<uint32_t>> row_idxs;
+ uint32_t offset_start;
+ uint32_t length;
+};
+
+using PartitionedBlock = std::pair<std::shared_ptr<ShuffleBlockWrapper>,
PartitionedRowIdxs>;
+
+template <typename BlockType>
+struct BlockQueue {
+ std::atomic<bool> eos = false;
+ moodycamel::ConcurrentQueue<BlockType> data_queue;
+ BlockQueue() : eos(false),
data_queue(moodycamel::ConcurrentQueue<BlockType>()) {}
+ BlockQueue(BlockQueue<BlockType>&& other)
+ : eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
+ inline bool enqueue(BlockType const& item) {
+ if (!eos) {
+ data_queue.enqueue(item);
+ return true;
+ }
+ return false;
+ }
+
+ inline bool enqueue(BlockType&& item) {
+ if (!eos) {
+ data_queue.enqueue(std::move(item));
+ return true;
+ }
+ return false;
+ }
+
+ bool try_dequeue(BlockType& item) { return data_queue.try_dequeue(item); }
+
+ void set_eos() { eos = true; }
+};
+
+template <typename BlockType>
+class Exchanger : public ExchangerBase {
+public:
+ Exchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
+ : ExchangerBase(running_sink_operators, num_partitions,
free_block_limit) {}
+ Exchanger(int running_sink_operators, int num_sources, int num_partitions,
int free_block_limit)
+ : ExchangerBase(running_sink_operators, num_sources,
num_partitions, free_block_limit) {
+ }
+ ~Exchanger() override = default;
+
+protected:
+ std::vector<BlockQueue<BlockType>> _data_queue;
+};
+
class LocalExchangeSourceLocalState;
class LocalExchangeSinkLocalState;
@@ -91,19 +142,12 @@ struct ShuffleBlockWrapper {
vectorized::Block data_block;
};
-class ShuffleExchanger : public Exchanger {
- struct PartitionedRowIdxs {
- std::shared_ptr<vectorized::PODArray<uint32_t>> row_idxs;
- uint32_t offset_start;
- uint32_t length;
- };
-
- using PartitionedBlock = std::pair<std::shared_ptr<ShuffleBlockWrapper>,
PartitionedRowIdxs>;
-
+class ShuffleExchanger : public Exchanger<PartitionedBlock> {
public:
ENABLE_FACTORY_CREATOR(ShuffleExchanger);
ShuffleExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
- : Exchanger(running_sink_operators, num_partitions,
free_block_limit) {
+ : Exchanger<PartitionedBlock>(running_sink_operators,
num_partitions,
+ free_block_limit) {
_data_queue.resize(num_partitions);
}
~ShuffleExchanger() override = default;
@@ -118,7 +162,8 @@ public:
protected:
ShuffleExchanger(int running_sink_operators, int num_sources, int
num_partitions,
bool ignore_source_data_distribution, int
free_block_limit)
- : Exchanger(running_sink_operators, num_sources, num_partitions,
free_block_limit),
+ : Exchanger<PartitionedBlock>(running_sink_operators, num_sources,
num_partitions,
+ free_block_limit),
_ignore_source_data_distribution(ignore_source_data_distribution) {
_data_queue.resize(num_partitions);
}
@@ -126,8 +171,6 @@ protected:
vectorized::Block* block, bool eos,
LocalExchangeSinkLocalState& local_state);
- std::vector<moodycamel::ConcurrentQueue<PartitionedBlock>> _data_queue;
-
const bool _ignore_source_data_distribution = false;
};
@@ -141,11 +184,12 @@ class BucketShuffleExchanger final : public
ShuffleExchanger {
ExchangeType get_type() const override { return
ExchangeType::BUCKET_HASH_SHUFFLE; }
};
-class PassthroughExchanger final : public Exchanger {
+class PassthroughExchanger final : public Exchanger<vectorized::Block> {
public:
ENABLE_FACTORY_CREATOR(PassthroughExchanger);
PassthroughExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
- : Exchanger(running_sink_operators, num_partitions,
free_block_limit) {
+ : Exchanger<vectorized::Block>(running_sink_operators,
num_partitions,
+ free_block_limit) {
_data_queue.resize(num_partitions);
}
~PassthroughExchanger() override = default;
@@ -156,16 +200,14 @@ public:
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH;
}
void close(LocalExchangeSourceLocalState& local_state) override;
-
-private:
- std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
};
-class PassToOneExchanger final : public Exchanger {
+class PassToOneExchanger final : public Exchanger<vectorized::Block> {
public:
ENABLE_FACTORY_CREATOR(PassToOneExchanger);
PassToOneExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
- : Exchanger(running_sink_operators, num_partitions,
free_block_limit) {
+ : Exchanger<vectorized::Block>(running_sink_operators,
num_partitions,
+ free_block_limit) {
_data_queue.resize(num_partitions);
}
~PassToOneExchanger() override = default;
@@ -175,17 +217,16 @@ public:
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos,
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::PASS_TO_ONE;
}
-
-private:
- std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
+ void close(LocalExchangeSourceLocalState& local_state) override {}
};
-class LocalMergeSortExchanger final : public Exchanger {
+class LocalMergeSortExchanger final : public Exchanger<vectorized::Block> {
public:
ENABLE_FACTORY_CREATOR(LocalMergeSortExchanger);
LocalMergeSortExchanger(std::shared_ptr<SortSourceOperatorX> sort_source,
int running_sink_operators, int num_partitions,
int free_block_limit)
- : Exchanger(running_sink_operators, num_partitions,
free_block_limit),
+ : Exchanger<vectorized::Block>(running_sink_operators,
num_partitions,
+ free_block_limit),
_sort_source(std::move(sort_source)),
_queues_mem_usege(num_partitions),
_each_queue_limit(config::local_exchange_buffer_mem_limit /
num_partitions) {
@@ -212,11 +253,10 @@ public:
}
void add_mem_usage(LocalExchangeSinkLocalState& local_state, int64_t
delta);
-
void sub_mem_usage(LocalExchangeSourceLocalState& local_state, int
channel_id, int64_t delta);
+ void close(LocalExchangeSourceLocalState& local_state) override {}
private:
- std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
std::unique_ptr<vectorized::VSortedRunMerger> _merger;
std::shared_ptr<SortSourceOperatorX> _sort_source;
std::vector<DependencySPtr> _sink_deps;
@@ -224,11 +264,12 @@ private:
const int64_t _each_queue_limit;
};
-class BroadcastExchanger final : public Exchanger {
+class BroadcastExchanger final : public Exchanger<vectorized::Block> {
public:
ENABLE_FACTORY_CREATOR(BroadcastExchanger);
BroadcastExchanger(int running_sink_operators, int num_partitions, int
free_block_limit)
- : Exchanger(running_sink_operators, num_partitions,
free_block_limit) {
+ : Exchanger<vectorized::Block>(running_sink_operators,
num_partitions,
+ free_block_limit) {
_data_queue.resize(num_partitions);
}
~BroadcastExchanger() override = default;
@@ -239,19 +280,17 @@ public:
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return ExchangeType::BROADCAST; }
void close(LocalExchangeSourceLocalState& local_state) override;
-
-private:
- std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
};
//The code in AdaptivePassthroughExchanger is essentially
// a copy of ShuffleExchanger and PassthroughExchanger.
-class AdaptivePassthroughExchanger : public Exchanger {
+class AdaptivePassthroughExchanger : public Exchanger<vectorized::Block> {
public:
ENABLE_FACTORY_CREATOR(AdaptivePassthroughExchanger);
AdaptivePassthroughExchanger(int running_sink_operators, int
num_partitions,
int free_block_limit)
- : Exchanger(running_sink_operators, num_partitions,
free_block_limit) {
+ : Exchanger<vectorized::Block>(running_sink_operators,
num_partitions,
+ free_block_limit) {
_data_queue.resize(num_partitions);
}
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos,
@@ -261,6 +300,8 @@ public:
LocalExchangeSourceLocalState& local_state) override;
ExchangeType get_type() const override { return
ExchangeType::ADAPTIVE_PASSTHROUGH; }
+ void close(LocalExchangeSourceLocalState& local_state) override {}
+
private:
Status _passthrough_sink(RuntimeState* state, vectorized::Block* in_block,
bool eos,
LocalExchangeSinkLocalState& local_state);
@@ -269,7 +310,6 @@ private:
Status _split_rows(RuntimeState* state, const uint32_t* __restrict
channel_ids,
vectorized::Block* block, bool eos,
LocalExchangeSinkLocalState& local_state);
- std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
std::atomic_bool _is_pass_through = false;
std::atomic_int32_t _total_block = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]