This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 76a0d0dddae [improvement](Block) Replace Block(const PBlock&) with deserialize because it has heavy operations in ctor (#23672) (#25291)
76a0d0dddae is described below

commit 76a0d0dddae414a7ce4ed19acb645e4b924df8a5
Author: TengJianPing <[email protected]>
AuthorDate: Wed Oct 11 15:39:58 2023 +0800

    [improvement](Block) Replace Block(const PBlock&) with deserialize because it has heavy operations in ctor (#23672) (#25291)
---
 be/src/exec/rowid_fetcher.cpp                      |  3 ++-
 be/src/runtime/tablets_channel.cpp                 |  5 ++---
 .../aggregate_functions/aggregate_function_sort.h  |  3 ++-
 be/src/vec/core/block.cpp                          | 10 ++++++---
 be/src/vec/core/block.h                            |  3 ++-
 be/src/vec/core/block_spill_reader.cpp             |  3 ++-
 be/src/vec/runtime/vdata_stream_mgr.cpp            |  5 +++--
 be/src/vec/runtime/vdata_stream_recvr.cpp          | 24 ++++++++++++----------
 be/src/vec/runtime/vdata_stream_recvr.h            |  8 ++++----
 be/test/vec/core/block_test.cpp                    | 24 ++++++++++++++--------
 be/test/vec/exec/vtablet_sink_test.cpp             |  3 ++-
 11 files changed, 55 insertions(+), 36 deletions(-)

diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 6a937932eac..e1a48276891 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -160,7 +160,8 @@ Status RowIDFetcher::_merge_rpc_results(const 
PMultiGetRequest& request,
             return Status::OK();
         }
         // Merge partial blocks
-        vectorized::Block partial_block(resp.block());
+        vectorized::Block partial_block;
+        RETURN_IF_ERROR(partial_block.deserialize(resp.block()));
         if (partial_block.is_empty_column()) {
             return Status::OK();
         }
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index dd1822e557a..a619352fafa 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -465,9 +465,8 @@ Status TabletsChannel::add_batch(const 
PTabletWriterAddBlockRequest& request,
         }
     }
 
-    auto get_send_data = [&]() { return vectorized::Block(request.block()); };
-
-    auto send_data = get_send_data();
+    vectorized::Block send_data;
+    RETURN_IF_ERROR(send_data.deserialize(request.block()));
     CHECK(send_data.rows() == request.tablet_ids_size())
             << "block rows: " << send_data.rows()
             << ", tablet_ids_size: " << request.tablet_ids_size();
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h b/be/src/vec/aggregate_functions/aggregate_function_sort.h
index f82061e071a..39d7fd184f5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sort.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h
@@ -87,7 +87,8 @@ struct AggregateFunctionSortData {
 
         PBlock pblock;
         pblock.ParseFromString(data);
-        block = Block(pblock);
+        auto st = block.deserialize(pblock);
+        CHECK(st.ok());
     }
 
     void add(const IColumn** columns, size_t columns_num, size_t row_num) {
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 7a5505b42b4..f8ade0e07ff 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -84,7 +84,8 @@ Block::Block(const std::vector<SlotDescriptor*>& slots, 
size_t block_size,
     }
 }
 
-Block::Block(const PBlock& pblock) {
+Status Block::deserialize(const PBlock& pblock) {
+    swap(Block());
     int be_exec_version = pblock.has_be_exec_version() ? 
pblock.be_exec_version() : 0;
     CHECK(BeExecVersionManager::check_be_exec_version(be_exec_version));
 
@@ -98,11 +99,12 @@ Block::Block(const PBlock& pblock) {
         size_t uncompressed_size = 0;
         if (pblock.has_compression_type() && pblock.has_uncompressed_size()) {
             BlockCompressionCodec* codec;
-            get_block_compression_codec(pblock.compression_type(), &codec);
+            RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec));
             uncompressed_size = pblock.uncompressed_size();
             compression_scratch.resize(uncompressed_size);
             Slice decompressed_slice(compression_scratch);
-            codec->decompress(Slice(compressed_data, compressed_size), 
&decompressed_slice);
+            RETURN_IF_ERROR(codec->decompress(Slice(compressed_data, compressed_size),
+                                              &decompressed_slice));
             DCHECK(uncompressed_size == decompressed_slice.size);
         } else {
             bool success = snappy::GetUncompressedLength(compressed_data, 
compressed_size,
@@ -126,6 +128,8 @@ Block::Block(const PBlock& pblock) {
         data.emplace_back(data_column->get_ptr(), type, pcol_meta.name());
     }
     initialize_index_by_name();
+
+    return Status::OK();
 }
 
 void Block::initialize_index_by_name() {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index ee2b84dd27b..0860c46792b 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -87,7 +87,6 @@ public:
     Block() = default;
     Block(std::initializer_list<ColumnWithTypeAndName> il);
     Block(const ColumnsWithTypeAndName& data_);
-    Block(const PBlock& pblock);
     Block(const std::vector<SlotDescriptor*>& slots, size_t block_size,
           bool ignore_trivial_slot = false);
 
@@ -311,6 +310,8 @@ public:
                      size_t* compressed_bytes, segment_v2::CompressionTypePB 
compression_type,
                      bool allow_transfer_large_data = false) const;
 
+    Status deserialize(const PBlock& pblock);
+
     std::unique_ptr<Block> create_same_struct_block(size_t size) const;
 
     /** Compares (*this) n-th row and rhs m-th row.
diff --git a/be/src/vec/core/block_spill_reader.cpp b/be/src/vec/core/block_spill_reader.cpp
index 8d2d4812296..d0cebd3043b 100644
--- a/be/src/vec/core/block_spill_reader.cpp
+++ b/be/src/vec/core/block_spill_reader.cpp
@@ -134,7 +134,8 @@ Status BlockSpillReader::read(Block* block, bool* eos) {
             if (!pb_block.ParseFromArray(result.data, result.size)) {
                 return Status::InternalError("Failed to read spilled block");
             }
-            new_block = Block::create_unique(pb_block);
+            new_block = Block::create_unique();
+            RETURN_IF_ERROR(new_block->deserialize(pb_block));
         }
         block->swap(*new_block);
     } else {
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp b/be/src/vec/runtime/vdata_stream_mgr.cpp
index eeee0723717..ad161828f90 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -120,8 +120,9 @@ Status VDataStreamMgr::transmit_block(const 
PTransmitDataParams* request,
 
     bool eos = request->eos();
     if (request->has_block()) {
-        recvr->add_block(request->block(), request->sender_id(), 
request->be_number(),
-                         request->packet_seq(), eos ? nullptr : done);
+        RETURN_IF_ERROR(recvr->add_block(request->block(), request->sender_id(),
+                                         request->be_number(), request->packet_seq(),
+                                         eos ? nullptr : done));
     }
 
     if (eos) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index ec602f143ec..9cbc76693b9 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -108,13 +108,13 @@ Status 
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
     return Status::OK();
 }
 
-void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int 
be_number,
-                                              int64_t packet_seq,
-                                              ::google::protobuf::Closure** 
done) {
+Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int 
be_number,
+                                                int64_t packet_seq,
+                                                ::google::protobuf::Closure** 
done) {
     {
         std::lock_guard<std::mutex> l(_lock);
         if (_is_cancelled) {
-            return;
+            return Status::OK();
         }
         auto iter = _packet_seq_map.find(be_number);
         if (iter != _packet_seq_map.end()) {
@@ -122,7 +122,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
                 LOG(WARNING) << fmt::format(
                         "packet already exist [cur_packet_id= {} 
receive_packet_id={}]",
                         iter->second, packet_seq);
-                return;
+                return Status::OK();
             }
             iter->second = packet_seq;
         } else {
@@ -134,7 +134,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
         DCHECK(_num_remaining_senders >= 0);
         if (_num_remaining_senders == 0) {
             DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
-            return;
+            return Status::OK();
         }
     }
 
@@ -142,7 +142,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
     int64_t deserialize_time = 0;
     {
         SCOPED_RAW_TIMER(&deserialize_time);
-        block = Block::create_unique(pblock);
+        block = Block::create_unique();
+        RETURN_IF_ERROR(block->deserialize(pblock));
     }
 
     auto block_byte_size = block->allocated_bytes();
@@ -150,7 +151,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
 
     std::lock_guard<std::mutex> l(_lock);
     if (_is_cancelled) {
-        return;
+        return Status::OK();
     }
 
     COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
@@ -170,6 +171,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
     }
     _recvr->update_blocks_memory_usage(block_byte_size);
     _data_arrival_cv.notify_one();
+    return Status::OK();
 }
 
 void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
@@ -369,11 +371,11 @@ Status VDataStreamRecvr::create_merger(const 
VExprContextSPtrs& ordering_expr,
     return Status::OK();
 }
 
-void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int 
be_number,
-                                 int64_t packet_seq, 
::google::protobuf::Closure** done) {
+Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int 
be_number,
+                                   int64_t packet_seq, 
::google::protobuf::Closure** done) {
     SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id, 
_fragment_instance_id);
     int use_sender_id = _is_merging ? sender_id : 0;
-    _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, 
done);
+    return _sender_queues[use_sender_id]->add_block(pblock, be_number, 
packet_seq, done);
 }
 
 void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h b/be/src/vec/runtime/vdata_stream_recvr.h
index 0059c8ddf0e..5e88aa8eb43 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -75,8 +75,8 @@ public:
                          const std::vector<bool>& nulls_first, size_t 
batch_size, int64_t limit,
                          size_t offset);
 
-    void add_block(const PBlock& pblock, int sender_id, int be_number, int64_t 
packet_seq,
-                   ::google::protobuf::Closure** done);
+    Status add_block(const PBlock& pblock, int sender_id, int be_number, 
int64_t packet_seq,
+                     ::google::protobuf::Closure** done);
 
     void add_block(Block* block, int sender_id, bool use_move);
 
@@ -193,8 +193,8 @@ public:
 
     virtual Status get_batch(Block* next_block, bool* eos);
 
-    void add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
-                   ::google::protobuf::Closure** done);
+    Status add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
+                     ::google::protobuf::Closure** done);
 
     virtual void add_block(Block* block, bool use_move);
 
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 456d4fc4807..61903e588f2 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -129,7 +129,8 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
-        vectorized::Block block2(pblock);
+        vectorized::Block block2;
+        block2.deserialize(pblock);
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -150,7 +151,8 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
-        vectorized::Block block2(pblock);
+        vectorized::Block block2;
+        block2.deserialize(pblock);
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -174,7 +176,8 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
-        vectorized::Block block2(pblock);
+        vectorized::Block block2;
+        block2.deserialize(pblock);
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -200,7 +203,8 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
-        vectorized::Block block2(pblock);
+        vectorized::Block block2;
+        block2.deserialize(pblock);
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -220,7 +224,8 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
-        vectorized::Block block2(pblock);
+        vectorized::Block block2;
+        block2.deserialize(pblock);
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -242,7 +247,8 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         EXPECT_TRUE(pblock.column_metas()[0].has_decimal_param());
         std::string s1 = pblock.DebugString();
 
-        vectorized::Block block2(pblock);
+        vectorized::Block block2;
+        block2.deserialize(pblock);
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -264,7 +270,8 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
-        vectorized::Block block2(pblock);
+        vectorized::Block block2;
+        block2.deserialize(pblock);
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
@@ -279,7 +286,8 @@ void 
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
         block_to_pb(block, &pblock, compression_type);
         std::string s1 = pblock.DebugString();
 
-        vectorized::Block block2(pblock);
+        vectorized::Block block2;
+        block2.deserialize(pblock);
         PBlock pblock2;
         block_to_pb(block2, &pblock2, compression_type);
         std::string s2 = pblock2.DebugString();
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp b/be/test/vec/exec/vtablet_sink_test.cpp
index 5e60181c8fb..75ed927fcb2 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -316,7 +316,8 @@ public:
             k_add_batch_status.to_protobuf(response->mutable_status());
 
             if (request->has_block() && _row_desc != nullptr) {
-                vectorized::Block block(request->block());
+                vectorized::Block block;
+                block.deserialize(request->block());
 
                 for (size_t row_num = 0; row_num < block.rows(); ++row_num) {
                     std::stringstream out;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to