This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new bcac160013 [fix](broadcast shuffle) fix wrong result of broadcast 
shuffle (#22847)
bcac160013 is described below

commit bcac1600134884cc8c687fd381ba5e5f7826d3ee
Author: TengJianPing <[email protected]>
AuthorDate: Fri Aug 11 17:01:11 2023 +0800

    [fix](broadcast shuffle) fix wrong result of broadcast shuffle (#22847)
    
    When the data stream sender is doing a broadcast shuffle, it accumulates rows 
up to the batch size and then sends blocks to destinations, but for local 
receivers it ONLY sends the current block, which causes data loss.
    
    This issue was introduced by #22218.
    
    If #22218 is picked to the 2.0 branch, this PR also needs to be picked.
---
 be/src/vec/sink/vdata_stream_sender.cpp | 67 ++++++++++++++++++---------------
 be/src/vec/sink/vdata_stream_sender.h   | 12 +++---
 2 files changed, 43 insertions(+), 36 deletions(-)

diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index 2fd86e0c38..e8f2d6dbe8 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -111,7 +111,7 @@ Status Channel::send_current_block(bool eos) {
     }
     SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
     if (eos) {
-        RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
+        RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1, 
true));
     }
     RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
     ch_roll_pb_block();
@@ -202,8 +202,8 @@ Status Channel::add_rows(Block* block, const 
std::vector<int>& rows) {
     }
 
     bool serialized = false;
-    RETURN_IF_ERROR(
-            _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, 
&serialized, &rows));
+    RETURN_IF_ERROR(_serializer.next_serialized_block(block, _ch_cur_pb_block, 
1, &serialized,
+                                                      &rows, true));
     if (serialized) {
         RETURN_IF_ERROR(send_current_block(false));
     }
@@ -494,27 +494,30 @@ Status VDataStreamSender::send(RuntimeState* state, 
Block* block, bool eos) {
 
     if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
 #ifndef BROADCAST_ALL_CHANNELS
-#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS)           
                   \
-    {                                                                          
                   \
-        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                        
                   \
-        bool serialized = false;                                               
                   \
-        RETURN_IF_ERROR(                                                       
                   \
-                _serializer.next_serialized_block(block, PBLOCK, 
_channels.size(), &serialized)); \
-        if (serialized) {                                                      
                   \
-            Status status;                                                     
                   \
-            for (auto channel : _channels) {                                   
                   \
-                if (!channel->is_receiver_eof()) {                             
                   \
-                    if (channel->is_local()) {                                 
                   \
-                        status = channel->send_local_block(block);             
                   \
-                    } else {                                                   
                   \
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());        
                   \
-                        status = channel->send_block(PBLOCK_TO_SEND, false);   
                   \
-                    }                                                          
                   \
-                    HANDLE_CHANNEL_STATUS(state, channel, status);             
                   \
-                }                                                              
                   \
-            }                                                                  
                   \
-            POST_PROCESS;                                                      
                   \
-        }                                                                      
                   \
+#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS)           
            \
+    {                                                                          
            \
+        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                        
            \
+        bool serialized = false;                                               
            \
+        RETURN_IF_ERROR(_serializer.next_serialized_block(block, PBLOCK, 
_channels.size(), \
+                                                          &serialized, 
nullptr, false));   \
+        if (serialized) {                                                      
            \
+            Status status;                                                     
            \
+            Block merged_block = _serializer.get_block()->to_block();          
            \
+            for (auto channel : _channels) {                                   
            \
+                if (!channel->is_receiver_eof()) {                             
            \
+                    if (channel->is_local()) {                                 
            \
+                        status = channel->send_local_block(&merged_block);     
            \
+                    } else {                                                   
            \
+                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());        
            \
+                        status = channel->send_block(PBLOCK_TO_SEND, false);   
            \
+                    }                                                          
            \
+                    HANDLE_CHANNEL_STATUS(state, channel, status);             
            \
+                }                                                              
            \
+            }                                                                  
            \
+            merged_block.clear_column_data();                                  
            \
+            
_serializer.get_block()->set_muatable_columns(merged_block.mutate_columns());  \
+            POST_PROCESS;                                                      
            \
+        }                                                                      
            \
     }
 #endif
         // 1. serialize depends on it is not local exchange
