This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 76a0d0dddae [improvement](Block) Replace Block(const PBlock&) with
deserialize because it has heavy operations in ctor (#23672) (#25291)
76a0d0dddae is described below
commit 76a0d0dddae414a7ce4ed19acb645e4b924df8a5
Author: TengJianPing <[email protected]>
AuthorDate: Wed Oct 11 15:39:58 2023 +0800
[improvement](Block) Replace the Block(const PBlock&) constructor with Block::deserialize(),
because the constructor performs heavy operations and cannot report errors (#23672) (#25291)
---
be/src/exec/rowid_fetcher.cpp | 3 ++-
be/src/runtime/tablets_channel.cpp | 5 ++---
.../aggregate_functions/aggregate_function_sort.h | 3 ++-
be/src/vec/core/block.cpp | 10 ++++++---
be/src/vec/core/block.h | 3 ++-
be/src/vec/core/block_spill_reader.cpp | 3 ++-
be/src/vec/runtime/vdata_stream_mgr.cpp | 5 +++--
be/src/vec/runtime/vdata_stream_recvr.cpp | 24 ++++++++++++----------
be/src/vec/runtime/vdata_stream_recvr.h | 8 ++++----
be/test/vec/core/block_test.cpp | 24 ++++++++++++++--------
be/test/vec/exec/vtablet_sink_test.cpp | 3 ++-
11 files changed, 55 insertions(+), 36 deletions(-)
diff --git a/be/src/exec/rowid_fetcher.cpp b/be/src/exec/rowid_fetcher.cpp
index 6a937932eac..e1a48276891 100644
--- a/be/src/exec/rowid_fetcher.cpp
+++ b/be/src/exec/rowid_fetcher.cpp
@@ -160,7 +160,8 @@ Status RowIDFetcher::_merge_rpc_results(const
PMultiGetRequest& request,
return Status::OK();
}
// Merge partial blocks
- vectorized::Block partial_block(resp.block());
+ vectorized::Block partial_block;
+ RETURN_IF_ERROR(partial_block.deserialize(resp.block()));
if (partial_block.is_empty_column()) {
return Status::OK();
}
diff --git a/be/src/runtime/tablets_channel.cpp
b/be/src/runtime/tablets_channel.cpp
index dd1822e557a..a619352fafa 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -465,9 +465,8 @@ Status TabletsChannel::add_batch(const
PTabletWriterAddBlockRequest& request,
}
}
- auto get_send_data = [&]() { return vectorized::Block(request.block()); };
-
- auto send_data = get_send_data();
+ vectorized::Block send_data;
+ RETURN_IF_ERROR(send_data.deserialize(request.block()));
CHECK(send_data.rows() == request.tablet_ids_size())
<< "block rows: " << send_data.rows()
<< ", tablet_ids_size: " << request.tablet_ids_size();
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sort.h
b/be/src/vec/aggregate_functions/aggregate_function_sort.h
index f82061e071a..39d7fd184f5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sort.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sort.h
@@ -87,7 +87,8 @@ struct AggregateFunctionSortData {
PBlock pblock;
pblock.ParseFromString(data);
- block = Block(pblock);
+ auto st = block.deserialize(pblock);
+ CHECK(st.ok());
}
void add(const IColumn** columns, size_t columns_num, size_t row_num) {
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 7a5505b42b4..f8ade0e07ff 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -84,7 +84,8 @@ Block::Block(const std::vector<SlotDescriptor*>& slots,
size_t block_size,
}
}
-Block::Block(const PBlock& pblock) {
+Status Block::deserialize(const PBlock& pblock) {
+ swap(Block());
int be_exec_version = pblock.has_be_exec_version() ?
pblock.be_exec_version() : 0;
CHECK(BeExecVersionManager::check_be_exec_version(be_exec_version));
@@ -98,11 +99,12 @@ Block::Block(const PBlock& pblock) {
size_t uncompressed_size = 0;
if (pblock.has_compression_type() && pblock.has_uncompressed_size()) {
BlockCompressionCodec* codec;
- get_block_compression_codec(pblock.compression_type(), &codec);
+
RETURN_IF_ERROR(get_block_compression_codec(pblock.compression_type(), &codec));
uncompressed_size = pblock.uncompressed_size();
compression_scratch.resize(uncompressed_size);
Slice decompressed_slice(compression_scratch);
- codec->decompress(Slice(compressed_data, compressed_size),
&decompressed_slice);
+ RETURN_IF_ERROR(codec->decompress(Slice(compressed_data,
compressed_size),
+ &decompressed_slice));
DCHECK(uncompressed_size == decompressed_slice.size);
} else {
bool success = snappy::GetUncompressedLength(compressed_data,
compressed_size,
@@ -126,6 +128,8 @@ Block::Block(const PBlock& pblock) {
data.emplace_back(data_column->get_ptr(), type, pcol_meta.name());
}
initialize_index_by_name();
+
+ return Status::OK();
}
void Block::initialize_index_by_name() {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index ee2b84dd27b..0860c46792b 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -87,7 +87,6 @@ public:
Block() = default;
Block(std::initializer_list<ColumnWithTypeAndName> il);
Block(const ColumnsWithTypeAndName& data_);
- Block(const PBlock& pblock);
Block(const std::vector<SlotDescriptor*>& slots, size_t block_size,
bool ignore_trivial_slot = false);
@@ -311,6 +310,8 @@ public:
size_t* compressed_bytes, segment_v2::CompressionTypePB
compression_type,
bool allow_transfer_large_data = false) const;
+ Status deserialize(const PBlock& pblock);
+
std::unique_ptr<Block> create_same_struct_block(size_t size) const;
/** Compares (*this) n-th row and rhs m-th row.
diff --git a/be/src/vec/core/block_spill_reader.cpp
b/be/src/vec/core/block_spill_reader.cpp
index 8d2d4812296..d0cebd3043b 100644
--- a/be/src/vec/core/block_spill_reader.cpp
+++ b/be/src/vec/core/block_spill_reader.cpp
@@ -134,7 +134,8 @@ Status BlockSpillReader::read(Block* block, bool* eos) {
if (!pb_block.ParseFromArray(result.data, result.size)) {
return Status::InternalError("Failed to read spilled block");
}
- new_block = Block::create_unique(pb_block);
+ new_block = Block::create_unique();
+ RETURN_IF_ERROR(new_block->deserialize(pb_block));
}
block->swap(*new_block);
} else {
diff --git a/be/src/vec/runtime/vdata_stream_mgr.cpp
b/be/src/vec/runtime/vdata_stream_mgr.cpp
index eeee0723717..ad161828f90 100644
--- a/be/src/vec/runtime/vdata_stream_mgr.cpp
+++ b/be/src/vec/runtime/vdata_stream_mgr.cpp
@@ -120,8 +120,9 @@ Status VDataStreamMgr::transmit_block(const
PTransmitDataParams* request,
bool eos = request->eos();
if (request->has_block()) {
- recvr->add_block(request->block(), request->sender_id(),
request->be_number(),
- request->packet_seq(), eos ? nullptr : done);
+ RETURN_IF_ERROR(recvr->add_block(request->block(),
request->sender_id(),
+ request->be_number(),
request->packet_seq(),
+ eos ? nullptr : done));
}
if (eos) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index ec602f143ec..9cbc76693b9 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -108,13 +108,13 @@ Status
VDataStreamRecvr::SenderQueue::_inner_get_batch_without_lock(Block* block
return Status::OK();
}
-void VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int
be_number,
- int64_t packet_seq,
- ::google::protobuf::Closure**
done) {
+Status VDataStreamRecvr::SenderQueue::add_block(const PBlock& pblock, int
be_number,
+ int64_t packet_seq,
+ ::google::protobuf::Closure**
done) {
{
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
- return;
+ return Status::OK();
}
auto iter = _packet_seq_map.find(be_number);
if (iter != _packet_seq_map.end()) {
@@ -122,7 +122,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock&
pblock, int be_numbe
LOG(WARNING) << fmt::format(
"packet already exist [cur_packet_id= {}
receive_packet_id={}]",
iter->second, packet_seq);
- return;
+ return Status::OK();
}
iter->second = packet_seq;
} else {
@@ -134,7 +134,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock&
pblock, int be_numbe
DCHECK(_num_remaining_senders >= 0);
if (_num_remaining_senders == 0) {
DCHECK(_sender_eos_set.end() != _sender_eos_set.find(be_number));
- return;
+ return Status::OK();
}
}
@@ -142,7 +142,8 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock&
pblock, int be_numbe
int64_t deserialize_time = 0;
{
SCOPED_RAW_TIMER(&deserialize_time);
- block = Block::create_unique(pblock);
+ block = Block::create_unique();
+ RETURN_IF_ERROR(block->deserialize(pblock));
}
auto block_byte_size = block->allocated_bytes();
@@ -150,7 +151,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock&
pblock, int be_numbe
std::lock_guard<std::mutex> l(_lock);
if (_is_cancelled) {
- return;
+ return Status::OK();
}
COUNTER_UPDATE(_recvr->_deserialize_row_batch_timer, deserialize_time);
@@ -170,6 +171,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock&
pblock, int be_numbe
}
_recvr->update_blocks_memory_usage(block_byte_size);
_data_arrival_cv.notify_one();
+ return Status::OK();
}
void VDataStreamRecvr::SenderQueue::add_block(Block* block, bool use_move) {
@@ -369,11 +371,11 @@ Status VDataStreamRecvr::create_merger(const
VExprContextSPtrs& ordering_expr,
return Status::OK();
}
-void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int
be_number,
- int64_t packet_seq,
::google::protobuf::Closure** done) {
+Status VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int
be_number,
+ int64_t packet_seq,
::google::protobuf::Closure** done) {
SCOPED_ATTACH_TASK_WITH_ID(_query_mem_tracker, _query_id,
_fragment_instance_id);
int use_sender_id = _is_merging ? sender_id : 0;
- _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq,
done);
+ return _sender_queues[use_sender_id]->add_block(pblock, be_number,
packet_seq, done);
}
void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 0059c8ddf0e..5e88aa8eb43 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -75,8 +75,8 @@ public:
const std::vector<bool>& nulls_first, size_t
batch_size, int64_t limit,
size_t offset);
- void add_block(const PBlock& pblock, int sender_id, int be_number, int64_t
packet_seq,
- ::google::protobuf::Closure** done);
+ Status add_block(const PBlock& pblock, int sender_id, int be_number,
int64_t packet_seq,
+ ::google::protobuf::Closure** done);
void add_block(Block* block, int sender_id, bool use_move);
@@ -193,8 +193,8 @@ public:
virtual Status get_batch(Block* next_block, bool* eos);
- void add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
- ::google::protobuf::Closure** done);
+ Status add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
+ ::google::protobuf::Closure** done);
virtual void add_block(Block* block, bool use_move);
diff --git a/be/test/vec/core/block_test.cpp b/be/test/vec/core/block_test.cpp
index 456d4fc4807..61903e588f2 100644
--- a/be/test/vec/core/block_test.cpp
+++ b/be/test/vec/core/block_test.cpp
@@ -129,7 +129,8 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
- vectorized::Block block2(pblock);
+ vectorized::Block block2;
+ block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -150,7 +151,8 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
- vectorized::Block block2(pblock);
+ vectorized::Block block2;
+ block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -174,7 +176,8 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
- vectorized::Block block2(pblock);
+ vectorized::Block block2;
+ block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -200,7 +203,8 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
- vectorized::Block block2(pblock);
+ vectorized::Block block2;
+ block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -220,7 +224,8 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
- vectorized::Block block2(pblock);
+ vectorized::Block block2;
+ block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -242,7 +247,8 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
EXPECT_TRUE(pblock.column_metas()[0].has_decimal_param());
std::string s1 = pblock.DebugString();
- vectorized::Block block2(pblock);
+ vectorized::Block block2;
+ block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -264,7 +270,8 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
- vectorized::Block block2(pblock);
+ vectorized::Block block2;
+ block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
@@ -279,7 +286,8 @@ void
serialize_and_deserialize_test(segment_v2::CompressionTypePB compression_ty
block_to_pb(block, &pblock, compression_type);
std::string s1 = pblock.DebugString();
- vectorized::Block block2(pblock);
+ vectorized::Block block2;
+ block2.deserialize(pblock);
PBlock pblock2;
block_to_pb(block2, &pblock2, compression_type);
std::string s2 = pblock2.DebugString();
diff --git a/be/test/vec/exec/vtablet_sink_test.cpp
b/be/test/vec/exec/vtablet_sink_test.cpp
index 5e60181c8fb..75ed927fcb2 100644
--- a/be/test/vec/exec/vtablet_sink_test.cpp
+++ b/be/test/vec/exec/vtablet_sink_test.cpp
@@ -316,7 +316,8 @@ public:
k_add_batch_status.to_protobuf(response->mutable_status());
if (request->has_block() && _row_desc != nullptr) {
- vectorized::Block block(request->block());
+ vectorized::Block block;
+ block.deserialize(request->block());
for (size_t row_num = 0; row_num < block.rows(); ++row_num) {
std::stringstream out;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]