This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7b352c93ff [improvement](sink) avoid frequent allocation and
deallocation when serializing block (#12310)
7b352c93ff is described below
commit 7b352c93ff32651741a220877b95999a19d5c75b
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Sep 5 12:23:43 2022 +0800
[improvement](sink) avoid frequent allocation and deallocation when
serializing block (#12310)
---
be/src/vec/core/block.cpp | 21 ++++++++++++++-------
be/src/vec/core/block.h | 9 ++++++++-
be/src/vec/sink/vdata_stream_sender.cpp | 18 +++++++++++++-----
be/src/vec/sink/vdata_stream_sender.h | 5 +++++
4 files changed, 40 insertions(+), 13 deletions(-)
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index b9562addd0..f5af807ee5 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -694,6 +694,14 @@ Status Block::filter_block(Block* block, int
filter_column_id, int column_to_kee
Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t*
compressed_bytes,
segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data) const {
+ std::string compression_scratch;
+ return serialize(pblock, &compression_scratch, uncompressed_bytes,
compressed_bytes,
+ compression_type, allow_transfer_large_data);
+}
+
+Status Block::serialize(PBlock* pblock, std::string* compressed_buffer,
size_t* uncompressed_bytes,
+ size_t* compressed_bytes,
segment_v2::CompressionTypePB compression_type,
+ bool allow_transfer_large_data) const {
// calc uncompressed size for allocation
size_t content_uncompressed_size = 0;
for (const auto& c : *this) {
@@ -726,7 +734,7 @@ Status Block::serialize(PBlock* pblock, size_t*
uncompressed_bytes, size_t* comp
// compress
if (config::compress_rowbatches && content_uncompressed_size > 0) {
- SCOPED_RAW_TIMER(const_cast<int64_t*>(&_compress_time_ns));
+ SCOPED_RAW_TIMER(&_compress_time_ns);
pblock->set_compression_type(compression_type);
pblock->set_uncompressed_size(content_uncompressed_size);
@@ -734,12 +742,11 @@ Status Block::serialize(PBlock* pblock, size_t*
uncompressed_bytes, size_t* comp
RETURN_IF_ERROR(get_block_compression_codec(compression_type, codec));
size_t max_compressed_size =
codec->max_compressed_len(content_uncompressed_size);
- std::string compression_scratch;
try {
- // Try compressing the content to compression_scratch,
+ // Try compressing the content to compressed_buffer,
// swap if compressed data is smaller
// Allocation of extra-long contiguous memory may fail, and data
compression cannot be used if it fails
- compression_scratch.resize(max_compressed_size);
+ compressed_buffer->resize(max_compressed_size);
} catch (...) {
std::exception_ptr p = std::current_exception();
std::string msg =
@@ -749,13 +756,13 @@ Status Block::serialize(PBlock* pblock, size_t*
uncompressed_bytes, size_t* comp
return Status::BufferAllocFailed(msg);
}
- Slice compressed_slice(compression_scratch);
+ Slice compressed_slice(*compressed_buffer);
codec->compress(Slice(column_values->data(),
content_uncompressed_size), &compressed_slice);
size_t compressed_size = compressed_slice.size;
if (LIKELY(compressed_size < content_uncompressed_size)) {
- compression_scratch.resize(compressed_size);
- column_values->swap(compression_scratch);
+ compressed_buffer->resize(compressed_size);
+ column_values->swap(*compressed_buffer);
pblock->set_compressed(true);
*compressed_bytes = compressed_size;
} else {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 8895980e4d..eb229cd173 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -68,7 +68,7 @@ private:
int64_t _decompress_time_ns = 0;
int64_t _decompressed_bytes = 0;
- int64_t _compress_time_ns = 0;
+ mutable int64_t _compress_time_ns = 0;
public:
BlockInfo info;
@@ -275,6 +275,13 @@ public:
segment_v2::CompressionTypePB compression_type,
bool allow_transfer_large_data = false) const;
+ // serialize block to PBlock
+ // compressed_buffer reuse to avoid frequent allocation and deallocation,
+ // NOTE: compressed_buffer's data may be swapped with
pblock->mutable_column_values
+ Status serialize(PBlock* pblock, std::string* compressed_buffer, size_t*
uncompressed_bytes,
+ size_t* compressed_bytes, segment_v2::CompressionTypePB
compression_type,
+ bool allow_transfer_large_data = false) const;
+
// serialize block to PRowbatch
void serialize(RowBatch*, const RowDescriptor&);
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index b2ef88af49..8ff35b6b5c 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -95,7 +95,7 @@ Status VDataStreamSender::Channel::send_current_block(bool
eos) {
return send_local_block(eos);
}
auto block = _mutable_block->to_block();
- RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block));
+ RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block,
&_compressed_data_buffer));
block.clear_column_data();
_mutable_block->set_muatable_columns(block.mutate_columns());
RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
@@ -481,7 +481,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block) {
RETURN_IF_ERROR(channel->send_local_block(block));
}
} else {
- RETURN_IF_ERROR(serialize_block(block, _cur_pb_block,
_channels.size()));
+ RETURN_IF_ERROR(serialize_block(block, _cur_pb_block,
&_compressed_data_buffer,
+ _channels.size()));
for (auto channel : _channels) {
if (channel->is_local()) {
RETURN_IF_ERROR(channel->send_local_block(block));
@@ -499,7 +500,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block) {
if (current_channel->is_local()) {
RETURN_IF_ERROR(current_channel->send_local_block(block));
} else {
- RETURN_IF_ERROR(serialize_block(block,
current_channel->ch_cur_pb_block()));
+ RETURN_IF_ERROR(serialize_block(block,
current_channel->ch_cur_pb_block(),
+ &_compressed_data_buffer));
RETURN_IF_ERROR(current_channel->send_block(current_channel->ch_cur_pb_block()));
current_channel->ch_roll_pb_block();
}
@@ -604,12 +606,18 @@ Status VDataStreamSender::close(RuntimeState* state,
Status exec_status) {
}
Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int
num_receivers) {
+ return serialize_block(src, dest, &_compressed_data_buffer, num_receivers);
+}
+
+Status VDataStreamSender::serialize_block(Block* src, PBlock* dest,
std::string* compressed_buffer,
+ int num_receivers) {
{
SCOPED_TIMER(_serialize_batch_timer);
dest->Clear();
size_t uncompressed_bytes = 0, compressed_bytes = 0;
- RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes,
&compressed_bytes,
- _compression_type,
_transfer_large_data_by_brpc));
+ RETURN_IF_ERROR(src->serialize(dest, compressed_buffer,
&uncompressed_bytes,
+ &compressed_bytes, _compression_type,
+ _transfer_large_data_by_brpc));
COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes *
num_receivers);
COUNTER_UPDATE(_compress_timer, src->get_compress_time());
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index c03eb0804c..89dfbf3831 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -75,6 +75,8 @@ public:
RuntimeState* state() { return _state; }
Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
+ Status serialize_block(Block* src, PBlock* dest, std::string*
compressed_buffer,
+ int num_receivers = 1);
protected:
void _roll_pb_block();
@@ -149,6 +151,8 @@ protected:
// User can change this config at runtime, avoid it being modified during
query or loading process.
bool _transfer_large_data_by_brpc = false;
+ std::string _compressed_data_buffer;
+
segment_v2::CompressionTypePB _compression_type;
};
@@ -303,6 +307,7 @@ private:
PBlock* _ch_cur_pb_block = nullptr;
PBlock _ch_pb_block1;
PBlock _ch_pb_block2;
+ std::string _compressed_data_buffer;
bool _enable_local_exchange = false;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org