This is an automated email from the ASF dual-hosted git repository.

panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new abc9de07b3 [Bug](pipeline) make sure sink is not blocked before try close (#22765)
abc9de07b3 is described below

commit abc9de07b3c953ae2be45635f089b8705d595b63
Author: Gabriel <[email protected]>
AuthorDate: Sun Aug 13 13:20:48 2023 +0800

    [Bug](pipeline) make sure sink is not blocked before try close (#22765)
    
    make sure sink is not blocked before try close
---
 be/src/pipeline/exec/operator.h         |  13 +-
 be/src/vec/sink/vdata_stream_sender.cpp | 225 ++++++++++++++++----------------
 be/src/vec/sink/vdata_stream_sender.h   |  28 ++--
 3 files changed, 133 insertions(+), 133 deletions(-)

diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index acf55cb7bc..55335c093a 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -273,15 +273,12 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override {
-        if (in_block->rows() > 0) {
-            auto st = _sink->send(state, in_block, source_state == 
SourceState::FINISHED);
-            // TODO: improvement: if sink returned END_OF_FILE, pipeline task 
can be finished
-            if (st.template is<ErrorCode::END_OF_FILE>()) {
-                return Status::OK();
-            }
-            return st;
+        auto st = _sink->send(state, in_block, source_state == 
SourceState::FINISHED);
+        // TODO: improvement: if sink returned END_OF_FILE, pipeline task can 
be finished
+        if (st.template is<ErrorCode::END_OF_FILE>()) {
+            return Status::OK();
         }
-        return Status::OK();
+        return st;
     }
 
     Status try_close(RuntimeState* state) override {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index e8f2d6dbe8..9f7c34a6c3 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -111,7 +111,7 @@ Status Channel::send_current_block(bool eos) {
     }
     SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
     if (eos) {
-        RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1, 
true));
+        RETURN_IF_ERROR(_serializer.serialize_block(_ch_cur_pb_block, 1));
     }
     RETURN_IF_ERROR(send_block(_ch_cur_pb_block, eos));
     ch_roll_pb_block();
@@ -196,14 +196,14 @@ Status Channel::send_block(PBlock* block, bool eos) {
     return Status::OK();
 }
 
-Status Channel::add_rows(Block* block, const std::vector<int>& rows) {
+Status Channel::add_rows(Block* block, const std::vector<int>& rows, bool eos) 
{
     if (_fragment_instance_id.lo == -1) {
         return Status::OK();
     }
 
     bool serialized = false;
-    RETURN_IF_ERROR(_serializer.next_serialized_block(block, _ch_cur_pb_block, 
1, &serialized,
-                                                      &rows, true));
+    RETURN_IF_ERROR(
+            _serializer.next_serialized_block(block, _ch_cur_pb_block, 1, 
&serialized, eos, &rows));
     if (serialized) {
         RETURN_IF_ERROR(send_current_block(false));
     }
@@ -493,52 +493,72 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
     }
 
     if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
-#ifndef BROADCAST_ALL_CHANNELS
-#define BROADCAST_ALL_CHANNELS(PBLOCK, PBLOCK_TO_SEND, POST_PROCESS)           
            \
-    {                                                                          
            \
-        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());                        
            \
-        bool serialized = false;                                               
            \
-        RETURN_IF_ERROR(_serializer.next_serialized_block(block, PBLOCK, 
_channels.size(), \
-                                                          &serialized, 
nullptr, false));   \
-        if (serialized) {                                                      
            \
-            Status status;                                                     
            \
-            Block merged_block = _serializer.get_block()->to_block();          
            \
-            for (auto channel : _channels) {                                   
            \
-                if (!channel->is_receiver_eof()) {                             
            \
-                    if (channel->is_local()) {                                 
            \
-                        status = channel->send_local_block(&merged_block);     
            \
-                    } else {                                                   
            \
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());        
            \
-                        status = channel->send_block(PBLOCK_TO_SEND, false);   
            \
-                    }                                                          
            \
-                    HANDLE_CHANNEL_STATUS(state, channel, status);             
            \
-                }                                                              
            \
-            }                                                                  
            \
-            merged_block.clear_column_data();                                  
            \
-            
_serializer.get_block()->set_muatable_columns(merged_block.mutate_columns());  \
-            POST_PROCESS;                                                      
            \
-        }                                                                      
            \
-    }
-#endif
         // 1. serialize depends on it is not local exchange
         // 2. send block
         // 3. rollover block
         if (_only_local_exchange) {
-            Status status;
-            for (auto channel : _channels) {
-                if (!channel->is_receiver_eof()) {
-                    status = channel->send_local_block(block);
-                    HANDLE_CHANNEL_STATUS(state, channel, status);
+            if (!block->empty()) {
+                Status status;
+                for (auto channel : _channels) {
+                    if (!channel->is_receiver_eof()) {
+                        status = channel->send_local_block(block);
+                        HANDLE_CHANNEL_STATUS(state, channel, status);
+                    }
                 }
             }
         } else if (_enable_pipeline_exec) {
             BroadcastPBlockHolder* block_holder = nullptr;
             RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
-            BROADCAST_ALL_CHANNELS(block_holder->get_block(), block_holder, );
+            {
+                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                bool serialized = false;
+                RETURN_IF_ERROR(_serializer.next_serialized_block(
+                        block, block_holder->get_block(), _channels.size(), 
&serialized, eos));
+                if (serialized) {
+                    auto cur_block = _serializer.get_block()->to_block();
+                    if (!cur_block.empty()) {
+                        _serializer.serialize_block(&cur_block, 
block_holder->get_block(),
+                                                    _channels.size());
+                    } else {
+                        block_holder->get_block()->Clear();
+                    }
+                    Status status;
+                    for (auto channel : _channels) {
+                        if (!channel->is_receiver_eof()) {
+                            if (channel->is_local()) {
+                                status = channel->send_local_block(block);
+                            } else {
+                                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                                status = channel->send_block(block_holder, 
eos);
+                            }
+                            HANDLE_CHANNEL_STATUS(state, channel, status);
+                        }
+                    }
+                    cur_block.clear_column_data();
+                    
_serializer.get_block()->set_muatable_columns(cur_block.mutate_columns());
+                }
+            }
         } else {
-            BROADCAST_ALL_CHANNELS(_cur_pb_block, _cur_pb_block, 
_roll_pb_block());
+            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+            bool serialized = false;
+            RETURN_IF_ERROR(_serializer.next_serialized_block(
+                    block, _cur_pb_block, _channels.size(), &serialized, 
false));
+            if (serialized) {
+                Status status;
+                for (auto channel : _channels) {
+                    if (!channel->is_receiver_eof()) {
+                        if (channel->is_local()) {
+                            status = channel->send_local_block(block);
+                        } else {
+                            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                            status = channel->send_block(_cur_pb_block, false);
+                        }
+                        HANDLE_CHANNEL_STATUS(state, channel, status);
+                    }
+                }
+                _roll_pb_block();
+            }
         }
-#undef BROADCAST_ALL_CHANNELS
     } else if (_part_type == TPartitionType::RANDOM) {
         // 1. select channel
         Channel* current_channel = _channels[_current_channel_idx];
@@ -550,7 +570,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
             } else {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                 RETURN_IF_ERROR(
-                        _serializer.serialize_block(block, 
current_channel->ch_cur_pb_block(), 1));
+                        _serializer.serialize_block(block, 
current_channel->ch_cur_pb_block()));
                 auto status = 
current_channel->send_block(current_channel->ch_cur_pb_block(), eos);
                 HANDLE_CHANNEL_STATUS(state, current_channel, status);
                 current_channel->ch_roll_pb_block();
@@ -565,10 +585,6 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
 
         int result_size = _partition_expr_ctxs.size();
         int result[result_size];
-        {
-            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-            RETURN_IF_ERROR(get_partition_column_result(block, result));
-        }
 
         // vectorized calculate hash
         int rows = block->rows();
@@ -576,44 +592,53 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
         std::vector<uint64_t> hash_vals(rows);
         auto* __restrict hashes = hash_vals.data();
 
-        // TODO: after we support new shuffle hash method, should simple the 
code
-        if (_part_type == TPartitionType::HASH_PARTITIONED) {
-            SCOPED_TIMER(_split_block_hash_compute_timer);
-            // result[j] means column index, i means rows index, here to 
calculate the xxhash value
-            for (int j = 0; j < result_size; ++j) {
-                // complex type most not implement get_data_at() method which 
column_const will call
-                unpack_if_const(block->get_by_position(result[j]).column)
-                        .first->update_hashes_with_value(hashes);
-            }
-
-            for (int i = 0; i < rows; i++) {
-                hashes[i] = hashes[i] % element_size;
-            }
-
+        if (rows > 0) {
             {
                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                Block::erase_useless_column(block, column_to_keep);
+                RETURN_IF_ERROR(get_partition_column_result(block, result));
             }
+            // TODO: after we support new shuffle hash method, should simple 
the code
+            if (_part_type == TPartitionType::HASH_PARTITIONED) {
+                SCOPED_TIMER(_split_block_hash_compute_timer);
+                // result[j] means column index, i means rows index, here to 
calculate the xxhash value
+                for (int j = 0; j < result_size; ++j) {
+                    // complex type most not implement get_data_at() method 
which column_const will call
+                    unpack_if_const(block->get_by_position(result[j]).column)
+                            .first->update_hashes_with_value(hashes);
+                }
 
-            RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size, 
hashes, rows, block));
-        } else {
-            for (int j = 0; j < result_size; ++j) {
-                // complex type most not implement get_data_at() method which 
column_const will call
-                unpack_if_const(block->get_by_position(result[j]).column)
-                        .first->update_crcs_with_value(
-                                hash_vals, 
_partition_expr_ctxs[j]->root()->type().type);
-            }
-            element_size = _channel_shared_ptrs.size();
-            for (int i = 0; i < rows; i++) {
-                hashes[i] = hashes[i] % element_size;
-            }
+                for (int i = 0; i < rows; i++) {
+                    hashes[i] = hashes[i] % element_size;
+                }
 
-            {
-                SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                Block::erase_useless_column(block, column_to_keep);
+                {
+                    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                    Block::erase_useless_column(block, column_to_keep);
+                }
+            } else {
+                for (int j = 0; j < result_size; ++j) {
+                    // complex type most not implement get_data_at() method 
which column_const will call
+                    unpack_if_const(block->get_by_position(result[j]).column)
+                            .first->update_crcs_with_value(
+                                    hash_vals, 
_partition_expr_ctxs[j]->root()->type().type);
+                }
+                element_size = _channel_shared_ptrs.size();
+                for (int i = 0; i < rows; i++) {
+                    hashes[i] = hashes[i] % element_size;
+                }
+
+                {
+                    SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
+                    Block::erase_useless_column(block, column_to_keep);
+                }
             }
+        }
+        if (_part_type == TPartitionType::HASH_PARTITIONED) {
+            RETURN_IF_ERROR(channel_add_rows(state, _channels, element_size, 
hashes, rows, block,
+                                             _enable_pipeline_exec ? eos : 
false));
+        } else {
             RETURN_IF_ERROR(channel_add_rows(state, _channel_shared_ptrs, 
element_size, hashes,
-                                             rows, block));
+                                             rows, block, 
_enable_pipeline_exec ? eos : false));
         }
     } else {
         // Range partition
@@ -624,28 +649,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {
 }
 
 Status VDataStreamSender::try_close(RuntimeState* state, Status exec_status) {
-    if (_serializer.get_block() && _serializer.get_block()->rows() > 0) {
-        BroadcastPBlockHolder* block_holder = nullptr;
-        RETURN_IF_ERROR(_get_next_available_buffer(&block_holder));
-        {
-            SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-            Block block = _serializer.get_block()->to_block();
-            RETURN_IF_ERROR(_serializer.serialize_block(&block, 
block_holder->get_block(),
-                                                        _channels.size()));
-            Status status;
-            for (auto channel : _channels) {
-                if (!channel->is_receiver_eof()) {
-                    if (channel->is_local()) {
-                        status = channel->send_local_block(&block);
-                    } else {
-                        SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                        status = channel->send_block(block_holder, false);
-                    }
-                    HANDLE_CHANNEL_STATUS(state, channel, status);
-                }
-            }
-        }
-    }
+    DCHECK(_serializer.get_block() == nullptr || 
_serializer.get_block()->rows() == 0);
     Status final_st = Status::OK();
     for (int i = 0; i < _channels.size(); ++i) {
         Status st = _channels[i]->close(state);
@@ -710,8 +714,8 @@ BlockSerializer::BlockSerializer(VDataStreamSender* parent, bool is_local)
         : _parent(parent), _is_local(is_local), 
_batch_size(parent->state()->batch_size()) {}
 
 Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int 
num_receivers,
-                                              bool* serialized, const 
std::vector<int>* rows,
-                                              bool clear_after_serialize) {
+                                              bool* serialized, bool eos,
+                                              const std::vector<int>* rows) {
     if (_mutable_block == nullptr) {
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
         _mutable_block = MutableBlock::create_unique(block->clone_empty());
@@ -720,18 +724,20 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int nu
     {
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
         if (rows) {
-            SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer);
-            const int* begin = &(*rows)[0];
-            _mutable_block->add_rows(block, begin, begin + rows->size());
-        } else {
+            if (rows->size() > 0) {
+                
SCOPED_TIMER(_parent->_split_block_distribute_by_channel_timer);
+                const int* begin = &(*rows)[0];
+                _mutable_block->add_rows(block, begin, begin + rows->size());
+            }
+        } else if (!block->empty()) {
             SCOPED_TIMER(_parent->_merge_block_timer);
             RETURN_IF_ERROR(_mutable_block->merge(*block));
         }
     }
 
-    if (_mutable_block->rows() >= _batch_size) {
+    if (_mutable_block->rows() >= _batch_size || eos) {
         if (!_is_local) {
-            RETURN_IF_ERROR(serialize_block(dest, num_receivers, 
clear_after_serialize));
+            RETURN_IF_ERROR(serialize_block(dest, num_receivers));
         }
         *serialized = true;
         return Status::OK();
@@ -740,21 +746,18 @@ Status BlockSerializer::next_serialized_block(Block* block, PBlock* dest, int nu
     return Status::OK();
 }
 
-Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers,
-                                        bool clear_after_serialize) {
+Status BlockSerializer::serialize_block(PBlock* dest, int num_receivers) {
     if (_mutable_block && _mutable_block->rows() > 0) {
         auto block = _mutable_block->to_block();
         RETURN_IF_ERROR(serialize_block(&block, dest, num_receivers));
-        if (clear_after_serialize) {
-            block.clear_column_data();
-        }
+        block.clear_column_data();
         _mutable_block->set_muatable_columns(block.mutate_columns());
     }
 
     return Status::OK();
 }
 
-Status BlockSerializer::serialize_block(const Block* src, PBlock* dest, int 
num_receivers) {
+Status BlockSerializer::serialize_block(Block* src, PBlock* dest, int 
num_receivers) {
     {
         SCOPED_TIMER(_parent->_serialize_batch_timer);
         dest->Clear();
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 2649e077d3..503a73798e 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -71,11 +71,11 @@ class VDataStreamSender;
 
 class BlockSerializer {
 public:
-    BlockSerializer(VDataStreamSender* parent, bool is_local = false);
+    BlockSerializer(VDataStreamSender* parent, bool is_local = true);
     Status next_serialized_block(Block* src, PBlock* dest, int num_receivers, 
bool* serialized,
-                                 const std::vector<int>* rows, bool 
clear_after_serialize);
-    Status serialize_block(PBlock* dest, int num_receivers, bool 
clear_after_serialize);
-    Status serialize_block(const Block* src, PBlock* dest, int num_receivers);
+                                 bool eos, const std::vector<int>* rows = 
nullptr);
+    Status serialize_block(PBlock* dest, int num_receivers = 1);
+    Status serialize_block(Block* src, PBlock* dest, int num_receivers = 1);
 
     MutableBlock* get_block() const { return _mutable_block.get(); }
 
@@ -141,7 +141,7 @@ protected:
 
     template <typename Channels>
     Status channel_add_rows(RuntimeState* state, Channels& channels, int 
num_channels,
-                            const uint64_t* channel_ids, int rows, Block* 
block);
+                            const uint64_t* channel_ids, int rows, Block* 
block, bool eos);
 
     template <typename ChannelPtrType>
     void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, 
Status st);
@@ -264,7 +264,7 @@ public:
         return Status::InternalError("Send BroadcastPBlockHolder is not 
allowed!");
     }
 
-    virtual Status add_rows(Block* block, const std::vector<int>& row);
+    virtual Status add_rows(Block* block, const std::vector<int>& row, bool 
eos);
 
     virtual Status send_current_block(bool eos);
 
@@ -397,7 +397,7 @@ protected:
 template <typename Channels>
 Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& 
channels,
                                            int num_channels, const uint64_t* 
__restrict channel_ids,
-                                           int rows, Block* block) {
+                                           int rows, Block* block, bool eos) {
     std::vector<int> channel2rows[num_channels];
 
     for (int i = 0; i < rows; i++) {
@@ -406,8 +406,8 @@ Status VDataStreamSender::channel_add_rows(RuntimeState* state, Channels& channe
 
     Status status;
     for (int i = 0; i < num_channels; ++i) {
-        if (!channels[i]->is_receiver_eof() && !channel2rows[i].empty()) {
-            status = channels[i]->add_rows(block, channel2rows[i]);
+        if (!channels[i]->is_receiver_eof() && (!channel2rows[i].empty() || 
eos)) {
+            status = channels[i]->add_rows(block, channel2rows[i], eos);
             HANDLE_CHANNEL_STATUS(state, channels[i], status);
         }
     }
@@ -479,17 +479,17 @@ public:
         return Status::OK();
     }
 
-    Status add_rows(Block* block, const std::vector<int>& rows) override {
+    Status add_rows(Block* block, const std::vector<int>& rows, bool eos) 
override {
         if (_fragment_instance_id.lo == -1) {
             return Status::OK();
         }
 
         bool serialized = false;
         _pblock = std::make_unique<PBlock>();
-        RETURN_IF_ERROR(_serializer.next_serialized_block(block, 
_pblock.get(), 1, &serialized,
-                                                          &rows, true));
+        RETURN_IF_ERROR(_serializer.next_serialized_block(block, 
_pblock.get(), 1, &serialized, eos,
+                                                          &rows));
         if (serialized) {
-            RETURN_IF_ERROR(send_current_block(false));
+            RETURN_IF_ERROR(send_current_block(eos));
         }
 
         return Status::OK();
@@ -503,7 +503,7 @@ public:
         SCOPED_CONSUME_MEM_TRACKER(_parent->_mem_tracker.get());
         if (eos) {
             _pblock = std::make_unique<PBlock>();
-            RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1, 
true));
+            RETURN_IF_ERROR(_serializer.serialize_block(_pblock.get(), 1));
         }
         RETURN_IF_ERROR(send_block(_pblock.release(), eos));
         return Status::OK();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to