This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 591aee528d [Bug](exchange) change BlockSerializer from unique_ptr to object (#22653)
591aee528d is described below

commit 591aee528d166ef88c24b67c763f3a9ffe516b3a
Author: Pxl <[email protected]>
AuthorDate: Mon Aug 7 14:47:21 2023 +0800

    [Bug](exchange) change BlockSerializer from unique_ptr to object (#22653)
    
    change BlockSerializer from unique_ptr to object
---
 be/src/exec/data_sink.cpp               |   8 +--
 be/src/vec/sink/vdata_stream_sender.cpp | 119 ++++++++++++--------------------
 be/src/vec/sink/vdata_stream_sender.h   |  23 +++---
 be/src/vec/sink/vresult_file_sink.cpp   |  11 +--
 be/src/vec/sink/vresult_file_sink.h     |   9 +--
 5 files changed, 71 insertions(+), 99 deletions(-)
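
In essence, this patch replaces a heap-allocated, lazily-created member with a
plain value member built in the constructor's initializer list. A minimal
sketch of the pattern (hypothetical Sender/Serializer types, not the actual
Doris classes):

    #include <memory>

    struct Serializer {
        explicit Serializer(void* parent) : _parent(parent) {}
        void* _parent;
    };

    // Before: the serializer lives behind a unique_ptr, is allocated in
    // prepare(), and every use has to tolerate a null pointer.
    struct SenderBefore {
        void prepare() { _serializer.reset(new Serializer(this)); }
        std::unique_ptr<Serializer> _serializer;  // null until prepare() runs
    };

    // After: the serializer is a direct member, fully constructed together
    // with its owner; no separate allocation and no null checks needed.
    struct SenderAfter {
        SenderAfter() : _serializer(this) {}
        Serializer _serializer;
    };

The trade-off is that a value member's dependencies must be available at
construction time, which is why the diff below also threads RuntimeState
through the sink constructors.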

diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp
index c5d5446611..bc914818e8 100644
--- a/be/src/exec/data_sink.cpp
+++ b/be/src/exec/data_sink.cpp
@@ -89,12 +89,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
             sink->reset(new doris::vectorized::VResultFileSink(
-                    pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
+                    state, pool, params.sender_id, row_desc, thrift_sink.result_file_sink,
                     params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
                     output_exprs, desc_tbl));
         } else {
             sink->reset(new doris::vectorized::VResultFileSink(
-                    pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
+                    state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
                     send_query_statistics_with_every_batch, output_exprs));
         }
         break;
@@ -226,12 +226,12 @@ Status DataSink::create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink
         // Result file sink is not the top sink
         if (params.__isset.destinations && params.destinations.size() > 0) {
             sink->reset(new doris::vectorized::VResultFileSink(
-                    pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
+                    state, pool, local_params.sender_id, row_desc, thrift_sink.result_file_sink,
                     params.destinations, 16 * 1024, send_query_statistics_with_every_batch,
                     output_exprs, desc_tbl));
         } else {
             sink->reset(new doris::vectorized::VResultFileSink(
-                    pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
+                    state, pool, row_desc, thrift_sink.result_file_sink, 16 * 1024,
                     send_query_statistics_with_every_batch, output_exprs));
         }
         break;
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index df705ee37b..2fd86e0c38 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -95,8 +95,6 @@ Status Channel::init(RuntimeState* state) {
                 _fragment_instance_id, _dest_node_id);
     }
 
-    _serializer.reset(new BlockSerializer(_parent, _is_local));
-
     // In bucket shuffle join will set fragment_instance_id (-1, -1)
     // to build a camouflaged empty channel. the ip and port is '0.0.0.0:0"
     // so the empty channel not need call function close_internal()
@@ -113,7 +111,7 @@ Status Channel::send_current_block(bool eos) {
     }
     SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
     if (eos) {
-        RETURN_IF_ERROR(_serializer->serialize_block(_ch_cur_pb_block, 1));
+        RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
     }
     RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
     ch_roll_pb_block();
@@ -122,8 +120,8 @@ Status Channel::send_current_block(bool eos) {
 
 Status Channel::send_local_block(bool eos) {
     SCOPED_TIMER(_parent->_local_send_timer);
-    Block block = _serializer->get_block()->to_block();
-    _serializer->get_block()->set_muatable_columns(block.clone_empty_columns());
+    Block block = _serializer.get_block()->to_block();
+    _serializer.get_block()->set_muatable_columns(block.clone_empty_columns());
     if (_recvr_is_valid()) {
         COUNTER_UPDATE(_parent->_local_bytes_send_counter, block.bytes());
         COUNTER_UPDATE(_parent->_local_sent_rows, block.rows());
@@ -134,7 +132,7 @@ Status Channel::send_local_block(bool eos) {
         }
         return Status::OK();
     } else {
-        _serializer->reset_block();
+        _serializer.reset_block();
         return _receiver_status;
     }
 }
@@ -205,7 +203,7 @@ Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
 
     bool serialized = false;
     RETURN_IF_ERROR(
-            _serializer->next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, &rows));
+            _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, &serialized, &rows));
     if (serialized) {
         RETURN_IF_ERROR(send_current_block(false));
     }
@@ -224,9 +222,7 @@ Status Channel::close_wait(RuntimeState* state) {
         _need_close = false;
         return st;
     }
-    if (_serializer) {
-        _serializer->reset_block();
-    }
+    _serializer.reset_block();
     return Status::OK();
 }
 
@@ -236,14 +232,14 @@ Status Channel::close_internal() {
     }
     VLOG_RPC << "Channel::close() instance_id=" << _fragment_instance_id
              << " dest_node=" << _dest_node_id << " #rows= "
-             << ((_serializer->get_block() == nullptr) ? 0 : _serializer->get_block()->rows())
+             << ((_serializer.get_block() == nullptr) ? 0 : _serializer.get_block()->rows())
              << " receiver status: " << _receiver_status;
     if (is_receiver_eof()) {
-        _serializer->reset_block();
+        _serializer.reset_block();
         return Status::OK();
     }
     Status status;
-    if (_serializer->get_block() != nullptr && _serializer->get_block()->rows() > 0) {
+    if (_serializer.get_block() != nullptr && _serializer.get_block()->rows() > 0) {
         status = send_current_block(true);
     } else {
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
@@ -286,6 +282,7 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
                                      int per_channel_buffer_size,
                                      bool send_query_statistics_with_every_batch)
         : _sender_id(sender_id),
+          _state(state),
           _pool(pool),
           _row_desc(row_desc),
           _current_channel_idx(0),
@@ -299,7 +296,8 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
           _blocks_sent_counter(nullptr),
           _local_bytes_send_counter(nullptr),
           _dest_node_id(sink.dest_node_id),
-          _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc) {
+          _transfer_large_data_by_brpc(config::transfer_large_data_by_brpc),
+          _serializer(this) {
     DCHECK_GT(destinations.size(), 0);
     DCHECK(sink.output_partition.type == TPartitionType::UNPARTITIONED ||
            sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
@@ -344,12 +342,13 @@ VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int
     }
 }
 
-VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
-                                     PlanNodeId dest_node_id,
+VDataStreamSender::VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
+                                     const RowDescriptor& row_desc, PlanNodeId dest_node_id,
                                      const std::vector<TPlanFragmentDestination>& destinations,
                                      int per_channel_buffer_size,
                                      bool send_query_statistics_with_every_batch)
         : _sender_id(sender_id),
+          _state(state),
           _pool(pool),
           _row_desc(row_desc),
           _current_channel_idx(0),
@@ -365,7 +364,8 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
           _split_block_distribute_by_channel_timer(nullptr),
           _blocks_sent_counter(nullptr),
           _local_bytes_send_counter(nullptr),
-          _dest_node_id(dest_node_id) {
+          _dest_node_id(dest_node_id),
+          _serializer(this) {
     _cur_pb_block = &_pb_block1;
     _name = "VDataStreamSender";
     std::map<int64_t, int64_t> fragment_id_to_channel_index;
@@ -384,30 +384,6 @@ VDataStreamSender::VDataStreamSender(ObjectPool* pool, int sender_id, const RowD
     }
 }
 
-VDataStreamSender::VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc,
-                                     int per_channel_buffer_size,
-                                     bool send_query_statistics_with_every_batch)
-        : _sender_id(0),
-          _pool(pool),
-          _row_desc(row_desc),
-          _current_channel_idx(0),
-          _profile(nullptr),
-          _serialize_batch_timer(nullptr),
-          _compress_timer(nullptr),
-          _brpc_send_timer(nullptr),
-          _brpc_wait_timer(nullptr),
-          _bytes_sent_counter(nullptr),
-          _local_send_timer(nullptr),
-          _split_block_hash_compute_timer(nullptr),
-          _split_block_distribute_by_channel_timer(nullptr),
-          _blocks_sent_counter(nullptr),
-          _peak_memory_usage_counter(nullptr),
-          _local_bytes_send_counter(nullptr),
-          _dest_node_id(0) {
-    _cur_pb_block = &_pb_block1;
-    _name = "VDataStreamSender";
-}
-
 VDataStreamSender::~VDataStreamSender() {
     _channel_shared_ptrs.clear();
 }
@@ -429,7 +405,6 @@ Status VDataStreamSender::init(const TDataSink& tsink) {
 
 Status VDataStreamSender::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(DataSink::prepare(state));
-    _state = state;
 
     std::vector<std::string> instances;
     for (const auto& channel : _channels) {
@@ -454,8 +429,6 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
         RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state, _row_desc));
     }
 
-    _serializer.reset(new BlockSerializer(this));
-
     _bytes_sent_counter = ADD_COUNTER(profile(), "BytesSent", TUnit::BYTES);
     _uncompressed_bytes_counter = ADD_COUNTER(profile(), "UncompressedRowBatchSize", TUnit::BYTES);
     _local_sent_rows = ADD_COUNTER(profile(), "LocalSentRows", TUnit::UNIT);
@@ -521,27 +494,27 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
 
     if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
 #ifndef BROADCAST_ALL_CHANNELS
-#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS)                               \
-    {                                                                                              \
-        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                                            \
-        bool serialized = false;                                                                   \
-        RETURN_IF_ERROR(                                                                           \
-                _serializer->next_serialized_block(block, PBLOCK, _channels.size(), &serialized)); \
-        if (serialized) {                                                                          \
-            Status status;                                                                         \
-            for (auto channel : _channels) {                                                       \
-                if (!channel->is_receiver_eof()) {                                                 \
-                    if (channel->is_local()) {                                                     \
-                        status = channel->send_local_block(block);                                 \
-                    } else {                                                                       \
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                            \
-                        status = channel->send_block(PBLOCK_TO_SEND, false);                       \
-                    }                                                                              \
-                    HANDLE_CHANNEL_STATUS(state, channel, status);                                 \
-                }                                                                                  \
-            }                                                                                      \
-            POST_PROCESS;                                                                          \
-        }                                                                                          \
+#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS)                              \
+    {                                                                                             \
+        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                                           \
+        bool serialized = false;                                                                  \
+        RETURN_IF_ERROR(                                                                          \
+                _serializer.next_serialized_block(block, PBLOCK, _channels.size(), &serialized)); \
+        if (serialized) {                                                                         \
+            Status status;                                                                        \
+            for (auto channel : _channels) {                                                      \
+                if (!channel->is_receiver_eof()) {                                                \
+                    if (channel->is_local()) {                                                    \
+                        status = channel->send_local_block(block);                                \
+                    } else {                                                                      \
+                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                           \
+                        status = channel->send_block(PBLOCK_TO_SEND, false);                      \
+                    }                                                                             \
+                    HANDLE_CHANNEL_STATUS(state, channel, status);                                \
+                }                                                                                 \
+            }                                                                                     \
+            POST_PROCESS;                                                                         \
+        }                                                                                         \
     }
 #endif
         // 1. serialize depends on it is not local exchange
@@ -574,7 +547,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                 RETURN_IF_ERROR(
-                        _serializer->serialize_block(block, current_channel->ch_cur_pb_block()));
+                        _serializer.serialize_block(block, current_channel->ch_cur_pb_block()));
                 auto status = current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
                 current_channel->ch_roll_pb_block();
@@ -648,14 +621,14 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
 }
 
 Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
-    if (_serializer->get_block() && _serializer->get_block()->rows() > 0) {
+    if (_serializer.get_block() && _serializer.get_block()->rows() > 0) {
         BroadcastPBlockHolder* block_holder = nullptr;
         RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
         {
             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-            Block block = _serializer->get_block()->to_block();
-            RETURN_IF_ERROR(_serializer->serialize_block(&block, block_holder->get_block(),
-                                                         _channels.size()));
+            Block block = _serializer.get_block()->to_block();
+            RETURN_IF_ERROR(_serializer.serialize_block(&block, block_holder->get_block(),
+                                                        _channels.size()));
             Status status;
             for (auto channel : _channels) {
                 if (!channel->is_receiver_eof()) {
@@ -690,10 +663,10 @@ Status VDataStreamSender::close(RuntimeState* state, Status exec_status) {
         {
             // send last block
             SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-            if (_serializer && _serializer->get_block() && _serializer->get_block()->rows() > 0) {
-                Block block = _serializer->get_block()->to_block();
+            if (_serializer.get_block() && _serializer.get_block()->rows() > 0) {
+                Block block = _serializer.get_block()->to_block();
                 RETURN_IF_ERROR(
-                        _serializer->serialize_block(&block, _cur_pb_block, _channels.size()));
+                        _serializer.serialize_block(&block, _cur_pb_block, _channels.size()));
                 Status status;
                 for (auto channel : _channels) {
                     if (!channel->is_receiver_eof()) {
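
A consequence of holding BlockSerializer by value, visible throughout the
hunks above, is that whatever the member needs must already exist when the
sender's constructor runs. RuntimeState is therefore injected through the
constructor ("_state(state)") instead of being assigned later in prepare().
A hedged sketch of that move (illustrative names only, not the Doris API):

    struct RuntimeState {};

    // Before: _state was bound late, in prepare(); members constructed
    // earlier could not rely on it.
    struct SenderBefore {
        void prepare(RuntimeState* state) { _state = state; }
        RuntimeState* _state = nullptr;
    };

    // After: _state is bound at construction, so value members that appear
    // later in the initializer list may safely depend on it.
    struct SenderAfter {
        explicit SenderAfter(RuntimeState* state) : _state(state) {}
        RuntimeState* _state;
    };
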
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 77dfe0b20a..770a381da4 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -97,14 +97,11 @@ public:
                      const std::vector<TPlanFragmentDestination>& destinations,
                      int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
 
-    VDataStreamSender(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
-                      PlanNodeId dest_node_id,
+    VDataStreamSender(RuntimeState* state, ObjectPool* pool, int sender_id,
+                      const RowDescriptor& row_desc, PlanNodeId dest_node_id,
                      const std::vector<TPlanFragmentDestination>& destinations,
                      int per_channel_buffer_size, bool send_query_statistics_with_every_batch);
 
-    VDataStreamSender(ObjectPool* pool, const RowDescriptor& row_desc, int per_channel_buffer_size,
-                      bool send_query_statistics_with_every_batch);
-
     ~VDataStreamSender() override;
 
     Status init(const TDataSink& thrift_sink) override;
@@ -209,7 +206,7 @@ protected:
     bool _only_local_exchange = false;
     bool _enable_pipeline_exec = false;
 
-    std::unique_ptr<BlockSerializer> _serializer;
+    BlockSerializer _serializer;
 };
 
 class Channel {
@@ -234,10 +231,10 @@ public:
               _closed(false),
               _brpc_dest_addr(brpc_dest),
               _is_transfer_chain(is_transfer_chain),
-              _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch) {
-        std::string localhost = BackendOptions::get_localhost();
-        _is_local = (_brpc_dest_addr.hostname == localhost) &&
-                    (_brpc_dest_addr.port == config::brpc_port);
+              _send_query_statistics_with_every_batch(send_query_statistics_with_every_batch),
+              _is_local((_brpc_dest_addr.hostname == BackendOptions::get_localhost()) &&
+                        (_brpc_dest_addr.port == config::brpc_port)),
+              _serializer(_parent, _is_local) {
         if (_is_local) {
             VLOG_NOTICE << "will use local Exchange, dest_node_id is : " << _dest_node_id;
         }
@@ -385,7 +382,7 @@ protected:
     PBlock _ch_pb_block1;
     PBlock _ch_pb_block2;
 
-    std::unique_ptr<BlockSerializer> _serializer;
+    BlockSerializer _serializer;
 };
 
 #define HANDLE_CHANNEL_STATUS(state, channel, status)    \
@@ -490,7 +487,7 @@ public:
         bool serialized = false;
         _pblock = std::make_unique<PBlock>();
         RETURN_IF_ERROR(
-                _serializer->next_serialized_block(block, _pblock.get(), 1, &serialized, &rows));
+                _serializer.next_serialized_block(block, _pblock.get(), 1, &serialized, &rows));
         if (serialized) {
             RETURN_IF_ERROR(send_current_block(false));
         }
@@ -506,7 +503,7 @@ public:
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
         if (eos) {
             _pblock = std::make_unique<PBlock>();
-            RETURN_IF_ERROR(_serializer->serialize_block(_pblock.get(), 1));
+            RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1));
         }
         RETURN_IF_ERROR(send_block(_pblock.release(), eos));
         return Status::OK();
diff --git a/be/src/vec/sink/vresult_file_sink.cpp b/be/src/vec/sink/vresult_file_sink.cpp
index c5f4c0358e..5b2003d8c2 100644
--- a/be/src/vec/sink/vresult_file_sink.cpp
+++ b/be/src/vec/sink/vresult_file_sink.cpp
@@ -45,8 +45,9 @@ class TExpr;
 
 namespace doris::vectorized {
 
-VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc,
-                                 const TResultFileSink& sink, int per_channel_buffer_size,
+VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool,
+                                 const RowDescriptor& row_desc, const TResultFileSink& sink,
+                                 int per_channel_buffer_size,
                                  bool send_query_statistics_with_every_batch,
                                  const std::vector<TExpr>& t_output_expr)
         : _t_output_expr(t_output_expr), _row_desc(row_desc) {
@@ -62,8 +63,8 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc
     _header = sink.header;
 }
 
-VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
-                                 const TResultFileSink& sink,
+VResultFileSink::VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
+                                 const RowDescriptor& row_desc, const TResultFileSink& sink,
                                  const std::vector<TPlanFragmentDestination>& destinations,
                                  int per_channel_buffer_size,
                                  bool send_query_statistics_with_every_batch,
@@ -77,7 +78,7 @@ VResultFileSink::VResultFileSink(ObjectPool* pool, int sender_id, const RowDescr
     _storage_type = sink.storage_backend_type;
     _is_top_sink = false;
     CHECK_EQ(destinations.size(), 1);
-    _stream_sender.reset(new VDataStreamSender(pool, sender_id, row_desc, sink.dest_node_id,
+    _stream_sender.reset(new VDataStreamSender(state, pool, sender_id, row_desc, sink.dest_node_id,
                                                destinations, per_channel_buffer_size,
                                                send_query_statistics_with_every_batch));
 
diff --git a/be/src/vec/sink/vresult_file_sink.h b/be/src/vec/sink/vresult_file_sink.h
index 90bc06bb42..c8d2f3be18 100644
--- a/be/src/vec/sink/vresult_file_sink.h
+++ b/be/src/vec/sink/vresult_file_sink.h
@@ -46,11 +46,12 @@ class VExprContext;
 
 class VResultFileSink : public DataSink {
 public:
-    VResultFileSink(ObjectPool* pool, const RowDescriptor& row_desc, const TResultFileSink& sink,
-                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
+    VResultFileSink(RuntimeState* state, ObjectPool* pool, const RowDescriptor& row_desc,
+                    const TResultFileSink& sink, int per_channel_buffer_size,
+                    bool send_query_statistics_with_every_batch,
                     const std::vector<TExpr>& t_output_expr);
-    VResultFileSink(ObjectPool* pool, int sender_id, const RowDescriptor& row_desc,
-                    const TResultFileSink& sink,
+    VResultFileSink(RuntimeState* state, ObjectPool* pool, int sender_id,
+                    const RowDescriptor& row_desc, const TResultFileSink& sink,
                     const std::vector<TPlanFragmentDestination>& destinations,
                    int per_channel_buffer_size, bool send_query_statistics_with_every_batch,
                    const std::vector<TExpr>& t_output_expr, DescriptorTbl& descs);

