This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b352c93ff [improvement](sink) avoid frequent allocation and deallocation when serializing block (#12310)
7b352c93ff is described below

commit 7b352c93ff32651741a220877b95999a19d5c75b
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Sep 5 12:23:43 2022 +0800

    [improvement](sink) avoid frequent allocation and deallocation when serializing block (#12310)
---
 be/src/vec/core/block.cpp               | 21 ++++++++++++++-------
 be/src/vec/core/block.h                 |  9 ++++++++-
 be/src/vec/sink/vdata_stream_sender.cpp | 18 +++++++++++++-----
 be/src/vec/sink/vdata_stream_sender.h   |  5 +++++
 4 files changed, 40 insertions(+), 13 deletions(-)

diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index b9562addd0..f5af807ee5 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -694,6 +694,14 @@ Status Block::filter_block(Block* block, int 
filter_column_id, int column_to_kee
 Status Block::serialize(PBlock* pblock, size_t* uncompressed_bytes, size_t* 
compressed_bytes,
                         segment_v2::CompressionTypePB compression_type,
                         bool allow_transfer_large_data) const {
+    std::string compression_scratch;
+    return serialize(pblock, &compression_scratch, uncompressed_bytes, 
compressed_bytes,
+                     compression_type, allow_transfer_large_data);
+}
+
+Status Block::serialize(PBlock* pblock, std::string* compressed_buffer, 
size_t* uncompressed_bytes,
+                        size_t* compressed_bytes, 
segment_v2::CompressionTypePB compression_type,
+                        bool allow_transfer_large_data) const {
     // calc uncompressed size for allocation
     size_t content_uncompressed_size = 0;
     for (const auto& c : *this) {
@@ -726,7 +734,7 @@ Status Block::serialize(PBlock* pblock, size_t* 
uncompressed_bytes, size_t* comp
 
     // compress
     if (config::compress_rowbatches && content_uncompressed_size > 0) {
-        SCOPED_RAW_TIMER(const_cast<int64_t*>(&_compress_time_ns));
+        SCOPED_RAW_TIMER(&_compress_time_ns);
         pblock->set_compression_type(compression_type);
         pblock->set_uncompressed_size(content_uncompressed_size);
 
@@ -734,12 +742,11 @@ Status Block::serialize(PBlock* pblock, size_t* 
uncompressed_bytes, size_t* comp
         RETURN_IF_ERROR(get_block_compression_codec(compression_type, codec));
 
         size_t max_compressed_size = 
codec->max_compressed_len(content_uncompressed_size);
-        std::string compression_scratch;
         try {
-            // Try compressing the content to compression_scratch,
+            // Try compressing the content to compressed_buffer,
             // swap if compressed data is smaller
             // Allocation of extra-long contiguous memory may fail, and data 
compression cannot be used if it fails
-            compression_scratch.resize(max_compressed_size);
+            compressed_buffer->resize(max_compressed_size);
         } catch (...) {
             std::exception_ptr p = std::current_exception();
             std::string msg =
@@ -749,13 +756,13 @@ Status Block::serialize(PBlock* pblock, size_t* 
uncompressed_bytes, size_t* comp
             return Status::BufferAllocFailed(msg);
         }
 
-        Slice compressed_slice(compression_scratch);
+        Slice compressed_slice(*compressed_buffer);
         codec->compress(Slice(column_values->data(), 
content_uncompressed_size), &compressed_slice);
         size_t compressed_size = compressed_slice.size;
 
         if (LIKELY(compressed_size < content_uncompressed_size)) {
-            compression_scratch.resize(compressed_size);
-            column_values->swap(compression_scratch);
+            compressed_buffer->resize(compressed_size);
+            column_values->swap(*compressed_buffer);
             pblock->set_compressed(true);
             *compressed_bytes = compressed_size;
         } else {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 8895980e4d..eb229cd173 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -68,7 +68,7 @@ private:
     int64_t _decompress_time_ns = 0;
     int64_t _decompressed_bytes = 0;
 
-    int64_t _compress_time_ns = 0;
+    mutable int64_t _compress_time_ns = 0;
 
 public:
     BlockInfo info;
@@ -275,6 +275,13 @@ public:
                      segment_v2::CompressionTypePB compression_type,
                      bool allow_transfer_large_data = false) const;
 
+    // serialize block to PBlock
+    // compressed_buffer reuse to avoid frequent allocation and deallocation,
+    // NOTE: compressed_buffer's data may be swapped with 
pblock->mutable_column_values
+    Status serialize(PBlock* pblock, std::string* compressed_buffer, size_t* 
uncompressed_bytes,
+                     size_t* compressed_bytes, segment_v2::CompressionTypePB 
compression_type,
+                     bool allow_transfer_large_data = false) const;
+
     // serialize block to PRowbatch
     void serialize(RowBatch*, const RowDescriptor&);
 
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index b2ef88af49..8ff35b6b5c 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -95,7 +95,7 @@ Status VDataStreamSender::Channel::send_current_block(bool 
eos) {
         return send_local_block(eos);
     }
     auto block = _mutable_block->to_block();
-    RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block));
+    RETURN_IF_ERROR(_parent->serialize_block(&block, _ch_cur_pb_block, 
&_compressed_data_buffer));
     block.clear_column_data();
     _mutable_block->set_muatable_columns(block.mutate_columns());
     RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
@@ -481,7 +481,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block) {
                 RETURN_IF_ERROR(channel->send_local_block(block));
             }
         } else {
-            RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, 
_channels.size()));
+            RETURN_IF_ERROR(serialize_block(block, _cur_pb_block, 
&_compressed_data_buffer,
+                                            _channels.size()));
             for (auto channel : _channels) {
                 if (channel->is_local()) {
                     RETURN_IF_ERROR(channel->send_local_block(block));
@@ -499,7 +500,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block) {
         if (current_channel->is_local()) {
             RETURN_IF_ERROR(current_channel->send_local_block(block));
         } else {
-            RETURN_IF_ERROR(serialize_block(block, 
current_channel->ch_cur_pb_block()));
+            RETURN_IF_ERROR(serialize_block(block, 
current_channel->ch_cur_pb_block(),
+                                            &_compressed_data_buffer));
             
RETURN_IF_ERROR(current_channel->send_block(current_channel->ch_cur_pb_block()));
             current_channel->ch_roll_pb_block();
         }
@@ -604,12 +606,18 @@ Status VDataStreamSender::close(RuntimeState* state, 
Status exec_status) {
 }
 
 Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, int 
num_receivers) {
+    return serialize_block(src, dest, &_compressed_data_buffer, num_receivers);
+}
+
+Status VDataStreamSender::serialize_block(Block* src, PBlock* dest, 
std::string* compressed_buffer,
+                                          int num_receivers) {
     {
         SCOPED_TIMER(_serialize_batch_timer);
         dest->Clear();
         size_t uncompressed_bytes = 0, compressed_bytes = 0;
-        RETURN_IF_ERROR(src->serialize(dest, &uncompressed_bytes, 
&compressed_bytes,
-                                       _compression_type, 
_transfer_large_data_by_brpc));
+        RETURN_IF_ERROR(src->serialize(dest, compressed_buffer, 
&uncompressed_bytes,
+                                       &compressed_bytes, _compression_type,
+                                       _transfer_large_data_by_brpc));
         COUNTER_UPDATE(_bytes_sent_counter, compressed_bytes * num_receivers);
         COUNTER_UPDATE(_uncompressed_bytes_counter, uncompressed_bytes * 
num_receivers);
         COUNTER_UPDATE(_compress_timer, src->get_compress_time());
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index c03eb0804c..89dfbf3831 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -75,6 +75,8 @@ public:
     RuntimeState* state() { return _state; }
 
     Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
+    Status serialize_block(Block* src, PBlock* dest, std::string* 
compressed_buffer,
+                           int num_receivers = 1);
 
 protected:
     void _roll_pb_block();
@@ -149,6 +151,8 @@ protected:
     // User can change this config at runtime, avoid it being modified during 
query or loading process.
     bool _transfer_large_data_by_brpc = false;
 
+    std::string _compressed_data_buffer;
+
     segment_v2::CompressionTypePB _compression_type;
 };
 
@@ -303,6 +307,7 @@ private:
     PBlock* _ch_cur_pb_block = nullptr;
     PBlock _ch_pb_block1;
     PBlock _ch_pb_block2;
+    std::string _compressed_data_buffer;
 
     bool _enable_local_exchange = false;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to