This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d390e63a03 [enhancement](stream receiver) make stream receiver exception safe (#16412)
d390e63a03 is described below

commit d390e63a031d8aa79ee3a2593cfc3e7f51a48e33
Author: yiguolei <[email protected]>
AuthorDate: Tue Feb 7 12:44:20 2023 +0800

    [enhancement](stream receiver) make stream receiver exception safe (#16412)
    
    Make the stream receiver exception safe.
    Change the block-fetching interfaces (SenderQueue::get_batch and
    BlockSpillReader::read) from (Block**) to (Block*, bool* eos) to unify
    end-of-stream semantics.
---
 be/src/util/proto_util.h                  |  19 ------
 be/src/vec/common/sort/sorter.cpp         |   2 +-
 be/src/vec/core/block.h                   |   1 +
 be/src/vec/core/block_spill_reader.cpp    |  15 ++---
 be/src/vec/core/block_spill_reader.h      |   3 +-
 be/src/vec/core/sort_cursor.h             |  39 +++++------
 be/src/vec/exec/vexchange_node.cpp        |   2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp |  68 +++++++++----------
 be/src/vec/runtime/vdata_stream_recvr.h   |  23 +++----
 be/test/vec/core/block_spill_test.cpp     | 105 ++++++++++++++++--------------
 10 files changed, 123 insertions(+), 154 deletions(-)

diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index 5828b8eeec..28f5955b15 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -142,15 +142,6 @@ inline void attachment_transfer_request_block(const 
Params* brpc_request, brpc::
     }
 }
 
-// Embed tuple_data and brpc request serialization string in controller 
attachment.
-template <typename Params, typename Closure>
-inline Status request_embed_attachment_contain_tuple(Params* brpc_request, 
Closure* closure) {
-    auto row_batch = brpc_request->row_batch();
-    Status st = request_embed_attachment(brpc_request, row_batch.tuple_data(), 
closure);
-    row_batch.set_tuple_data("");
-    return st;
-}
-
 template <typename Params, typename Closure>
 inline Status request_embed_attachment(Params* brpc_request, const 
std::string& data,
                                        Closure* closure) {
@@ -181,16 +172,6 @@ inline Status request_embed_attachment(Params* 
brpc_request, const std::string&
     return Status::OK();
 }
 
-// Extract the brpc request and tuple data from the controller attachment,
-// and put the tuple data into the request.
-template <typename Params>
-inline Status attachment_extract_request_contain_tuple(const Params* 
brpc_request,
-                                                       brpc::Controller* cntl) 
{
-    Params* req = const_cast<Params*>(brpc_request);
-    auto rb = req->mutable_row_batch();
-    return attachment_extract_request(req, cntl, rb->mutable_tuple_data());
-}
-
 // Extract the brpc request and block from the controller attachment,
 // and put the block into the request.
 template <typename Params>
diff --git a/be/src/vec/common/sort/sorter.cpp 
b/be/src/vec/common/sort/sorter.cpp
index 3fa7b9e06f..aaf91d4428 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -225,7 +225,7 @@ Status MergeSorterState::_create_intermediate_merger(int 
num_blocks,
                 stream_id, spilled_block_reader, block_spill_profile_));
         
child_block_suppliers.emplace_back(std::bind(std::mem_fn(&BlockSpillReader::read),
                                                      
spilled_block_reader.get(),
-                                                     std::placeholders::_1));
+                                                     std::placeholders::_1, 
std::placeholders::_2));
         spilled_block_readers_.emplace_back(std::move(spilled_block_reader));
 
         spilled_sorted_block_streams_.pop_front();
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index d5032cd568..cfde22603a 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -577,6 +577,7 @@ struct IteratorRowRef {
 };
 
 using BlockView = std::vector<IteratorRowRef>;
+using BlockUPtr = std::unique_ptr<Block>;
 
 } // namespace vectorized
 } // namespace doris
