This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new b0244ff5544 [refactor](exchange) Optimize the logic related to sending
and closing (#41968) (#44175)
b0244ff5544 is described below
commit b0244ff55440b5c864df0e3f52e3bf407322bfd7
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Nov 18 19:33:11 2024 +0800
[refactor](exchange) Optimize the logic related to sending and closing
(#41968) (#44175)
pick #41968
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 63 ++---
be/src/pipeline/exec/exchange_sink_buffer.h | 26 +-
be/src/pipeline/exec/exchange_sink_operator.cpp | 135 +++++----
be/src/pipeline/exec/exchange_sink_operator.h | 29 +-
be/src/pipeline/exec/result_file_sink_operator.cpp | 12 +-
be/src/pipeline/exec/result_file_sink_operator.h | 9 -
be/src/vec/sink/vdata_stream_sender.cpp | 307 ++++++---------------
be/src/vec/sink/vdata_stream_sender.h | 224 ++++-----------
8 files changed, 261 insertions(+), 544 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index e0a3725ad65..06971094c5e 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -86,14 +86,13 @@ void BroadcastPBlockHolderMemLimiter::release(const
BroadcastPBlockHolder& holde
} // namespace vectorized
namespace pipeline {
-
ExchangeSinkBuffer::ExchangeSinkBuffer(PUniqueId query_id, PlanNodeId
dest_node_id, int send_id,
int be_number, RuntimeState* state,
ExchangeSinkLocalState* parent)
: HasTaskExecutionCtx(state),
_queue_capacity(0),
_is_finishing(false),
- _query_id(query_id),
+ _query_id(std::move(query_id)),
_dest_node_id(dest_node_id),
_sender_id(send_id),
_be_number(be_number),
@@ -110,12 +109,6 @@ void ExchangeSinkBuffer::close() {
//_instance_to_request.clear();
}
-void ExchangeSinkBuffer::_set_ready_to_finish(bool all_done) {
- if (_finish_dependency && _should_stop && all_done) {
- _finish_dependency->set_ready();
- }
-}
-
void ExchangeSinkBuffer::register_sink(TUniqueId fragment_instance_id) {
if (_is_finishing) {
return;
@@ -135,7 +128,6 @@ void ExchangeSinkBuffer::register_sink(TUniqueId
fragment_instance_id) {
finst_id.set_hi(fragment_instance_id.hi);
finst_id.set_lo(fragment_instance_id.lo);
_rpc_channel_is_idle[low_id] = true;
- _instance_to_rpc_ctx[low_id] = {};
_instance_to_receiver_eof[low_id] = false;
_instance_to_rpc_time[low_id] = 0;
_construct_request(low_id, finst_id);
@@ -160,7 +152,6 @@ Status ExchangeSinkBuffer::add_block(TransmitInfo&&
request) {
if (_rpc_channel_is_idle[ins_id]) {
send_now = true;
_rpc_channel_is_idle[ins_id] = false;
- _busy_channels++;
}
if (request.block) {
RETURN_IF_ERROR(
@@ -198,7 +189,6 @@ Status
ExchangeSinkBuffer::add_block(BroadcastTransmitInfo&& request) {
if (_rpc_channel_is_idle[ins_id]) {
send_now = true;
_rpc_channel_is_idle[ins_id] = false;
- _busy_channels++;
}
if (request.block_holder->get_block()) {
RETURN_IF_ERROR(BeExecVersionManager::check_be_exec_version(
@@ -223,7 +213,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
_instance_to_broadcast_package_queue[id];
if (_is_finishing) {
- _turn_off_channel(id);
+ _turn_off_channel(id, lock);
return Status::OK();
}
@@ -241,9 +231,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
auto send_callback = request.channel->get_send_callback(id,
request.eos);
- _instance_to_rpc_ctx[id]._send_callback = send_callback;
- _instance_to_rpc_ctx[id].is_cancelled = false;
-
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
if (config::exchange_sink_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
@@ -321,12 +308,6 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
brpc_request->set_allocated_block(request.block_holder->get_block());
}
auto send_callback = request.channel->get_send_callback(id,
request.eos);
-
- ExchangeRpcContext rpc_ctx;
- rpc_ctx._send_callback = send_callback;
- rpc_ctx.is_cancelled = false;
- _instance_to_rpc_ctx[id] = rpc_ctx;
-
send_callback->cntl_->set_timeout_ms(request.channel->_brpc_timeout_ms);
if (config::exchange_sink_ignore_eovercrowded) {
send_callback->cntl_->ignore_eovercrowded();
@@ -390,7 +371,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
}
broadcast_q.pop();
} else {
- _turn_off_channel(id);
+ _rpc_channel_is_idle[id] = true;
}
return Status::OK();
@@ -420,23 +401,31 @@ void ExchangeSinkBuffer::_ended(InstanceLoId id) {
__builtin_unreachable();
} else {
std::unique_lock<std::mutex>
lock(*_instance_to_package_queue_mutex[id]);
- _turn_off_channel(id);
+ _turn_off_channel(id, lock);
}
}
void ExchangeSinkBuffer::_failed(InstanceLoId id, const std::string& err) {
_is_finishing = true;
_context->cancel(Status::Cancelled(err));
- std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
- _turn_off_channel(id, true);
}
void ExchangeSinkBuffer::_set_receiver_eof(InstanceLoId id) {
std::unique_lock<std::mutex> lock(*_instance_to_package_queue_mutex[id]);
_instance_to_receiver_eof[id] = true;
- _turn_off_channel(id, true);
- std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>> empty;
- swap(empty, _instance_to_broadcast_package_queue[id]);
+ _turn_off_channel(id, lock);
+ std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>&
broadcast_q =
+ _instance_to_broadcast_package_queue[id];
+ {
+ std::queue<BroadcastTransmitInfo, std::list<BroadcastTransmitInfo>>
empty;
+ swap(empty, broadcast_q);
+ }
+
+ std::queue<TransmitInfo, std::list<TransmitInfo>>& q =
_instance_to_package_queue[id];
+ {
+ std::queue<TransmitInfo, std::list<TransmitInfo>> empty;
+ swap(empty, q);
+ }
}
bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId id) {
@@ -444,17 +433,17 @@ bool ExchangeSinkBuffer::_is_receiver_eof(InstanceLoId
id) {
return _instance_to_receiver_eof[id];
}
-void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id, bool cleanup) {
+// The unused parameter `with_lock` is to ensure that the function is called
when the lock is held.
+void ExchangeSinkBuffer::_turn_off_channel(InstanceLoId id,
+ std::unique_lock<std::mutex>&
/*with_lock*/) {
if (!_rpc_channel_is_idle[id]) {
_rpc_channel_is_idle[id] = true;
- auto all_done = _busy_channels.fetch_sub(1) == 1;
- _set_ready_to_finish(all_done);
- if (cleanup && all_done) {
- auto weak_task_ctx = weak_task_exec_ctx();
- if (auto pip_ctx = weak_task_ctx.lock()) {
- _parent->set_reach_limit();
- }
- }
+ }
+ _instance_to_receiver_eof[id] = true;
+
+ auto weak_task_ctx = weak_task_exec_ctx();
+ if (auto pip_ctx = weak_task_ctx.lock()) {
+ _parent->on_channel_finished(id);
}
}
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index 2d30a492a0d..13692532a33 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -22,9 +22,9 @@
#include <gen_cpp/internal_service.pb.h>
#include <gen_cpp/types.pb.h>
#include <parallel_hashmap/phmap.h>
-#include <stdint.h>
#include <atomic>
+#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
@@ -51,7 +51,7 @@ class ExchangeSinkLocalState;
} // namespace pipeline
namespace vectorized {
-class PipChannel;
+class Channel;
// We use BroadcastPBlockHolder to hold a broadcasted PBlock. For broadcast
shuffle, one PBlock
// will be shared between different channels, so we have to use a ref count to
mark if this
@@ -102,14 +102,14 @@ private:
namespace pipeline {
struct TransmitInfo {
- vectorized::PipChannel* channel = nullptr;
+ vectorized::Channel* channel = nullptr;
std::unique_ptr<PBlock> block;
bool eos;
Status exec_status;
};
struct BroadcastTransmitInfo {
- vectorized::PipChannel* channel = nullptr;
+ vectorized::Channel* channel = nullptr;
std::shared_ptr<vectorized::BroadcastPBlockHolder> block_holder = nullptr;
bool eos;
};
@@ -169,11 +169,6 @@ private:
bool _eos;
};
-struct ExchangeRpcContext {
- std::shared_ptr<ExchangeSendCallback<PTransmitDataResult>> _send_callback;
- bool is_cancelled = false;
-};
-
// Each ExchangeSinkOperator have one ExchangeSinkBuffer
class ExchangeSinkBuffer final : public HasTaskExecutionCtx {
public:
@@ -198,14 +193,8 @@ public:
_broadcast_dependency = broadcast_dependency;
}
- void set_should_stop() {
- _should_stop = true;
- _set_ready_to_finish(_busy_channels == 0);
- }
-
private:
friend class ExchangeSinkLocalState;
- void _set_ready_to_finish(bool all_done);
phmap::flat_hash_map<InstanceLoId, std::unique_ptr<std::mutex>>
_instance_to_package_queue_mutex;
@@ -224,11 +213,9 @@ private:
phmap::flat_hash_map<InstanceLoId, std::shared_ptr<PTransmitDataParams>>
_instance_to_request;
// One channel is corresponding to a downstream instance.
phmap::flat_hash_map<InstanceLoId, bool> _rpc_channel_is_idle;
- // Number of busy channels;
- std::atomic<int> _busy_channels = 0;
+
phmap::flat_hash_map<InstanceLoId, bool> _instance_to_receiver_eof;
phmap::flat_hash_map<InstanceLoId, int64_t> _instance_to_rpc_time;
- phmap::flat_hash_map<InstanceLoId, ExchangeRpcContext>
_instance_to_rpc_ctx;
std::atomic<bool> _is_finishing;
PUniqueId _query_id;
@@ -247,7 +234,7 @@ private:
inline void _failed(InstanceLoId id, const std::string& err);
inline void _set_receiver_eof(InstanceLoId id);
inline bool _is_receiver_eof(InstanceLoId id);
- inline void _turn_off_channel(InstanceLoId id, bool cleanup = false);
+ inline void _turn_off_channel(InstanceLoId id,
std::unique_lock<std::mutex>& with_lock);
void get_max_min_rpc_time(int64_t* max_time, int64_t* min_time);
int64_t get_sum_rpc_time();
@@ -255,7 +242,6 @@ private:
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _finish_dependency = nullptr;
std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
- std::atomic<bool> _should_stop = false;
ExchangeSinkLocalState* _parent = nullptr;
};
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 5b637a32672..2d79b0d8b2b 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -23,6 +23,7 @@
#include <gen_cpp/types.pb.h>
#include <memory>
+#include <mutex>
#include <random>
#include "common/status.h"
@@ -31,6 +32,8 @@
#include "pipeline/exec/operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
+#include "util/runtime_profile.h"
+#include "util/uid_util.h"
#include "vec/columns/column_const.h"
#include "vec/exprs/vexpr.h"
@@ -58,8 +61,10 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
_blocks_sent_counter = ADD_COUNTER_WITH_LEVEL(_profile, "BlocksProduced",
TUnit::UNIT, 1);
_overall_throughput = _profile->add_derived_counter(
"OverallThroughput", TUnit::BYTES_PER_SECOND,
- std::bind<int64_t>(&RuntimeProfile::units_per_second,
_bytes_sent_counter,
- _profile->total_time_counter()),
+ [this]() {
+ return RuntimeProfile::units_per_second(_bytes_sent_counter,
+
_profile->total_time_counter());
+ },
"");
_merge_block_timer = ADD_TIMER(profile(), "MergeBlockTime");
_local_bytes_send_counter = ADD_COUNTER(_profile, "LocalBytesSent",
TUnit::BYTES);
@@ -74,15 +79,15 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
const auto& fragment_instance_id = p._dests[i].fragment_instance_id;
if (fragment_id_to_channel_index.find(fragment_instance_id.lo) ==
fragment_id_to_channel_index.end()) {
- channel_shared_ptrs.emplace_back(
- new vectorized::PipChannel(this, p._row_desc,
p._dests[i].brpc_server,
- fragment_instance_id,
p._dest_node_id));
- fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
- channel_shared_ptrs.size() -
1);
- channels.push_back(channel_shared_ptrs.back().get());
+ channels.push_back(std::make_shared<vectorized::Channel>(
+ this, p._dests[i].brpc_server, fragment_instance_id,
p._dest_node_id));
+ fragment_id_to_channel_index.emplace(fragment_instance_id.lo,
channels.size() - 1);
+
+ if (fragment_instance_id.hi != -1 && fragment_instance_id.lo !=
-1) {
+ _working_channels_count++;
+ }
} else {
- channel_shared_ptrs.emplace_back(
-
channel_shared_ptrs[fragment_id_to_channel_index[fragment_instance_id.lo]]);
+
channels.emplace_back(channels[fragment_id_to_channel_index[fragment_instance_id.lo]]);
}
}
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -97,6 +102,24 @@ Status ExchangeSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& inf
return Status::OK();
}
+void ExchangeSinkLocalState::on_channel_finished(InstanceLoId channel_id) {
+ std::lock_guard<std::mutex> lock(_finished_channels_mutex);
+
+ if (_finished_channels.contains(channel_id)) {
+ LOG(WARNING) << "query: " << print_id(_state->query_id())
+ << ", on_channel_finished on already finished channel: "
<< channel_id;
+ return;
+ } else {
+ _finished_channels.emplace(channel_id);
+ if (_working_channels_count.fetch_sub(1) == 1) {
+ set_reach_limit();
+ if (_finish_dependency) {
+ _finish_dependency->set_ready();
+ }
+ }
+ }
+}
+
Status ExchangeSinkLocalState::open(RuntimeState* state) {
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
@@ -133,7 +156,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
_queue_dependency = Dependency::create_shared(_parent->operator_id(),
_parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
- _finish_dependency->block();
}
if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1)
&&
@@ -145,7 +167,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
vectorized::BroadcastPBlockHolderMemLimiter::create_shared(_broadcast_dependency);
} else if (local_size > 0) {
size_t dep_id = 0;
- for (auto* channel : channels) {
+ for (auto& channel : channels) {
if (channel->is_local()) {
if (auto dep = channel->get_local_channel_dependency()) {
_local_channels_dependency.push_back(dep);
@@ -160,16 +182,18 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
}
if (_part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
- _partitioner.reset(new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
- channels.size()));
+ _partitioner =
+
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+ channels.size());
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})",
_partition_count));
} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
- _partition_count = channel_shared_ptrs.size();
- _partitioner.reset(new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
- channel_shared_ptrs.size()));
+ _partition_count = channels.size();
+ _partitioner =
+
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+ channels.size());
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
@@ -215,12 +239,13 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
_partition_count =
channels.size() *
config::table_sink_partition_write_max_partition_nums_per_writer;
- _partitioner.reset(new
vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>(
- _partition_count));
- _partition_function.reset(new
HashPartitionFunction(_partitioner.get()));
+ _partitioner =
+
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
+ _partition_count);
+ _partition_function =
std::make_unique<HashPartitionFunction>(_partitioner.get());
- scale_writer_partitioning_exchanger.reset(new
vectorized::ScaleWriterPartitioningExchanger<
- HashPartitionFunction>(
+ scale_writer_partitioning_exchanger = std::make_unique<
+
vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>(
channels.size(), *_partition_function, _partition_count,
channels.size(), 1,
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
/
state->task_num() ==
@@ -233,7 +258,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
0
?
config::table_sink_partition_write_min_data_processed_rebalance_threshold
:
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
- state->task_num()));
+ state->task_num());
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
@@ -353,7 +378,7 @@ void
ExchangeSinkOperatorX::_handle_eof_channel(RuntimeState* state, ChannelPtrT
Status st) {
channel->set_receiver_eof(st);
// Channel will not send RPC to the downstream when eof, so close channel by
OK status.
- static_cast<void>(channel->close(state, Status::OK()));
+ static_cast<void>(channel->close(state));
}
Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
block, bool eos) {
@@ -362,7 +387,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
SCOPED_TIMER(local_state.exec_time_counter());
local_state._peak_memory_usage_counter->set(local_state._mem_tracker->peak_consumption());
bool all_receiver_eof = true;
- for (auto* channel : local_state.channels) {
+ for (auto& channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
all_receiver_eof = false;
break;
@@ -380,13 +405,13 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
if (!block->empty()) {
Status status;
size_t idx = 0;
- for (auto* channel : local_state.channels) {
+ for (auto& channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
// If this channel is the last, we can move this block
to downstream pipeline.
// Otherwise, this block also need to be broadcasted
to other channels so should be copied.
DCHECK_GE(local_state._last_local_channel_idx, 0);
status = channel->send_local_block(
- block, idx ==
local_state._last_local_channel_idx);
+ block, eos, idx ==
local_state._last_local_channel_idx);
HANDLE_CHANNEL_STATUS(state, channel, status);
}
idx++;
@@ -415,7 +440,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
size_t idx = 0;
bool moved = false;
- for (auto* channel : local_state.channels) {
+ for (auto& channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
Status status;
if (channel->is_local()) {
@@ -423,7 +448,8 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
// Otherwise, this block also need to be
broadcasted to other channels so should be copied.
DCHECK_GE(local_state._last_local_channel_idx,
0);
status = channel->send_local_block(
- &cur_block, idx ==
local_state._last_local_channel_idx);
+ &cur_block, eos,
+ idx ==
local_state._last_local_channel_idx);
moved = idx ==
local_state._last_local_channel_idx;
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
@@ -445,21 +471,18 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
}
} else if (_part_type == TPartitionType::RANDOM) {
// 1. select channel
- vectorized::PipChannel* current_channel =
- local_state.channels[local_state.current_channel_idx];
+ auto& current_channel =
local_state.channels[local_state.current_channel_idx];
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
- auto status = current_channel->send_local_block(block, true);
+ auto status = current_channel->send_local_block(block, eos,
true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- RETURN_IF_ERROR(local_state._serializer.serialize_block(
- block, current_channel->ch_cur_pb_block()));
- auto status =
-
current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+ auto pblock = std::make_unique<PBlock>();
+ RETURN_IF_ERROR(local_state._serializer.serialize_block(block,
pblock.get()));
+ auto status =
current_channel->send_remote_block(std::move(pblock), eos);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
- current_channel->ch_roll_pb_block();
}
}
local_state.current_channel_idx =
@@ -480,7 +503,7 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
} else {
SCOPED_TIMER(local_state._distribute_rows_into_channels_timer);
RETURN_IF_ERROR(channel_add_rows(
- state, local_state.channel_shared_ptrs,
local_state._partition_count,
+ state, local_state.channels, local_state._partition_count,
local_state._partitioner->get_channel_ids().get<uint32_t>(), rows, block, eos));
}
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
@@ -552,21 +575,18 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
} else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
// Control the number of channels according to the flow, thereby
controlling the number of table sink writers.
// 1. select channel
- vectorized::PipChannel* current_channel =
- local_state.channels[local_state.current_channel_idx];
+ auto& current_channel =
local_state.channels[local_state.current_channel_idx];
if (!current_channel->is_receiver_eof()) {
// 2. serialize, send and rollover block
if (current_channel->is_local()) {
- auto status = current_channel->send_local_block(block, true);
+ auto status = current_channel->send_local_block(block, eos,
true);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- RETURN_IF_ERROR(local_state._serializer.serialize_block(
- block, current_channel->ch_cur_pb_block()));
- auto status =
-
current_channel->send_remote_block(current_channel->ch_cur_pb_block(), eos);
+ auto pblock = std::make_unique<PBlock>();
+ RETURN_IF_ERROR(local_state._serializer.serialize_block(block,
pblock.get()));
+ auto status =
current_channel->send_remote_block(std::move(pblock), eos);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
- current_channel->ch_roll_pb_block();
}
_data_processed += block->bytes();
}
@@ -588,22 +608,19 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
Status final_st = Status::OK();
if (eos) {
local_state._serializer.reset_block();
- for (int i = 0; i < local_state.channels.size(); ++i) {
- Status st = local_state.channels[i]->close(state, Status::OK());
+ for (auto& channel : local_state.channels) {
+ Status st = channel->close(state);
if (!st.ok() && final_st.ok()) {
final_st = st;
}
}
- if (local_state._sink_buffer) {
- local_state._sink_buffer->set_should_stop();
- }
}
return final_st;
}
void ExchangeSinkLocalState::register_channels(pipeline::ExchangeSinkBuffer*
buffer) {
- for (auto channel : channels) {
- ((vectorized::PipChannel*)channel)->register_exchange_buffer(buffer);
+ for (auto& channel : channels) {
+ channel->register_exchange_buffer(buffer);
}
}
@@ -650,12 +667,12 @@ std::string ExchangeSinkLocalState::debug_string(int
indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}",
Base::debug_string(indentation_level));
if (_sink_buffer) {
- fmt::format_to(
- debug_string_buffer,
- ", Sink Buffer: (_should_stop = {}, _busy_channels = {},
_is_finishing = {}), "
- "_reach_limit: {}",
- _sink_buffer->_should_stop.load(),
_sink_buffer->_busy_channels.load(),
- _sink_buffer->_is_finishing.load(), _reach_limit.load());
+ fmt::format_to(debug_string_buffer,
+ ", Sink Buffer: (_is_finishing = {}, blocks in queue:
{}, queue capacity: "
+ "{}, queue dep: {}), _reach_limit: {}, working
channels: {}",
+ _sink_buffer->_is_finishing.load(),
_sink_buffer->_total_queue_size,
+ _sink_buffer->_queue_capacity,
(void*)_sink_buffer->_queue_dependency.get(),
+ _reach_limit.load(), _working_channels_count.load());
}
return fmt::to_string(debug_string_buffer);
}
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h
b/be/src/pipeline/exec/exchange_sink_operator.h
index a4f78bdf61c..f0cabb1ffde 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -19,7 +19,9 @@
#include <stdint.h>
+#include <atomic>
#include <memory>
+#include <mutex>
#include "common/status.h"
#include "exchange_sink_buffer.h"
@@ -53,13 +55,10 @@ private:
public:
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
- : Base(parent, state),
- current_channel_idx(0),
- only_local_exchange(false),
- _serializer(this) {
+ : Base(parent, state), _serializer(this) {
_finish_dependency =
std::make_shared<Dependency>(parent->operator_id(),
parent->node_id(),
- parent->get_name() +
"_FINISH_DEPENDENCY", true);
+ parent->get_name() +
"_FINISH_DEPENDENCY", false);
}
std::vector<Dependency*> dependencies() const override {
@@ -98,10 +97,11 @@ public:
return Status::OK();
}
Status _send_new_partition_batch();
- std::vector<vectorized::PipChannel*> channels;
- std::vector<std::shared_ptr<vectorized::PipChannel>> channel_shared_ptrs;
- int current_channel_idx; // index of current channel to send to if _random
== true
- bool only_local_exchange;
+ std::vector<std::shared_ptr<vectorized::Channel>> channels;
+ int current_channel_idx {0}; // index of current channel to send to if
_random == true
+ bool only_local_exchange {false};
+
+ void on_channel_finished(InstanceLoId channel_id);
// for external table sink hash partition
std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>
@@ -109,9 +109,8 @@ public:
private:
friend class ExchangeSinkOperatorX;
- friend class vectorized::Channel<ExchangeSinkLocalState>;
- friend class vectorized::PipChannel;
- friend class vectorized::BlockSerializer<ExchangeSinkLocalState>;
+ friend class vectorized::Channel;
+ friend class vectorized::BlockSerializer;
std::unique_ptr<ExchangeSinkBuffer> _sink_buffer = nullptr;
RuntimeProfile::Counter* _serialize_batch_timer = nullptr;
@@ -138,7 +137,7 @@ private:
std::shared_ptr<vectorized::BroadcastPBlockHolderMemLimiter>
_broadcast_pb_mem_limiter;
size_t _rpc_channels_num = 0;
- vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
+ vectorized::BlockSerializer _serializer;
std::shared_ptr<Dependency> _queue_dependency = nullptr;
std::shared_ptr<Dependency> _broadcast_dependency = nullptr;
@@ -187,6 +186,10 @@ private:
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
std::atomic<bool> _reach_limit = false;
int _last_local_channel_idx = -1;
+
+ std::atomic_int _working_channels_count = 0;
+ std::set<InstanceLoId> _finished_channels;
+ std::mutex _finished_channels_mutex;
};
class ExchangeSinkOperatorX final : public
DataSinkOperatorX<ExchangeSinkLocalState> {
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 0c16ae115af..7c9c38ece5c 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -31,9 +31,7 @@ namespace doris::pipeline {
ResultFileSinkLocalState::ResultFileSinkLocalState(DataSinkOperatorXBase*
parent,
RuntimeState* state)
- : AsyncWriterSink<vectorized::VFileResultWriter,
ResultFileSinkOperatorX>(parent, state),
- _serializer(
-
std::make_unique<vectorized::BlockSerializer<ResultFileSinkLocalState>>(this))
{}
+ : AsyncWriterSink<vectorized::VFileResultWriter,
ResultFileSinkOperatorX>(parent, state) {}
ResultFileSinkOperatorX::ResultFileSinkOperatorX(int operator_id, const
RowDescriptor& row_desc,
const std::vector<TExpr>&
t_output_expr)
@@ -139,14 +137,6 @@ Status ResultFileSinkLocalState::close(RuntimeState*
state, Status exec_status)
return Base::close(state, exec_status);
}
-template <typename ChannelPtrType>
-void ResultFileSinkLocalState::_handle_eof_channel(RuntimeState* state,
ChannelPtrType channel,
- Status st) {
- channel->set_receiver_eof(st);
- // Chanel will not send RPC to the downstream when eof, so close chanel by
OK status.
- static_cast<void>(channel->close(state, Status::OK()));
-}
-
Status ResultFileSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
in_block, bool eos) {
auto& local_state = get_local_state(state);
SCOPED_TIMER(local_state.exec_time_counter());
diff --git a/be/src/pipeline/exec/result_file_sink_operator.h
b/be/src/pipeline/exec/result_file_sink_operator.h
index d23ceacb816..e9f2b8eeb9c 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.h
+++ b/be/src/pipeline/exec/result_file_sink_operator.h
@@ -21,10 +21,6 @@
#include "vec/sink/writer/vfile_result_writer.h"
namespace doris::vectorized {
-template <typename Parent>
-class BlockSerializer;
-template <typename Parent>
-class Channel;
class BroadcastPBlockHolder;
} // namespace doris::vectorized
@@ -47,13 +43,8 @@ public:
private:
friend class ResultFileSinkOperatorX;
- template <typename ChannelPtrType>
- void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel,
Status st);
-
std::shared_ptr<BufferControlBlock> _sender;
- std::vector<vectorized::Channel<ResultFileSinkLocalState>*> _channels;
- std::unique_ptr<vectorized::BlockSerializer<ResultFileSinkLocalState>>
_serializer;
std::shared_ptr<vectorized::BroadcastPBlockHolder> _block_holder;
int _sender_id;
};
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index a93033297a0..fc11d254061 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -55,8 +55,7 @@
namespace doris::vectorized {
-template <typename Parent>
-Status Channel<Parent>::init(RuntimeState* state) {
+Status Channel::init(RuntimeState* state) {
if (_brpc_dest_addr.hostname.empty()) {
LOG(WARNING) << "there is no brpc destination address's hostname"
", maybe version is not compatible.";
@@ -65,9 +64,11 @@ Status Channel<Parent>::init(RuntimeState* state) {
if (state->query_options().__isset.enable_local_exchange) {
_is_local &= state->query_options().enable_local_exchange;
}
+
if (_is_local) {
return Status::OK();
}
+
if (_brpc_dest_addr.hostname == BackendOptions::get_localhost()) {
_brpc_stub =
state->exec_env()->brpc_internal_client_cache()->get_client(
"127.0.0.1", _brpc_dest_addr.port);
@@ -84,8 +85,7 @@ Status Channel<Parent>::init(RuntimeState* state) {
return Status::OK();
}
-template <typename Parent>
-Status Channel<Parent>::open(RuntimeState* state) {
+Status Channel::open(RuntimeState* state) {
if (_is_local) {
auto st = _parent->state()->exec_env()->vstream_mgr()->find_recvr(
_fragment_instance_id, _dest_node_id, &_local_recvr);
@@ -95,19 +95,6 @@ Status Channel<Parent>::open(RuntimeState* state) {
}
}
_be_number = state->be_number();
- _brpc_request = std::make_shared<PTransmitDataParams>();
- // initialize brpc request
- _brpc_request->mutable_finst_id()->set_hi(_fragment_instance_id.hi);
- _brpc_request->mutable_finst_id()->set_lo(_fragment_instance_id.lo);
- _finst_id = _brpc_request->finst_id();
-
- _brpc_request->mutable_query_id()->set_hi(state->query_id().hi);
- _brpc_request->mutable_query_id()->set_lo(state->query_id().lo);
- _query_id = _brpc_request->query_id();
-
- _brpc_request->set_node_id(_dest_node_id);
- _brpc_request->set_sender_id(_parent->sender_id());
- _brpc_request->set_be_number(_be_number);
_brpc_timeout_ms = std::min(3600, state->execution_timeout()) * 1000;
@@ -117,23 +104,26 @@ Status Channel<Parent>::open(RuntimeState* state) {
// to build a camouflaged empty channel. The ip and port are '0.0.0.0:0',
// so the empty channel does not need to call function close_internal()
_need_close = (_fragment_instance_id.hi != -1 && _fragment_instance_id.lo != -1);
+
_state = state;
return Status::OK();
}
-std::shared_ptr<pipeline::Dependency> PipChannel::get_local_channel_dependency() {
- if (!Channel<pipeline::ExchangeSinkLocalState>::_local_recvr) {
+std::shared_ptr<pipeline::Dependency> Channel::get_local_channel_dependency() {
+ if (!_local_recvr) {
return nullptr;
}
- return Channel<pipeline::ExchangeSinkLocalState>::_local_recvr->get_local_channel_dependency(
- Channel<pipeline::ExchangeSinkLocalState>::_parent->sender_id());
+ return _local_recvr->get_local_channel_dependency(_parent->sender_id());
}
-Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status) {
- COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(), 1);
- std::unique_ptr<PBlock> pblock_ptr;
- pblock_ptr.reset(block);
+int64_t Channel::mem_usage() const {
+ auto* mutable_block = _serializer.get_block();
+ int64_t mem_usage = mutable_block ? mutable_block->allocated_bytes() : 0;
+ return mem_usage;
+}
+Status Channel::send_remote_block(std::unique_ptr<PBlock>&& block, bool eos) {
+ COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
if (eos) {
if (_eos_send) {
return Status::OK();
@@ -142,13 +132,13 @@ Status PipChannel::send_remote_block(PBlock* block, bool eos, Status exec_status
}
}
if (eos || block->column_metas_size()) {
- RETURN_IF_ERROR(_buffer->add_block({this, std::move(pblock_ptr), eos, exec_status}));
+ RETURN_IF_ERROR(_buffer->add_block({this, std::move(block), eos, Status::OK()}));
}
return Status::OK();
}
-Status PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos) {
- COUNTER_UPDATE(Channel<pipeline::ExchangeSinkLocalState>::_parent->blocks_sent_counter(), 1);
+Status Channel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos) {
+ COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
if (eos) {
if (_eos_send) {
return Status::OK();
@@ -161,219 +151,89 @@ Status PipChannel::send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>&
return Status::OK();
}
-Status PipChannel::send_current_block(bool eos, Status exec_status) {
- if (Channel<pipeline::ExchangeSinkLocalState>::is_local()) {
- return Channel<pipeline::ExchangeSinkLocalState>::send_local_block(exec_status, eos);
+Status Channel::_send_current_block(bool eos) {
+ if (is_local()) {
+ return _send_local_block(eos);
}
- SCOPED_CONSUME_MEM_TRACKER(Channel<pipeline::ExchangeSinkLocalState>::_parent->mem_tracker());
- RETURN_IF_ERROR(send_remote_block(_pblock.release(), eos, exec_status));
- return Status::OK();
+
+ return send_remote_block(std::move(_pblock), eos);
}
-template <typename Parent>
-Status Channel<Parent>::send_current_block(bool eos, Status exec_status) {
- // FIXME: Now, local exchange will cause the performance problem is in a multi-threaded scenario
- // so this feature is turned off here by default. We need to re-examine this logic
- if (is_local()) {
- return send_local_block(exec_status, eos);
+Status Channel::_send_local_block(bool eos) {
+ Block block;
+ if (_serializer.get_block() != nullptr) {
+ block = _serializer.get_block()->to_block();
+ _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
}
- SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
- if (eos) {
- RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
+
+ if (!block.empty() || eos) {
+ RETURN_IF_ERROR(send_local_block(&block, eos, true));
}
- RETURN_IF_ERROR(send_remote_block(_ch_cur_pb_block, eos, exec_status));
- ch_roll_pb_block();
return Status::OK();
}
-template <typename Parent>
-Status Channel<Parent>::send_local_block(Status exec_status, bool eos) {
- Block block = _serializer.get_block()->to_block();
- _serializer.get_block()->set_mutable_columns(block.clone_empty_columns());
- if (_recvr_is_valid()) {
- if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState,
Parent>) {
- SCOPED_TIMER(_parent->local_send_timer());
- COUNTER_UPDATE(_parent->local_bytes_send_counter(), block.bytes());
- COUNTER_UPDATE(_parent->local_sent_rows(), block.rows());
- COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
- }
+Status Channel::send_local_block(Block* block, bool eos, bool can_be_moved) {
+ SCOPED_TIMER(_parent->local_send_timer());
- _local_recvr->add_block(&block, _parent->sender_id(), true);
- if (eos) {
- _local_recvr->remove_sender(_parent->sender_id(), _be_number,
exec_status);
+ if (eos) {
+ if (_eos_send) {
+ return Status::OK();
+ } else {
+ _eos_send = true;
}
- return Status::OK();
- } else {
- _serializer.reset_block();
- return _receiver_status;
}
-}
-template <typename Parent>
-Status Channel<Parent>::send_local_block(Block* block, bool can_be_moved) {
- if (_recvr_is_valid()) {
- if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState,
Parent>) {
- SCOPED_TIMER(_parent->local_send_timer());
- COUNTER_UPDATE(_parent->local_bytes_send_counter(),
block->bytes());
- COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
- COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
- }
- _local_recvr->add_block(block, _parent->sender_id(), can_be_moved);
- return Status::OK();
- } else {
+ if (is_receiver_eof()) {
return _receiver_status;
}
-}
-template <typename Parent>
-Status Channel<Parent>::send_remote_block(PBlock* block, bool eos, Status exec_status) {
- if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>) {
+ auto receiver_status = _recvr_status();
+ if (receiver_status.ok()) {
+ COUNTER_UPDATE(_parent->local_bytes_send_counter(), block->bytes());
+ COUNTER_UPDATE(_parent->local_sent_rows(), block->rows());
COUNTER_UPDATE(_parent->blocks_sent_counter(), 1);
- }
-
- if (_send_remote_block_callback == nullptr) {
- _send_remote_block_callback =
DummyBrpcCallback<PTransmitDataResult>::create_shared();
- } else {
- RETURN_IF_ERROR(_wait_last_brpc());
- _send_remote_block_callback->cntl_->Reset();
- }
- VLOG_ROW << "Channel<Parent>::send_batch() instance_id=" << print_id(_fragment_instance_id)
- << " dest_node=" << _dest_node_id << " to_host=" << _brpc_dest_addr.hostname
- << " _packet_seq=" << _packet_seq << " row_desc=" << _row_desc.debug_string();
-
- _brpc_request->set_eos(eos);
- if (!exec_status.ok()) {
- exec_status.to_protobuf(_brpc_request->mutable_exec_status());
- }
- if (block != nullptr && !block->column_metas().empty()) {
- _brpc_request->set_allocated_block(block);
- }
- _brpc_request->set_packet_seq(_packet_seq++);
- _send_remote_block_callback->cntl_->set_timeout_ms(_brpc_timeout_ms);
- if (config::exchange_sink_ignore_eovercrowded) {
- _send_remote_block_callback->cntl_->ignore_eovercrowded();
- }
-
- {
- auto send_remote_block_closure =
- AutoReleaseClosure<PTransmitDataParams,
DummyBrpcCallback<PTransmitDataResult>>::
- create_unique(_brpc_request,
_send_remote_block_callback);
- if (enable_http_send_block(*_brpc_request)) {
- RETURN_IF_ERROR(transmit_block_httpv2(
- _state->exec_env(), std::move(send_remote_block_closure),
_brpc_dest_addr));
- } else {
- transmit_blockv2(*_brpc_stub,
std::move(send_remote_block_closure));
+ const auto sender_id = _parent->sender_id();
+ if (!block->empty()) [[likely]] {
+ _local_recvr->add_block(block, sender_id, can_be_moved);
}
- }
- if (block != nullptr) {
- static_cast<void>(_brpc_request->release_block());
- }
- return Status::OK();
-}
-
-template <typename Parent>
-Status Channel<Parent>::add_rows(Block* block, const std::vector<uint32_t>&
rows, bool eos) {
- if (_fragment_instance_id.lo == -1) {
+ if (eos) [[unlikely]] {
+ _local_recvr->remove_sender(sender_id, _be_number, Status::OK());
+ _parent->on_channel_finished(_fragment_instance_id.lo);
+ }
return Status::OK();
+ } else {
+ _receiver_status = std::move(receiver_status);
+ _parent->on_channel_finished(_fragment_instance_id.lo);
+ return _receiver_status;
}
-
- bool serialized = false;
- RETURN_IF_ERROR(
- _serializer.next_serialized_block(block, _ch_cur_pb_block, 1,
&serialized, eos, &rows));
- if (serialized) {
- RETURN_IF_ERROR(send_current_block(false, Status::OK()));
- }
-
- return Status::OK();
}
-template <typename Parent>
-Status Channel<Parent>::close_wait(RuntimeState* state) {
- if (_need_close) {
- Status st = _wait_last_brpc();
- if (st.is<ErrorCode::END_OF_FILE>()) {
- st = Status::OK();
- } else if (!st.ok()) {
- state->log_error(st.to_string());
- }
- _need_close = false;
- return st;
+Status Channel::close(RuntimeState* state) {
+ if (_closed) {
+ return Status::OK();
}
- _serializer.reset_block();
- return Status::OK();
-}
+ _closed = true;
-template <typename Parent>
-Status Channel<Parent>::close_internal(Status exec_status) {
if (!_need_close) {
return Status::OK();
}
- VLOG_RPC << "Channel::close_internal() instance_id=" <<
print_id(_fragment_instance_id)
- << " dest_node=" << _dest_node_id << " #rows= "
- << ((_serializer.get_block() == nullptr) ? 0 :
_serializer.get_block()->rows())
- << " receiver status: " << _receiver_status << ", exec_status: "
<< exec_status;
+
if (is_receiver_eof()) {
_serializer.reset_block();
return Status::OK();
- }
- Status status;
- if (_serializer.get_block() != nullptr && _serializer.get_block()->rows()
> 0) {
- status = send_current_block(true, exec_status);
} else {
- SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
- if (is_local()) {
- if (_recvr_is_valid()) {
- _local_recvr->remove_sender(_parent->sender_id(), _be_number,
exec_status);
- }
- } else {
- // Non pipeline engine will send an empty eos block
- status = send_remote_block((PBlock*)nullptr, true, exec_status);
- }
+ return _send_current_block(true);
}
- // Don't wait for the last packet to finish, left it to close_wait.
- if (status.is<ErrorCode::END_OF_FILE>()) {
- return Status::OK();
- } else {
- return status;
- }
-}
-
-template <typename Parent>
-Status Channel<Parent>::close(RuntimeState* state, Status exec_status) {
- if (_closed) {
- return Status::OK();
- }
- _closed = true;
-
- Status st = close_internal(exec_status);
- if (!st.ok()) {
- state->log_error(st.to_string());
- }
- return st;
}
-template <typename Parent>
-int64_t Channel<Parent>::mem_usage() const {
- auto* mutable_block = _serializer.get_block();
- int64_t mem_usage = mutable_block ? mutable_block->allocated_bytes() : 0;
- return mem_usage;
-}
-
-template <typename Parent>
-void Channel<Parent>::ch_roll_pb_block() {
- _ch_cur_pb_block = (_ch_cur_pb_block == &_ch_pb_block1 ? &_ch_pb_block2 :
&_ch_pb_block1);
-}
-
-template <typename Parent>
-BlockSerializer<Parent>::BlockSerializer(Parent* parent, bool is_local)
+BlockSerializer::BlockSerializer(pipeline::ExchangeSinkLocalState* parent,
bool is_local)
: _parent(parent), _is_local(is_local),
_batch_size(parent->state()->batch_size()) {}
-template <typename Parent>
-Status BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest, int num_receivers,
- bool* serialized, bool eos,
- const std::vector<uint32_t>* rows) {
+Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int num_receivers,
+ bool* serialized, bool eos,
+ const std::vector<uint32_t>* rows) {
if (_mutable_block == nullptr) {
SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
_mutable_block = MutableBlock::create_unique(block->clone_empty());
@@ -381,9 +241,6 @@ Status
BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
{
SCOPED_CONSUME_MEM_TRACKER(_parent->mem_tracker());
- if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState,
Parent>) {
- SCOPED_TIMER(_parent->merge_block_timer());
- }
if (rows) {
if (!rows->empty()) {
const auto* begin = rows->data();
@@ -405,8 +262,7 @@ Status
BlockSerializer<Parent>::next_serialized_block(Block* block, PBlock* dest
return Status::OK();
}
-template <typename Parent>
-Status BlockSerializer<Parent>::serialize_block(PBlock* dest, int
num_receivers) {
+Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
if (_mutable_block && _mutable_block->rows() > 0) {
auto block = _mutable_block->to_block();
RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
@@ -417,29 +273,20 @@ Status BlockSerializer<Parent>::serialize_block(PBlock*
dest, int num_receivers)
return Status::OK();
}
-template <typename Parent>
-Status BlockSerializer<Parent>::serialize_block(const Block* src, PBlock*
dest, int num_receivers) {
- if constexpr (!std::is_same_v<pipeline::ResultFileSinkLocalState, Parent>)
{
- SCOPED_TIMER(_parent->_serialize_batch_timer);
- dest->Clear();
- size_t uncompressed_bytes = 0, compressed_bytes = 0;
- RETURN_IF_ERROR(src->serialize(
- _parent->_state->be_exec_version(), dest, &uncompressed_bytes,
&compressed_bytes,
- _parent->compression_type(),
_parent->transfer_large_data_by_brpc()));
- COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes *
num_receivers);
- COUNTER_UPDATE(_parent->_uncompressed_bytes_counter,
uncompressed_bytes * num_receivers);
- COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
-
_parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes *
-
num_receivers);
- _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows()
* num_receivers);
- }
+Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int num_receivers) {
+ SCOPED_TIMER(_parent->_serialize_batch_timer);
+ dest->Clear();
+ size_t uncompressed_bytes = 0, compressed_bytes = 0;
+ RETURN_IF_ERROR(src->serialize(_parent->_state->be_exec_version(), dest, &uncompressed_bytes,
+ &compressed_bytes, _parent->compression_type(),
+ _parent->transfer_large_data_by_brpc()));
+ COUNTER_UPDATE(_parent->_bytes_sent_counter, compressed_bytes * num_receivers);
+ COUNTER_UPDATE(_parent->_uncompressed_bytes_counter, uncompressed_bytes * num_receivers);
+ COUNTER_UPDATE(_parent->_compress_timer, src->get_compress_time());
+ _parent->get_query_statistics_ptr()->add_shuffle_send_bytes(compressed_bytes * num_receivers);
+ _parent->get_query_statistics_ptr()->add_shuffle_send_rows(src->rows() * num_receivers);
return Status::OK();
}
-template class Channel<pipeline::ExchangeSinkLocalState>;
-template class Channel<pipeline::ResultFileSinkLocalState>;
-template class BlockSerializer<pipeline::ResultFileSinkLocalState>;
-template class BlockSerializer<pipeline::ExchangeSinkLocalState>;
-
} // namespace doris::vectorized
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 12e72ee9418..e4156dc4d44 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -72,13 +72,10 @@ class ExchangeSinkLocalState;
} // namespace pipeline
namespace vectorized {
-template <typename>
-class Channel;
-template <typename Parent>
class BlockSerializer {
public:
- BlockSerializer(Parent* parent, bool is_local = true);
+ BlockSerializer(pipeline::ExchangeSinkLocalState* parent, bool is_local =
true);
Status next_serialized_block(Block* src, PBlock* dest, int num_receivers,
bool* serialized,
bool eos, const std::vector<uint32_t>* rows =
nullptr);
Status serialize_block(PBlock* dest, int num_receivers = 1);
@@ -92,7 +89,7 @@ public:
bool is_local() const { return _is_local; }
private:
- Parent* _parent;
+ pipeline::ExchangeSinkLocalState* _parent;
std::unique_ptr<MutableBlock> _mutable_block;
bool _is_local;
@@ -106,7 +103,6 @@ struct ShuffleChannelIds {
}
};
-template <typename Parent>
class Channel {
public:
friend class pipeline::ExchangeSinkBuffer;
@@ -114,23 +110,15 @@ public:
// combination. buffer_size is specified in bytes and a soft limit on
// how much tuple data is getting accumulated before being sent; it only
applies
// when data is added via add_row() and not sent directly via send_batch().
- Channel(Parent* parent, const RowDescriptor& row_desc, TNetworkAddress
brpc_dest,
+ Channel(pipeline::ExchangeSinkLocalState* parent, TNetworkAddress
brpc_dest,
TUniqueId fragment_instance_id, PlanNodeId dest_node_id)
: _parent(parent),
- _row_desc(row_desc),
_fragment_instance_id(std::move(fragment_instance_id)),
_dest_node_id(dest_node_id),
- _need_close(false),
- _closed(false),
_brpc_dest_addr(std::move(brpc_dest)),
_is_local((_brpc_dest_addr.hostname ==
BackendOptions::get_localhost()) &&
(_brpc_dest_addr.port == config::brpc_port)),
- _serializer(_parent, _is_local) {
- if (_is_local) {
- VLOG_NOTICE << "will use local Exchange, dest_node_id is : " <<
_dest_node_id;
- }
- _ch_cur_pb_block = &_ch_pb_block1;
- }
+ _serializer(_parent, _is_local) {}
virtual ~Channel() = default;
@@ -139,38 +127,13 @@ public:
Status init(RuntimeState* state);
Status open(RuntimeState* state);
- // Asynchronously sends a row batch.
- // Returns the status of the most recently finished transmit_data
- // rpc (or OK if there wasn't one that hasn't been reported yet).
- // if batch is nullptr, send the eof packet
- virtual Status send_remote_block(PBlock* block, bool eos = false,
- Status exec_status = Status::OK());
-
- virtual Status
send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
- bool eos = false) {
- return Status::InternalError("Send BroadcastPBlockHolder is not
allowed!");
- }
-
- virtual Status add_rows(Block* block, const std::vector<uint32_t>& row,
bool eos);
-
- virtual Status send_current_block(bool eos, Status exec_status);
-
- Status send_local_block(Status exec_status, bool eos = false);
-
- Status send_local_block(Block* block, bool can_be_moved);
+ Status send_local_block(Block* block, bool eos, bool can_be_moved);
// Flush buffered rows and close channel. This function don't wait the
response
// of close operation, client should call close_wait() to finish channel's
close.
// We split one close operation into two phases in order to make multiple
channels
// can run parallel.
- Status close(RuntimeState* state, Status exec_status);
-
- // Get close wait's response, to finish channel close operation.
- Status close_wait(RuntimeState* state);
-
- int64_t num_data_bytes_sent() const { return _num_data_bytes_sent; }
-
- PBlock* ch_cur_pb_block() { return _ch_cur_pb_block; }
+ Status close(RuntimeState* state);
std::string get_fragment_instance_id_str() {
UniqueId uid(_fragment_instance_id);
return uid.to_string();
@@ -178,150 +141,40 @@ public:
bool is_local() const { return _is_local; }
- virtual void ch_roll_pb_block();
-
bool is_receiver_eof() const { return
_receiver_status.is<ErrorCode::END_OF_FILE>(); }
void set_receiver_eof(Status st) { _receiver_status = st; }
int64_t mem_usage() const;
-protected:
- bool _recvr_is_valid() {
- if (_local_recvr && !_local_recvr->is_closed()) {
- return true;
- }
- _receiver_status = Status::EndOfFile(
- "local data stream receiver closed"); // local data stream
receiver closed
- return false;
- }
-
- Status _wait_last_brpc() {
- if (_send_remote_block_callback == nullptr) {
- return Status::OK();
- }
- _send_remote_block_callback->join();
- if (_send_remote_block_callback->cntl_->Failed()) {
- std::string err = fmt::format(
- "failed to send brpc batch, error={}, error_text={},
client: {}, "
- "latency = {}",
- berror(_send_remote_block_callback->cntl_->ErrorCode()),
- _send_remote_block_callback->cntl_->ErrorText(),
- BackendOptions::get_localhost(),
- _send_remote_block_callback->cntl_->latency_us());
- LOG(WARNING) << err;
- return Status::RpcError(err);
- }
- _receiver_status =
Status::create(_send_remote_block_callback->response_->status());
- return _receiver_status;
- }
-
- Status close_internal(Status exec_status);
-
- Parent* _parent = nullptr;
-
- const RowDescriptor& _row_desc;
- const TUniqueId _fragment_instance_id;
- PlanNodeId _dest_node_id;
-
- // the number of RowBatch.data bytes sent successfully
- int64_t _num_data_bytes_sent {};
- int64_t _packet_seq {};
-
- bool _need_close;
- bool _closed;
- int _be_number;
-
- TNetworkAddress _brpc_dest_addr;
-
- PUniqueId _finst_id;
- PUniqueId _query_id;
- PBlock _pb_block;
- std::shared_ptr<PTransmitDataParams> _brpc_request = nullptr;
- std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
- std::shared_ptr<DummyBrpcCallback<PTransmitDataResult>>
_send_remote_block_callback;
- Status _receiver_status;
- int32_t _brpc_timeout_ms = 500;
- RuntimeState* _state = nullptr;
-
- bool _is_local;
- std::shared_ptr<VDataStreamRecvr> _local_recvr;
- // serialized blocks for broadcasting; we need two so we can write
- // one while the other one is still being sent.
- // Which is for same reason as `_cur_pb_block`, `_pb_block1` and
`_pb_block2`
- // in VDataStreamSender.
- PBlock* _ch_cur_pb_block = nullptr;
- PBlock _ch_pb_block1;
- PBlock _ch_pb_block2;
-
- BlockSerializer<Parent> _serializer;
-};
-
-#define HANDLE_CHANNEL_STATUS(state, channel, status) \
- do { \
- if (status.is<ErrorCode::END_OF_FILE>()) { \
- _handle_eof_channel(state, channel, status); \
- } else { \
- RETURN_IF_ERROR(status); \
- } \
- } while (0)
-
-class PipChannel final : public Channel<pipeline::ExchangeSinkLocalState> {
-public:
- PipChannel(pipeline::ExchangeSinkLocalState* parent, const RowDescriptor&
row_desc,
- const TNetworkAddress& brpc_dest, const TUniqueId&
fragment_instance_id,
- PlanNodeId dest_node_id)
- : Channel<pipeline::ExchangeSinkLocalState>(parent, row_desc,
brpc_dest,
- fragment_instance_id,
dest_node_id) {
- ch_roll_pb_block();
- }
-
- ~PipChannel() override { delete
Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block; }
-
- void ch_roll_pb_block() override {
- // We have two choices here.
- // 1. Use a PBlock pool and fetch an available PBlock if we need one.
In this way, we can
- // reuse the memory, but we have to use a lock to synchronize.
- // 2. Create a new PBlock every time. In this way we don't need a lock
but have to allocate
- // new memory.
- // Now we use the second way.
- Channel<pipeline::ExchangeSinkLocalState>::_ch_cur_pb_block = new
PBlock();
- }
-
// Asynchronously sends a block
// Returns the status of the most recently finished transmit_data
// rpc (or OK if there wasn't one that hasn't been reported yet).
// if batch is nullptr, send the eof packet
- Status send_remote_block(PBlock* block, bool eos = false,
- Status exec_status = Status::OK()) override;
-
- Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block,
- bool eos = false) override;
+ Status send_remote_block(std::unique_ptr<PBlock>&& block, bool eos = false);
+ Status send_broadcast_block(std::shared_ptr<BroadcastPBlockHolder>& block, bool eos = false);
- Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
override {
- if
(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id.lo == -1) {
+ Status add_rows(Block* block, const std::vector<uint32_t>& rows, bool eos)
{
+ if (_fragment_instance_id.lo == -1) {
return Status::OK();
}
bool serialized = false;
- _pblock = std::make_unique<PBlock>();
- RETURN_IF_ERROR(
-
Channel<pipeline::ExchangeSinkLocalState>::_serializer.next_serialized_block(
- block, _pblock.get(), 1, &serialized, eos, &rows));
+ if (_pblock == nullptr) {
+ _pblock = std::make_unique<PBlock>();
+ }
+ RETURN_IF_ERROR(_serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, eos,
+ &rows));
if (serialized) {
- Status exec_status = Status::OK();
- RETURN_IF_ERROR(send_current_block(eos, exec_status));
+ RETURN_IF_ERROR(_send_current_block(eos));
}
return Status::OK();
}
- // send _mutable_block
- Status send_current_block(bool eos, Status exec_status) override;
-
void register_exchange_buffer(pipeline::ExchangeSinkBuffer* buffer) {
_buffer = buffer;
- _buffer->register_sink(Channel<pipeline::ExchangeSinkLocalState>::_fragment_instance_id);
+ _buffer->register_sink(_fragment_instance_id);
}
std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>>
get_send_callback(
@@ -337,12 +190,53 @@ public:
std::shared_ptr<pipeline::Dependency> get_local_channel_dependency();
-private:
+protected:
+ Status _send_local_block(bool eos);
+ Status _send_current_block(bool eos);
+
+ Status _recvr_status() const {
+ if (_local_recvr && !_local_recvr->is_closed()) {
+ return Status::OK();
+ }
+ return Status::EndOfFile(
+ "local data stream receiver closed"); // local data stream receiver closed
+ }
+
+ pipeline::ExchangeSinkLocalState* _parent = nullptr;
+
+ const TUniqueId _fragment_instance_id;
+ PlanNodeId _dest_node_id;
+ bool _closed {false};
+ bool _need_close {false};
+ int _be_number;
+
+ TNetworkAddress _brpc_dest_addr;
+
+ PBlock _pb_block;
+ std::shared_ptr<PBackendService_Stub> _brpc_stub = nullptr;
+ Status _receiver_status;
+ int32_t _brpc_timeout_ms = 500;
+ RuntimeState* _state = nullptr;
+
+ bool _is_local;
+ std::shared_ptr<VDataStreamRecvr> _local_recvr;
+
+ BlockSerializer _serializer;
+
pipeline::ExchangeSinkBuffer* _buffer = nullptr;
bool _eos_send = false;
std::shared_ptr<pipeline::ExchangeSendCallback<PTransmitDataResult>>
_send_callback;
std::unique_ptr<PBlock> _pblock;
};
+#define HANDLE_CHANNEL_STATUS(state, channel, status) \
+ do { \
+ if (status.is<ErrorCode::END_OF_FILE>()) { \
+ _handle_eof_channel(state, channel, status); \
+ } else { \
+ RETURN_IF_ERROR(status); \
+ } \
+ } while (0)
+
} // namespace vectorized
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]