@@ -547,7 +550,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* 
block, bool eos) {
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                 RETURN_IF_ERROR(
-                        _serializer.serialize_block(block, 
current_channel->ch_cur_pb_block()));
+                        _serializer.serialize_block(block, 
current_channel->ch_cur_pb_block(), 1));
                 auto status = 
current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
                 current_channel->ch_roll_pb_block();
@@ -707,7 +710,8 @@ BlockSerializer::BlockSerializer(VDataStreamSender* parent, 
bool is_local)
         : _parent(parent), _is_local(is_local), 
_batch_size(parent->state()->batch_size()) {}
 
 Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int 
num_receivers,
-                                              bool* serialized, const 
std::vector<int>* rows) {
+                                              bool* serialized, const 
std::vector<int>* rows,
+                                              bool clear_after_serialize) {
     if (_mutable_block == nullptr) {
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
         _mutable_block = MutableBlock::create_unique(block->clone_empty());
@@ -727,7 +731,7 @@ Status BlockSerializer::next_serialized_block(Block* block, 
PBlock* dest, int nu
 
     if (_mutable_block->rows() >= _batch_size) {
         if (!_is_local) {
-            RETURN_IF_ERROR(serialize_block(dest, num_receivers));
+            RETURN_IF_ERROR(serialize_block(dest, num_receivers, 
clear_after_serialize));
         }
         *serialized = true;
         return Status::OK();
@@ -736,18 +740,21 @@ Status BlockSerializer::next_serialized_block(Block* 
block, PBlock* dest, int nu
     return Status::OK();
 }
 
-Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
+Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers,
+                                        bool clear_after_serialize) {
     if (_mutable_block && _mutable_block->rows() > 0) {
         auto block = _mutable_block->to_block();
         RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
-        block.clear_column_data();
+        if (clear_after_serialize) {
+            block.clear_column_data();
+        }
         _mutable_block->set_muatable_columns(block.mutate_columns());
     }
 
     return Status::OK();
 }
 
-Status BlockSerializer::serialize_block(Block* src, PBlock* dest, int 
num_receivers) {
+Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int 
num_receivers) {
     {
         SCOPED_TIMER(_parent->_serialize_batch_timer);
         dest->Clear();
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 770a381da4..2649e077d3 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -73,9 +73,9 @@ class BlockSerializer {
 public:
     BlockSerializer(VDataStreamSender* parent, bool is_local = false);
     Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, 
bool* serialized,
-                                 const std::vector<int>* rows = nullptr);
-    Status serialize_block(PBlock* dest, int num_receivers = 1);
-    Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
+                                 const std::vector<int>* rows, bool 
clear_after_serialize);
+    Status serialize_block(PBlock* dest, int num_receivers, bool 
clear_after_serialize);
+    Status serialize_block(const Block* src, PBlock* dest, int num_receivers);
 
     MutableBlock* get_block() const { return _mutable_block.get(); }
 
@@ -486,8 +486,8 @@ public:
 
         bool serialized = false;
         _pblock = std::make_unique<PBlock>();
-        RETURN_IF_ERROR(
-                _serializer.next_serialized_block(block, _pblock.get(), 1, 
&serialized, &rows));
+        RETURN_IF_ERROR(_serializer.next_serialized_block(block, 
_pblock.get(), 1, &serialized,
+                                                          &rows, true));
         if (serialized) {
             RETURN_IF_ERROR(send_current_block(false));
         }
@@ -503,7 +503,7 @@ public:
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
         if (eos) {
             _pblock = std::make_unique<PBlock>();
-            RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1));
+            RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1, 
true));
         }
         RETURN_IF_ERROR(send_block(_pblock.release(), eos));
         return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to