This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 591aee528d [Bug](exchange) change BlockSerializer from unique_ptr to object (#22653)
591aee528d is described below
commit 591aee528d166ef88c24b67c763f3a9ffe516b3a
Author: Pxl <[email protected]>
AuthorDate: Mon Aug 7 14:47:21 2023 +0800
[Bug](exchange) change BlockSerializer from unique_ptr to object (#22653)
change BlockSerializer from unique_ptr to object
---
be/src/exec/data_sink.cpp | 8 +--
be/src/vec/sink/vdata_stream_sender.cpp | 119 ++++++++++++--------------------
be/src/vec/sink/vdata_stream_sender.h | 23 +++---
be/src/vec/sink/vresult_file_sink.cpp | 11 +--
be/src/vec/sink/vresult_file_sink.h | 9 +--
5 files changed, 71 insertions(+), 99 deletions(-)
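The change itself is mechanical: the BlockSerializer that Channel and VDataStreamSender previously
allocated behind a std::unique_ptr in init()/prepare() becomes a plain value member built in the
constructor initializer list, which saves a heap allocation per channel, removes the nullptr checks,
and turns the "->" call sites into "." calls. A minimal sketch of the before/after shape
(simplified, hypothetical types, not the actual Doris classes):

    #include <memory>

    struct BlockSerializer {
        explicit BlockSerializer(void* /*parent*/, bool /*is_local*/ = false) {}
    };

    // Before: the serializer lives behind a unique_ptr and is created later in init().
    struct ChannelBefore {
        void init() { _serializer.reset(new BlockSerializer(this)); }
        std::unique_ptr<BlockSerializer> _serializer; // extra allocation, calls go through ->
    };

    // After: the serializer is a value member, fully constructed together with its owner.
    struct ChannelAfter {
        ChannelAfter() : _serializer(this) {}         // no separate init step needed
        BlockSerializer _serializer;                  // calls become _serializer.foo()
    };

The rest of the diff is the fallout of that one decision: call-site syntax, constructor plumbing,
and removal of the lazy reset() calls.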
diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index c5d5446611..bc914818e8 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -89,12 +89,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
sink->reset(new doris::vectorized::VResultFileSink(
-                        pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
+                        state, pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
                        params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
output_exprs, desc_tbl));
} else {
sink->reset(new doris::vectorized::VResultFileSink(
- pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
+                        state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
send_query_statistics_with_every_batch, output_exprs));
}
break;
@@ -226,12 +226,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
// Result file sink is not the top sink
if (params.__isset.destinations && params.destinations.size() > 0) {
sink->reset(new doris::vectorized::VResultFileSink(
-                        pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
+                        state, pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
                        params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
output_exprs, desc_tbl));
} else {
sink->reset(new doris::vectorized::VResultFileSink(
- pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
+                        state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
send_query_statistics_with_every_batch, output_exprs));
}
break;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index df705ee37b..2fd86e0c38 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -95,8 +95,6 @@ Status Channel::init(RuntimeState* state) {
_fragment_instance_id, _dest_node_id);
}
- _serializer.reset(new BlockSerializer(_parent, _is_local));
-
// In bucket shuffle join will set fragment_instance_id (-1, -1)
// to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
// so the empty channel not need call function close_internal()
@@ -113,7 +111,7 @@ Status Channel::send_current_block(bool eos) {
}
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
if (eos) {
- RETURN_IF_ERROR(_serializer->serialize_block(_ch_cur_pb_block, 1));
+ RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
}
RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
ch_roll_pb_block();
@@ -122,8 +120,8 @@ Status Channel::send_current_block(bool eos) {
Status Channel::send_local_block(bool eos) {
SCOPED_TIMER(_parent->_local_send_timer);
- Block block = _serializer->get_block()->to_block();
-    _serializer->get_block()->set_muatable_columns(block.clone_empty_columns());
+ Block block = _serializer.get_block()->to_block();
+ _serializer.get_block()->set_muatable_columns(block.clone_empty_columns());
if (_recvr_is_valid()) {
COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes());
COUNTER_UPDATE(_parent->_local_sent_rows, block.rows());
@@ -134,7 +132,7 @@ Status Channel::send_local_block(bool eos) {
}
return Status::OK();
} else {
- _serializer->reset_block();
+ _serializer.reset_block();
return _receiver_status;
}
}
@@ -205,7 +203,7 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
bool serialized = false;
RETURN_IF_ERROR(
-            _serializer->next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, &rows));
+            _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, &rows));
if (serialized) {
RETURN_IF_ERROR(send_current_block(false));
}
@@ -224,9 +222,7 @@ Status Channel::close_wait(RuntimeState* state) {
_need_close = false;
return st;
}
- if (_serializer) {
- _serializer->reset_block();
- }
+ _serializer.reset_block();
return Status::OK();
}
@@ -236,14 +232,14 @@ Status Channel::close_internal() {
}
VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id
<< " dest_node=" << _dest_node_id << " #rows= "
-             << ((_serializer->get_block() == nullptr) ? 0 : _serializer->get_block()->rows())
+             << ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows())
<< " receiver status: " << _receiver_status;
if (is_receiver_eof()) {
- _serializer->reset_block();
+ _serializer.reset_block();
return Status::OK();
}
Status status;
-    if (_serializer->get_block() != nullptr && _serializer->get_block()->rows() > 0) {
+    if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) {
status = send_current_block(true);
} else {
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
@@ -286,6 +282,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
                                     int per_channel_buffer_size,
                                     bool send_query_statistics_with_every_batch)
: _sender_id(sender_id),
+ _state(state),
_pool(pool),
_row_desc(row_desc),
_current_channel_idx(0),
@@ -299,7 +296,8 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
_blocks_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
_dest_node_id(sink.dest_node_id),
- _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
+ _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc),
+ _serializer(this) {
DCHECK_GT(destinations.size(), 0);
DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -344,12 +342,13 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
}
}
-VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
-                                     PlanNodeId dest_node_id,
+VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
+                                     const RowDescriptor& row_desc, PlanNodeId dest_node_id,
                                     const std::vector<TPlanFragmentDestination>& destinations,
                                     int per_channel_buffer_size,
                                     bool send_query_statistics_with_every_batch)
: _sender_id(sender_id),
+ _state(state),
_pool(pool),
_row_desc(row_desc),
_current_channel_idx(0),
@@ -365,7 +364,8 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
_split_block_distribute_by_channel_timer(nullptr),
_blocks_sent_counter(nullptr),
_local_bytes_send_counter(nullptr),
- _dest_node_id(dest_node_id) {
+ _dest_node_id(dest_node_id),
+ _serializer(this) {
_cur_pb_block = &_pb_block1;
_name = "VDataStreamSender";
std::map<int64_t, int64_t> fragment_id_to_channel_index;
@@ -384,30 +384,6 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
}
}
-VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc,
-                                     int per_channel_buffer_size,
-                                     bool send_query_statistics_with_every_batch)
- : _sender_id(0),
- _pool(pool),
- _row_desc(row_desc),
- _current_channel_idx(0),
- _profile(nullptr),
- _serialize_batch_timer(nullptr),
- _compress_timer(nullptr),
- _brpc_send_timer(nullptr),
- _brpc_wait_timer(nullptr),
- _bytes_sent_counter(nullptr),
- _local_send_timer(nullptr),
- _split_block_hash_compute_timer(nullptr),
- _split_block_distribute_by_channel_timer(nullptr),
- _blocks_sent_counter(nullptr),
- _peak_memory_usage_counter(nullptr),
- _local_bytes_send_counter(nullptr),
- _dest_node_id(0) {
- _cur_pb_block = &_pb_block1;
- _name = "VDataStreamSender";
-}
-
VDataStreamSender::~VDataStreamSender() {
_channel_shared_ptrs.clear();
}
@@ -429,7 +405,6 @@ Status VDataStreamSender::init(const TDataSink& tsink) {
Status VDataStreamSender::prepare(RuntimeState* state) {
RETURN_IF_ERROR(DataSink::prepare(state));
- _state = state;
std::vector<std::string> instances;
for (const auto& channel : _channels) {
@@ -454,8 +429,6 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
        RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, _row_desc));
}
- _serializer.reset(new BlockSerializer(this));
-
_bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
    _uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
_local_sent_rows = ADD_COUNTER(profile(), "LocalSentRows", TUnit::UNIT);
@@ -521,27 +494,27 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
#ifndef BROADCAST_ALL_CHANNELS
-#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS)                               \
-    {                                                                                               \
-        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                                            \
-        bool serialized = false;                                                                    \
-        RETURN_IF_ERROR(                                                                            \
-                _serializer->next_serialized_block(block, PBLOCK, _channels.size(), &serialized)); \
-        if (serialized) {                                                                           \
-            Status status;                                                                          \
-            for (auto channel : _channels) {                                                        \
-                if (!channel->is_receiver_eof()) {                                                  \
-                    if (channel->is_local()) {                                                      \
-                        status = channel->send_local_block(block);                                  \
-                    } else {                                                                        \
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                             \
-                        status = channel->send_block(PBLOCK_TO_SEND, false);                        \
-                    }                                                                               \
-                    HANDLE_CHANNEL_STATUS(state, channel, status);                                  \
-                }                                                                                   \
-            }                                                                                       \
-            POST_PROCESS;                                                                           \
-        }                                                                                           \
+#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS)                               \
+    {                                                                                               \
+        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                                            \
+        bool serialized = false;                                                                    \
+        RETURN_IF_ERROR(                                                                            \
+                _serializer.next_serialized_block(block, PBLOCK, _channels.size(), &serialized));  \
+        if (serialized) {                                                                           \
+            Status status;                                                                          \
+            for (auto channel : _channels) {                                                        \
+                if (!channel->is_receiver_eof()) {                                                  \
+                    if (channel->is_local()) {                                                      \
+                        status = channel->send_local_block(block);                                  \
+                    } else {                                                                        \
+                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                             \
+                        status = channel->send_block(PBLOCK_TO_SEND, false);                        \
+                    }                                                                               \
+                    HANDLE_CHANNEL_STATUS(state, channel, status);                                  \
+                }                                                                                   \
+            }                                                                                       \
+            POST_PROCESS;                                                                           \
+        }                                                                                           \
}
#endif
// 1. serialize depends on it is not local exchange
@@ -574,7 +547,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
RETURN_IF_ERROR(
-                    _serializer->serialize_block(block, current_channel->ch_cur_pb_block()));
+                    _serializer.serialize_block(block, current_channel->ch_cur_pb_block()));
            auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
HANDLE_CHANNEL_STATUS(state, current_channel, status);
current_channel->ch_roll_pb_block();
@@ -648,14 +621,14 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
}
Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
- if (_serializer->get_block() && _serializer->get_block()->rows() > 0) {
+ if (_serializer.get_block() && _serializer.get_block()->rows() > 0) {
BroadcastPBlockHolder* block_holder = nullptr;
RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
{
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- Block block = _serializer->get_block()->to_block();
-            RETURN_IF_ERROR(_serializer->serialize_block(&block, block_holder->get_block(),
- _channels.size()));
+ Block block = _serializer.get_block()->to_block();
+            RETURN_IF_ERROR(_serializer.serialize_block(&block, block_holder->get_block(),
+ _channels.size()));
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
@@ -690,10 +663,10 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
{
// send last block
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-        if (_serializer && _serializer->get_block() && _serializer->get_block()->rows() > 0) {
-            Block block = _serializer->get_block()->to_block();
+        if (_serializer.get_block() && _serializer.get_block()->rows() > 0) {
+ Block block = _serializer.get_block()->to_block();
RETURN_IF_ERROR(
-                    _serializer->serialize_block(&block, _cur_pb_block, _channels.size()));
+                    _serializer.serialize_block(&block, _cur_pb_block, _channels.size()));
Status status;
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
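Alongside the serializer change, RuntimeState* state is now threaded through the VDataStreamSender
(and, below, VResultFileSink) constructors, and the "_state = state;" assignment disappears from
prepare(): the sender owns a valid state pointer from construction onward rather than after a
separate two-phase init. A hedged sketch of that constructor-injection pattern (hypothetical names,
not the real signatures):

    struct RuntimeState {};

    // Two-phase style: the pointer is only usable after prepare() has run.
    class SenderBefore {
    public:
        void prepare(RuntimeState* state) { _state = state; }
    private:
        RuntimeState* _state = nullptr;
    };

    // Constructor injection: _state is valid for the object's whole lifetime, so
    // value members initialized later in the list can rely on it immediately.
    class SenderAfter {
    public:
        explicit SenderAfter(RuntimeState* state) : _state(state) {}
    private:
        RuntimeState* _state;
    };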
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 77dfe0b20a..770a381da4 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -97,14 +97,11 @@ public:
                      const std::vector<TPlanFragmentDestination>& destinations,
                      int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
-    VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
- PlanNodeId dest_node_id,
+ VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
+ const RowDescriptor& row_desc, PlanNodeId dest_node_id,
                      const std::vector<TPlanFragmentDestination>& destinations,
                      int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
-    VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int per_channel_buffer_size,
- bool send_query_statistics_with_every_batch);
-
~VDataStreamSender() override;
Status init(const TDataSink& thrift_sink) override;
@@ -209,7 +206,7 @@ protected:
bool _only_local_exchange = false;
bool _enable_pipeline_exec = false;
- std::unique_ptr<BlockSerializer> _serializer;
+ BlockSerializer _serializer;
};
class Channel {
@@ -234,10 +231,10 @@ public:
_closed(false),
_brpc_dest_addr(brpc_dest),
_is_transfer_chain(is_transfer_chain),
-              _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
- std::string localhost = BackendOptions::get_localhost();
- _is_local = (_brpc_dest_addr.hostname == localhost) &&
- (_brpc_dest_addr.port == config::brpc_port);
+              _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
+              _is_local((_brpc_dest_addr.hostname == BackendOptions::get_localhost()) &&
+ (_brpc_dest_addr.port == config::brpc_port)),
+ _serializer(_parent, _is_local) {
if (_is_local) {
            VLOG_NOTICE << "will use local Exchange, dest_node_id is : " << _dest_node_id;
}
@@ -385,7 +382,7 @@ protected:
PBlock _ch_pb_block1;
PBlock _ch_pb_block2;
- std::unique_ptr<BlockSerializer> _serializer;
+ BlockSerializer _serializer;
};
#define HANDLE_CHANNEL_STATUS(state, channel, status) \
@@ -490,7 +487,7 @@ public:
bool serialized = false;
_pblock = std::make_unique<PBlock>();
RETURN_IF_ERROR(
-                _serializer->next_serialized_block(block, _pblock.get(), 1, &serialized, &rows));
+                _serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, &rows));
if (serialized) {
RETURN_IF_ERROR(send_current_block(false));
}
@@ -506,7 +503,7 @@ public:
SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
if (eos) {
_pblock = std::make_unique<PBlock>();
- RETURN_IF_ERROR(_serializer->serialize_block(_pblock.get(), 1));
+ RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1));
}
RETURN_IF_ERROR(send_block(_pblock.release(), eos));
return Status::OK();
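One subtlety in the new Channel constructor above: "_serializer(_parent, _is_local)" reads
_is_local, and C++ initializes members in declaration order, not in the order they appear in the
initializer list, so _is_local must stay declared before _serializer in the class. A minimal
illustration of the pitfall (hypothetical classes, not the Doris ones):

    struct Serializer {
        explicit Serializer(bool is_local) : local(is_local) {}
        bool local;
    };

    struct ChannelOk {
        bool _is_local;         // declared first => initialized first
        Serializer _serializer; // sees the already-computed flag
        explicit ChannelOk(int port) : _is_local(port == 0), _serializer(_is_local) {}
    };

    struct ChannelReordered {
        Serializer _serializer; // declared first => initialized first,
        bool _is_local;         // so it reads _is_local before it is set (indeterminate value)
        explicit ChannelReordered(int port) : _is_local(port == 0), _serializer(_is_local) {}
    };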
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index c5f4c0358e..5b2003d8c2 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -45,8 +45,9 @@ class TExpr;
namespace doris::vectorized {
-VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc,
-                                 const TResultFileSink& sink, int per_channel_buffer_size,
+VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool,
+                                 const RowDescriptor& row_desc, const TResultFileSink& sink,
+ int per_channel_buffer_size,
bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr)
: _t_output_expr(t_output_expr), _row_desc(row_desc) {
@@ -62,8 +63,8 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc
_header = sink.header;
}
-VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
-                                 const TResultFileSink& sink,
+VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
+                                 const RowDescriptor& row_desc, const TResultFileSink& sink,
                                 const std::vector<TPlanFragmentDestination>& destinations,
int per_channel_buffer_size,
bool send_query_statistics_with_every_batch,
@@ -77,7 +78,7 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescr
_storage_type = sink.storage_backend_type;
_is_top_sink = false;
CHECK_EQ(destinations.size(), 1);
-    _stream_sender.reset(new VDataStreamSender(pool, sender_id, row_desc, sink.dest_node_id,
+    _stream_sender.reset(new VDataStreamSender(state, pool, sender_id, row_desc, sink.dest_node_id,
                                                destinations, per_channel_buffer_size,
send_query_statistics_with_every_batch));
diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h
index 90bc06bb42..c8d2f3be18 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -46,11 +46,12 @@ class VExprContext;
class VResultFileSink : public DataSink {
public:
-    VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const TResultFileSink& sink,
-                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+    VResultFileSink(RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
+ const TResultFileSink& sink, int per_channel_buffer_size,
+ bool send_query_statistics_with_every_batch,
const std::vector<TExpr>& t_output_expr);
-    VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
- const TResultFileSink& sink,
+ VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
+ const RowDescriptor& row_desc, const TResultFileSink& sink,
const std::vector<TPlanFragmentDestination>& destinations,
                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
                    const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]