diff --git a/be/src/vec/core/block_spill_reader.cpp 
b/be/src/vec/core/block_spill_reader.cpp
index 7f8fb69d31..945d14f9ff 100644
--- a/be/src/vec/core/block_spill_reader.cpp
+++ b/be/src/vec/core/block_spill_reader.cpp
@@ -79,13 +79,12 @@ Status BlockSpillReader::open() {
 
 // The returned block is owned by BlockSpillReader and is
 // destroyed when reading next block.
-Status BlockSpillReader::read(Block** block) {
+Status BlockSpillReader::read(Block* block, bool* eos) {
     DCHECK(file_reader_);
-
-    current_block_.reset();
-    *block = nullptr;
+    block->clear();
 
     if (read_block_index_ >= block_count_) {
+        *eos = true;
         return Status::OK();
     }
 
@@ -103,17 +102,15 @@ Status BlockSpillReader::read(Block** block) {
     DCHECK(bytes_read == bytes_to_read);
 
     PBlock pb_block;
-    Block* new_block = nullptr;
+    BlockUPtr new_block = nullptr;
     {
         SCOPED_TIMER(deserialize_time_);
         if (!pb_block.ParseFromArray(result.data, result.size)) {
             return Status::InternalError("Failed to read spilled block");
         }
-        new_block = new Block(pb_block);
+        new_block.reset(new Block(pb_block));
     }
-
-    current_block_.reset(new_block);
-    *block = new_block;
+    block->swap(*new_block);
 
     ++read_block_index_;
 
diff --git a/be/src/vec/core/block_spill_reader.h 
b/be/src/vec/core/block_spill_reader.h
index a165d6aae6..48e9434c8f 100644
--- a/be/src/vec/core/block_spill_reader.h
+++ b/be/src/vec/core/block_spill_reader.h
@@ -40,7 +40,7 @@ public:
 
     Status close();
 
-    Status read(Block** block);
+    Status read(Block* block, bool* eos);
 
     int64_t get_id() const { return stream_id_; }
 
@@ -59,7 +59,6 @@ private:
     size_t max_sub_block_size_ = 0;
     std::unique_ptr<char[]> read_buff_;
     std::vector<size_t> block_start_offsets_;
-    std::unique_ptr<Block> current_block_;
 
     RuntimeProfile* profile_ = nullptr;
     RuntimeProfile::Counter* read_time_;
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 74a9762673..01834f8345 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -167,19 +167,6 @@ struct MergeSortCursorImpl {
 
     MergeSortCursorImpl(const SortDescription& desc_)
             : desc(desc_), sort_columns_size(desc.size()) {}
-
-    MergeSortCursorImpl(const Columns& columns, const SortDescription& desc_)
-            : desc(desc_), sort_columns_size(desc.size()) {
-        for (auto& column_desc : desc) {
-            if (!column_desc.column_name.empty()) {
-                LOG(FATAL)
-                        << "SortDesctiption should contain column position if 
MergeSortCursor was "
-                           "used without header.";
-            }
-        }
-        reset(columns, {});
-    }
-
     bool empty() const { return rows == 0; }
 
     /// Set the cursor to the beginning of the new block.
@@ -216,7 +203,7 @@ struct MergeSortCursorImpl {
     virtual Block* block_ptr() { return nullptr; }
 };
 
-using BlockSupplier = std::function<Status(Block**)>;
+using BlockSupplier = std::function<Status(Block*, bool* eos)>;
 
 struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
     BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier,
@@ -240,21 +227,29 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
     }
 
     bool has_next_block() override {
-        auto status = _block_supplier(&_block_ptr);
-        if (status.ok() && _block_ptr != nullptr) {
+        _block.clear();
+        auto status = _block_supplier(&_block, &_is_eof);
+        // If status not ok, upper callers could not detect whether it is eof 
or error.
+        // So that fatal here, and should throw exception in the future.
+        if (status.ok() && !_is_eof) {
             if (_ordering_expr.size() > 0) {
                 for (int i = 0; status.ok() && i < desc.size(); ++i) {
-                    status = _ordering_expr[i]->execute(_block_ptr, 
&desc[i].column_number);
+                    // TODO yiguolei: throw exception if status not ok in the 
future
+                    status = _ordering_expr[i]->execute(&_block, 
&desc[i].column_number);
                 }
             }
-            MergeSortCursorImpl::reset(*_block_ptr);
+            MergeSortCursorImpl::reset(_block);
             return status.ok();
         }
-        _block_ptr = nullptr;
         return false;
     }
 
-    Block* block_ptr() override { return _block_ptr; }
+    Block* block_ptr() override {
+        if (_is_eof) {
+            return nullptr;
+        }
+        return &_block;
+    }
 
     size_t columns_num() const { return all_columns.size(); }
 
@@ -264,11 +259,11 @@ struct BlockSupplierSortCursorImpl : public 
MergeSortCursorImpl {
         for (size_t i = 0; i < num_columns; ++i) {
             columns[i] = all_columns[i]->clone_empty();
         }
-        return _block_ptr->clone_with_columns(std::move(columns));
+        return _block.clone_with_columns(std::move(columns));
     }
 
     std::vector<VExprContext*> _ordering_expr;
-    Block* _block_ptr = nullptr;
+    Block _block;
     BlockSupplier _block_supplier {};
     bool _is_eof = false;
 };
diff --git a/be/src/vec/exec/vexchange_node.cpp 
b/be/src/vec/exec/vexchange_node.cpp
index d31da2418c..c3732a6d9c 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -125,7 +125,7 @@ Status VExchangeNode::get_next(RuntimeState* state, Block* 
block, bool* eos) {
         block->clear();
     }
     auto status = _stream_recvr->get_next(block, eos);
-    if (block != nullptr) {
+    if (!*eos) {
         if (!_is_merging) {
             if (_num_rows_skipped + block->rows() < _offset) {
                 _num_rows_skipped += block->rows();
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp 
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index f58e3d1652..a1e0ea4c32 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -36,7 +36,15 @@ VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr* 
parent_recvr, int n
           _num_remaining_senders(num_senders),
           _received_first_batch(false) {}
 
-VDataStreamRecvr::SenderQueue::~SenderQueue() = default;
+VDataStreamRecvr::SenderQueue::~SenderQueue() {
+    // Check pending closures, if it is not empty, should clear it here. but 
it should not happen.
+    // closure will delete itself during run method. If it is not called, brpc 
will memory leak.
+    DCHECK(_pending_closures.empty());
+    for (auto closure_pair : _pending_closures) {
+        closure_pair.first->Run();
+    }
+    _pending_closures.clear();
+}
 
 bool VDataStreamRecvr::SenderQueue::should_wait() {
     DCHECK(false) << "VDataStreamRecvr::SenderQueue::should_wait execute";
@@ -44,7 +52,7 @@ bool VDataStreamRecvr::SenderQueue::should_wait() {
     return !_is_cancelled && _block_queue.empty() && _num_remaining_senders > 
0;
 }
 
-Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
+Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
     std::unique_lock<std::mutex> l(_lock);
     // wait until something shows up or we know we're done
     while (!_is_cancelled && _block_queue.empty() && _num_remaining_senders > 
0) {
@@ -57,35 +65,30 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** 
next_block) {
                 &_is_cancelled);
         _data_arrival_cv.wait(l);
     }
-    return _inner_get_batch(next_block);
+    return _inner_get_batch(block, eos);
 }
 
-Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block** next_block) {
-    // _cur_batch must be replaced with the returned batch.
-    _current_block.reset();
-    *next_block = nullptr;
+Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block* block, bool* 
eos) {
     if (_is_cancelled) {
         return Status::Cancelled("Cancelled");
     }
 
     if (_block_queue.empty()) {
         DCHECK_EQ(_num_remaining_senders, 0);
+        *eos = true;
         return Status::OK();
     }
 
     _received_first_batch = true;
 
     DCHECK(!_block_queue.empty());
-    Block* result = _block_queue.front().second;
-    _recvr->_num_buffered_bytes -= _block_queue.front().first;
-    _recvr->_blocks_memory_usage->add(-_block_queue.front().first);
-    VLOG_ROW << "fetched #rows=" << result->rows();
+    BlockUPtr next_block = std::move(_block_queue.front());
+    auto block_byte_size = block->allocated_bytes();
+    _recvr->_num_buffered_bytes -= block_byte_size;
+    _recvr->_blocks_memory_usage->add(-block_byte_size);
     _block_queue.pop_front();
     _update_block_queue_empty();
 
-    _current_block.reset(result);
-    *next_block = _current_block.get();
-
     if (!_pending_closures.empty()) {
         auto closure_pair = _pending_closures.front();
         closure_pair.first->Run();
@@ -94,7 +97,8 @@ Status 
VDataStreamRecvr::SenderQueue::_inner_get_batch(Block** next_block) {
         closure_pair.second.stop();
         
_recvr->_buffer_full_total_timer->update(closure_pair.second.elapsed_time());
     }
-
+    block->swap(*next_block);
+    *eos = false;
     return Status::OK();
 }
 
@@ -134,11 +138,11 @@ void VDataStreamRecvr::SenderQueue::add_block(const 
PBlock& pblock, int be_numbe
         }
     }
 
-    Block* block = nullptr;
+    BlockUPtr block = nullptr;
     int64_t deserialize_time = 0;
     {
         SCOPED_RAW_TIMER(&deserialize_time);
-        block = new Block(pblock);
+        block.reset(new Block(pblock));
     }
 
     auto block_byte_size = block->allocated_bytes();
@@ -154,7 +158,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock& 
pblock, int be_numbe
     COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
     _recvr->_blocks_memory_usage->add(block_byte_size);
 
-    _block_queue.emplace_back(block_byte_size, block);
+    _block_queue.emplace_back(std::move(block));
     _update_block_queue_empty();
     // if done is nullptr, this function can't delay this response
     if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
@@ -180,7 +184,8 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
     }
 
     auto block_bytes_received = block->bytes();
-    Block* nblock = new Block(block->get_columns_with_type_and_name());
+    // Has to use unique ptr here, because clone column may failed if allocate 
memory failed.
+    BlockUPtr nblock = 
std::make_unique<Block>(block->get_columns_with_type_and_name());
 
     // local exchange should copy the block contented if use move == false
     if (use_move) {
@@ -203,7 +208,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block, 
bool use_move) {
     COUNTER_UPDATE(_recvr->_local_bytes_received_counter, 
block_bytes_received);
     _recvr->_blocks_memory_usage->add(nblock->allocated_bytes());
 
-    _block_queue.emplace_back(block_size, nblock);
+    _block_queue.emplace_back(std::move(nblock));
     _update_block_queue_empty();
     _data_arrival_cv.notify_one();
 
@@ -281,11 +286,7 @@ void VDataStreamRecvr::SenderQueue::close() {
     }
 
     // Delete any batches queued in _block_queue
-    for (auto it = _block_queue.begin(); it != _block_queue.end(); ++it) {
-        delete it->second;
-    }
-
-    _current_block.reset();
+    _block_queue.clear();
 }
 
 VDataStreamRecvr::VDataStreamRecvr(
@@ -359,7 +360,8 @@ Status VDataStreamRecvr::create_merger(const 
std::vector<VExprContext*>& orderin
 
     for (int i = 0; i < _sender_queues.size(); ++i) {
         
child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch),
-                                                     _sender_queues[i], 
std::placeholders::_1));
+                                                     _sender_queues[i], 
std::placeholders::_1,
+                                                     std::placeholders::_2));
     }
     RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
     return Status::OK();
@@ -388,19 +390,11 @@ bool VDataStreamRecvr::ready_to_read() {
 
 Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
     if (!_is_merging) {
-        Block* res = nullptr;
-        RETURN_IF_ERROR(_sender_queues[0]->get_batch(&res));
-        if (res != nullptr) {
-            block->swap(*res);
-        } else {
-            *eos = true;
-            return Status::OK();
-        }
+        block->clear();
+        return _sender_queues[0]->get_batch(block, eos);
     } else {
-        RETURN_IF_ERROR(_merger->get_next(block, eos));
+        return _merger->get_next(block, eos);
     }
-
-    return Status::OK();
 }
 
 void VDataStreamRecvr::remove_sender(int sender_id, int be_number) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h 
b/be/src/vec/runtime/vdata_stream_recvr.h
index 89af3c74ee..523d364fdf 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -121,6 +121,7 @@ private:
 
     std::atomic<int> _num_buffered_bytes;
     std::unique_ptr<MemTracker> _mem_tracker;
+    // Managed by object pool
     std::vector<SenderQueue*> _sender_queues;
 
     std::unique_ptr<VSortedRunMerger> _merger;
@@ -160,7 +161,7 @@ public:
 
     virtual bool should_wait();
 
-    virtual Status get_batch(Block** next_block);
+    virtual Status get_batch(Block* next_block, bool* eos);
 
     void add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
                    ::google::protobuf::Closure** done);
@@ -173,26 +174,20 @@ public:
 
     void close();
 
-    Block* current_block() const { return _current_block.get(); }
-
 protected:
     virtual void _update_block_queue_empty() {}
-    Status _inner_get_batch(Block** next_block);
+    Status _inner_get_batch(Block* block, bool* eos);
 
+    // Not managed by this class
     VDataStreamRecvr* _recvr;
     std::mutex _lock;
     std::atomic_bool _is_cancelled;
     std::atomic_int _num_remaining_senders;
     std::condition_variable _data_arrival_cv;
     std::condition_variable _data_removal_cv;
-
-    using VecBlockQueue = std::list<std::pair<int, Block*>>;
-    VecBlockQueue _block_queue;
-
+    std::list<BlockUPtr> _block_queue;
     std::atomic_bool _block_queue_empty = true;
 
-    std::unique_ptr<Block> _current_block;
-
     bool _received_first_batch;
     // sender_id
     std::unordered_set<int> _sender_eos_set;
@@ -213,12 +208,12 @@ public:
 
     void _update_block_queue_empty() override { _block_queue_empty = 
_block_queue.empty(); }
 
-    Status get_batch(Block** next_block) override {
+    Status get_batch(Block* block, bool* eos) override {
         CHECK(!should_wait()) << " _is_cancelled: " << _is_cancelled
                               << ", _block_queue_empty: " << _block_queue_empty
                               << ", _num_remaining_senders: " << 
_num_remaining_senders;
         std::lock_guard<std::mutex> l(_lock); // protect _block_queue
-        return _inner_get_batch(next_block);
+        return _inner_get_batch(block, eos);
     }
 
     void add_block(Block* block, bool use_move) override {
@@ -229,7 +224,7 @@ public:
         if (_is_cancelled || !block->rows()) {
             return;
         }
-        Block* nblock = new Block(block->get_columns_with_type_and_name());
+        BlockUPtr nblock = 
std::make_unique<Block>(block->get_columns_with_type_and_name());
 
         // local exchange should copy the block contented if use move == false
         if (use_move) {
@@ -247,7 +242,7 @@ public:
         auto block_mem_size = nblock->allocated_bytes();
         {
             std::unique_lock<std::mutex> l(_lock);
-            _block_queue.emplace_back(block_size, nblock);
+            _block_queue.emplace_back(std::move(nblock));
         }
         COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_size);
         _recvr->_blocks_memory_usage->add(block_mem_size);
diff --git a/be/test/vec/core/block_spill_test.cpp 
b/be/test/vec/core/block_spill_test.cpp
index ad6481cec9..9cefafe934 100644
--- a/be/test/vec/core/block_spill_test.cpp
+++ b/be/test/vec/core/block_spill_test.cpp
@@ -111,23 +111,24 @@ TEST_F(TestBlockSpill, TestInt) {
     vectorized::BlockSpillReaderUPtr spill_block_reader;
     block_spill_manager->get_reader(spill_block_writer->get_id(), 
spill_block_reader, profile_);
 
-    vectorized::Block* block_read;
+    vectorized::Block block_read;
+    bool eos = false;
 
     for (int i = 0; i < batch_num; ++i) {
-        spill_block_reader->read(&block_read);
-        EXPECT_EQ(block_read->rows(), batch_size);
-        auto column = block_read->get_by_position(0).column;
+        spill_block_reader->read(&block_read, &eos);
+        EXPECT_EQ(block_read.rows(), batch_size);
+        auto column = block_read.get_by_position(0).column;
         auto* real_column = (vectorized::ColumnVector<int>*)column.get();
         for (size_t j = 0; j < batch_size; ++j) {
             EXPECT_EQ(real_column->get_int(j), j + i * batch_size);
         }
     }
 
-    spill_block_reader->read(&block_read);
+    spill_block_reader->read(&block_read, &eos);
     spill_block_reader->close();
 
-    EXPECT_EQ(block_read->rows(), 1);
-    auto column = block_read->get_by_position(0).column;
+    EXPECT_EQ(block_read.rows(), 1);
+    auto column = block_read.get_by_position(0).column;
     auto* real_column = (vectorized::ColumnVector<int>*)column.get();
     EXPECT_EQ(real_column->get_int(0), 0);
 }
@@ -159,13 +160,14 @@ TEST_F(TestBlockSpill, TestIntNullable) {
     vectorized::BlockSpillReaderUPtr spill_block_reader;
     block_spill_manager->get_reader(spill_block_writer->get_id(), 
spill_block_reader, profile_);
 
-    vectorized::Block* block_read;
+    vectorized::Block block_read;
+    bool eos = false;
 
     for (int i = 0; i < batch_num; ++i) {
-        spill_block_reader->read(&block_read);
+        spill_block_reader->read(&block_read, &eos);
 
-        EXPECT_EQ(block_read->rows(), batch_size);
-        auto column = block_read->get_by_position(0).column;
+        EXPECT_EQ(block_read.rows(), batch_size);
+        auto column = block_read.get_by_position(0).column;
         auto* real_column = (vectorized::ColumnNullable*)column.get();
         const auto& int_column =
                 (const 
vectorized::ColumnVector<int>&)(real_column->get_nested_column());
@@ -178,11 +180,11 @@ TEST_F(TestBlockSpill, TestIntNullable) {
         }
     }
 
-    spill_block_reader->read(&block_read);
+    spill_block_reader->read(&block_read, &eos);
     spill_block_reader->close();
 
-    EXPECT_EQ(block_read->rows(), 1);
-    auto column = block_read->get_by_position(0).column;
+    EXPECT_EQ(block_read.rows(), 1);
+    auto column = block_read.get_by_position(0).column;
     auto* real_column = (vectorized::ColumnNullable*)column.get();
     const auto& int_column =
             (const 
vectorized::ColumnVector<int>&)(real_column->get_nested_column());
@@ -211,25 +213,26 @@ TEST_F(TestBlockSpill, TestString) {
     vectorized::BlockSpillReaderUPtr spill_block_reader;
     block_spill_manager->get_reader(spill_block_writer->get_id(), 
spill_block_reader, profile_);
 
-    vectorized::Block* block_read;
+    vectorized::Block block_read;
+    bool eos = false;
 
     for (int i = 0; i < batch_num; ++i) {
-        st = spill_block_reader->read(&block_read);
+        st = spill_block_reader->read(&block_read, &eos);
         EXPECT_TRUE(st.ok());
 
-        EXPECT_EQ(block_read->rows(), batch_size);
-        auto column = block_read->get_by_position(0).column;
+        EXPECT_EQ(block_read.rows(), batch_size);
+        auto column = block_read.get_by_position(0).column;
         auto* real_column = (vectorized::ColumnString*)column.get();
         for (size_t j = 0; j < batch_size; ++j) {
             EXPECT_EQ(real_column->get_data_at(j), StringRef(std::to_string(j 
+ i * batch_size)));
         }
     }
 
-    spill_block_reader->read(&block_read);
+    spill_block_reader->read(&block_read, &eos);
     spill_block_reader->close();
 
-    EXPECT_EQ(block_read->rows(), 1);
-    auto column = block_read->get_by_position(0).column;
+    EXPECT_EQ(block_read.rows(), 1);
+    auto column = block_read.get_by_position(0).column;
     auto* real_column = (vectorized::ColumnString*)column.get();
     EXPECT_EQ(real_column->get_data_at(0), StringRef(std::to_string(batch_size 
* 3)));
 }
@@ -262,14 +265,15 @@ TEST_F(TestBlockSpill, TestStringNullable) {
     vectorized::BlockSpillReaderUPtr spill_block_reader;
     block_spill_manager->get_reader(spill_block_writer->get_id(), 
spill_block_reader, profile_);
 
-    vectorized::Block* block_read;
+    vectorized::Block block_read;
+    bool eos = false;
 
     for (int i = 0; i < batch_num; ++i) {
-        st = spill_block_reader->read(&block_read);
+        st = spill_block_reader->read(&block_read, &eos);
         EXPECT_TRUE(st.ok());
 
-        EXPECT_EQ(block_read->rows(), batch_size);
-        auto column = block_read->get_by_position(0).column;
+        EXPECT_EQ(block_read.rows(), batch_size);
+        auto column = block_read.get_by_position(0).column;
         auto* real_column = (vectorized::ColumnNullable*)column.get();
         const auto& string_column =
                 (const 
vectorized::ColumnString&)(real_column->get_nested_column());
@@ -283,12 +287,12 @@ TEST_F(TestBlockSpill, TestStringNullable) {
         }
     }
 
-    st = spill_block_reader->read(&block_read);
+    st = spill_block_reader->read(&block_read, &eos);
     spill_block_reader->close();
     EXPECT_TRUE(st.ok());
 
-    EXPECT_EQ(block_read->rows(), 1);
-    auto column = block_read->get_by_position(0).column;
+    EXPECT_EQ(block_read.rows(), 1);
+    auto column = block_read.get_by_position(0).column;
     auto* real_column = (vectorized::ColumnNullable*)column.get();
     const auto& string_column = (const 
vectorized::ColumnString&)(real_column->get_nested_column());
     EXPECT_EQ(string_column.get_data_at(0), 
StringRef(std::to_string(batch_size * 3)));
@@ -320,14 +324,15 @@ TEST_F(TestBlockSpill, TestDecimal) {
     vectorized::BlockSpillReaderUPtr spill_block_reader;
     block_spill_manager->get_reader(spill_block_writer->get_id(), 
spill_block_reader, profile_);
 
-    vectorized::Block* block_read;
+    vectorized::Block block_read;
+    bool eos = false;
 
     for (int i = 0; i < batch_num; ++i) {
-        st = spill_block_reader->read(&block_read);
+        st = spill_block_reader->read(&block_read, &eos);
         EXPECT_TRUE(st.ok());
 
-        EXPECT_EQ(block_read->rows(), batch_size);
-        auto column = block_read->get_by_position(0).column;
+        EXPECT_EQ(block_read.rows(), batch_size);
+        auto column = block_read.get_by_position(0).column;
         auto* real_column =
                 
(vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)column.get();
         for (size_t j = 0; j < batch_size; ++j) {
@@ -336,12 +341,12 @@ TEST_F(TestBlockSpill, TestDecimal) {
         }
     }
 
-    st = spill_block_reader->read(&block_read);
+    st = spill_block_reader->read(&block_read, &eos);
     spill_block_reader->close();
     EXPECT_TRUE(st.ok());
 
-    EXPECT_EQ(block_read->rows(), 1);
-    auto column = block_read->get_by_position(0).column;
+    EXPECT_EQ(block_read.rows(), 1);
+    auto column = block_read.get_by_position(0).column;
     auto* real_column =
             
(vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)column.get();
     EXPECT_EQ(real_column->get_element(0), batch_size * 3 * (pow(10, 9) + 
pow(10, 8)));
@@ -376,14 +381,15 @@ TEST_F(TestBlockSpill, TestDecimalNullable) {
     vectorized::BlockSpillReaderUPtr spill_block_reader;
     block_spill_manager->get_reader(spill_block_writer->get_id(), 
spill_block_reader, profile_);
 
-    vectorized::Block* block_read;
+    vectorized::Block block_read;
+    bool eos = false;
 
     for (int i = 0; i < batch_num; ++i) {
-        st = spill_block_reader->read(&block_read);
+        st = spill_block_reader->read(&block_read, &eos);
         EXPECT_TRUE(st.ok());
 
-        EXPECT_EQ(block_read->rows(), batch_size);
-        auto column = block_read->get_by_position(0).column;
+        EXPECT_EQ(block_read.rows(), batch_size);
+        auto column = block_read.get_by_position(0).column;
         auto* real_column = (vectorized::ColumnNullable*)column.get();
         const auto& decimal_col = 
(vectorized::ColumnDecimal<vectorized::Decimal<
                                            
vectorized::Int128>>&)(real_column->get_nested_column());
@@ -397,12 +403,12 @@ TEST_F(TestBlockSpill, TestDecimalNullable) {
         }
     }
 
-    st = spill_block_reader->read(&block_read);
+    st = spill_block_reader->read(&block_read, &eos);
     spill_block_reader->close();
     EXPECT_TRUE(st.ok());
 
-    EXPECT_EQ(block_read->rows(), 1);
-    auto column = block_read->get_by_position(0).column;
+    EXPECT_EQ(block_read.rows(), 1);
+    auto column = block_read.get_by_position(0).column;
     auto* real_column = (vectorized::ColumnNullable*)column.get();
     const auto& decimal_col =
             (vectorized::ColumnDecimal<
@@ -441,14 +447,15 @@ TEST_F(TestBlockSpill, TestBitmap) {
     vectorized::BlockSpillReaderUPtr spill_block_reader;
     block_spill_manager->get_reader(spill_block_writer->get_id(), 
spill_block_reader, profile_);
 
-    vectorized::Block* block_read;
+    vectorized::Block block_read;
+    bool eos = false;
 
     for (int i = 0; i < batch_num; ++i) {
-        st = spill_block_reader->read(&block_read);
+        st = spill_block_reader->read(&block_read, &eos);
         EXPECT_TRUE(st.ok());
 
-        EXPECT_EQ(block_read->rows(), batch_size);
-        auto column = block_read->get_by_position(0).column;
+        EXPECT_EQ(block_read.rows(), batch_size);
+        auto column = block_read.get_by_position(0).column;
         auto* real_column = 
(vectorized::ColumnComplexType<BitmapValue>*)column.get();
         for (size_t j = 0; j < batch_size; ++j) {
             auto bitmap_str = 
convert_bitmap_to_string(real_column->get_element(j));
@@ -456,12 +463,12 @@ TEST_F(TestBlockSpill, TestBitmap) {
         }
     }
 
-    st = spill_block_reader->read(&block_read);
+    st = spill_block_reader->read(&block_read, &eos);
     spill_block_reader->close();
     EXPECT_TRUE(st.ok());
 
-    EXPECT_EQ(block_read->rows(), 1);
-    auto column = block_read->get_by_position(0).column;
+    EXPECT_EQ(block_read.rows(), 1);
+    auto column = block_read.get_by_position(0).column;
     auto* real_column = 
(vectorized::ColumnComplexType<BitmapValue>*)column.get();
     auto bitmap_str = convert_bitmap_to_string(real_column->get_element(0));
     EXPECT_EQ(bitmap_str, expected_bitmap_str[3 * batch_size]);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to