This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d390e63a03 [enhancement](stream receiver) make stream receiver
exception safe (#16412)
d390e63a03 is described below
commit d390e63a031d8aa79ee3a2593cfc3e7f51a48e33
Author: yiguolei <[email protected]>
AuthorDate: Tue Feb 7 12:44:20 2023 +0800
[enhancement](stream receiver) make stream receiver exception safe (#16412)
make the stream receiver exception safe
change get_block(Block**) to get_block(Block*, bool* eos) to unify the stream
semantics
---
be/src/util/proto_util.h | 19 ------
be/src/vec/common/sort/sorter.cpp | 2 +-
be/src/vec/core/block.h | 1 +
be/src/vec/core/block_spill_reader.cpp | 15 ++---
be/src/vec/core/block_spill_reader.h | 3 +-
be/src/vec/core/sort_cursor.h | 39 +++++------
be/src/vec/exec/vexchange_node.cpp | 2 +-
be/src/vec/runtime/vdata_stream_recvr.cpp | 68 +++++++++----------
be/src/vec/runtime/vdata_stream_recvr.h | 23 +++----
be/test/vec/core/block_spill_test.cpp | 105 ++++++++++++++++--------------
10 files changed, 123 insertions(+), 154 deletions(-)
diff --git a/be/src/util/proto_util.h b/be/src/util/proto_util.h
index 5828b8eeec..28f5955b15 100644
--- a/be/src/util/proto_util.h
+++ b/be/src/util/proto_util.h
@@ -142,15 +142,6 @@ inline void attachment_transfer_request_block(const
Params* brpc_request, brpc::
}
}
-// Embed tuple_data and brpc request serialization string in controller
attachment.
-template <typename Params, typename Closure>
-inline Status request_embed_attachment_contain_tuple(Params* brpc_request,
Closure* closure) {
- auto row_batch = brpc_request->row_batch();
- Status st = request_embed_attachment(brpc_request, row_batch.tuple_data(),
closure);
- row_batch.set_tuple_data("");
- return st;
-}
-
template <typename Params, typename Closure>
inline Status request_embed_attachment(Params* brpc_request, const
std::string& data,
Closure* closure) {
@@ -181,16 +172,6 @@ inline Status request_embed_attachment(Params*
brpc_request, const std::string&
return Status::OK();
}
-// Extract the brpc request and tuple data from the controller attachment,
-// and put the tuple data into the request.
-template <typename Params>
-inline Status attachment_extract_request_contain_tuple(const Params*
brpc_request,
- brpc::Controller* cntl)
{
- Params* req = const_cast<Params*>(brpc_request);
- auto rb = req->mutable_row_batch();
- return attachment_extract_request(req, cntl, rb->mutable_tuple_data());
-}
-
// Extract the brpc request and block from the controller attachment,
// and put the block into the request.
template <typename Params>
diff --git a/be/src/vec/common/sort/sorter.cpp
b/be/src/vec/common/sort/sorter.cpp
index 3fa7b9e06f..aaf91d4428 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -225,7 +225,7 @@ Status MergeSorterState::_create_intermediate_merger(int
num_blocks,
stream_id, spilled_block_reader, block_spill_profile_));
child_block_suppliers.emplace_back(std::bind(std::mem_fn(&BlockSpillReader::read),
spilled_block_reader.get(),
- std::placeholders::_1));
+ std::placeholders::_1,
std::placeholders::_2));
spilled_block_readers_.emplace_back(std::move(spilled_block_reader));
spilled_sorted_block_streams_.pop_front();
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index d5032cd568..cfde22603a 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -577,6 +577,7 @@ struct IteratorRowRef {
};
using BlockView = std::vector<IteratorRowRef>;
+using BlockUPtr = std::unique_ptr<Block>;
} // namespace vectorized
} // namespace doris
diff --git a/be/src/vec/core/block_spill_reader.cpp
b/be/src/vec/core/block_spill_reader.cpp
index 7f8fb69d31..945d14f9ff 100644
--- a/be/src/vec/core/block_spill_reader.cpp
+++ b/be/src/vec/core/block_spill_reader.cpp
@@ -79,13 +79,12 @@ Status BlockSpillReader::open() {
// The returned block is owned by BlockSpillReader and is
// destroyed when reading next block.
-Status BlockSpillReader::read(Block** block) {
+Status BlockSpillReader::read(Block* block, bool* eos) {
DCHECK(file_reader_);
-
- current_block_.reset();
- *block = nullptr;
+ block->clear();
if (read_block_index_ >= block_count_) {
+ *eos = true;
return Status::OK();
}
@@ -103,17 +102,15 @@ Status BlockSpillReader::read(Block** block) {
DCHECK(bytes_read == bytes_to_read);
PBlock pb_block;
- Block* new_block = nullptr;
+ BlockUPtr new_block = nullptr;
{
SCOPED_TIMER(deserialize_time_);
if (!pb_block.ParseFromArray(result.data, result.size)) {
return Status::InternalError("Failed to read spilled block");
}
- new_block = new Block(pb_block);
+ new_block.reset(new Block(pb_block));
}
-
- current_block_.reset(new_block);
- *block = new_block;
+ block->swap(*new_block);
++read_block_index_;
diff --git a/be/src/vec/core/block_spill_reader.h
b/be/src/vec/core/block_spill_reader.h
index a165d6aae6..48e9434c8f 100644
--- a/be/src/vec/core/block_spill_reader.h
+++ b/be/src/vec/core/block_spill_reader.h
@@ -40,7 +40,7 @@ public:
Status close();
- Status read(Block** block);
+ Status read(Block* block, bool* eos);
int64_t get_id() const { return stream_id_; }
@@ -59,7 +59,6 @@ private:
size_t max_sub_block_size_ = 0;
std::unique_ptr<char[]> read_buff_;
std::vector<size_t> block_start_offsets_;
- std::unique_ptr<Block> current_block_;
RuntimeProfile* profile_ = nullptr;
RuntimeProfile::Counter* read_time_;
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 74a9762673..01834f8345 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -167,19 +167,6 @@ struct MergeSortCursorImpl {
MergeSortCursorImpl(const SortDescription& desc_)
: desc(desc_), sort_columns_size(desc.size()) {}
-
- MergeSortCursorImpl(const Columns& columns, const SortDescription& desc_)
- : desc(desc_), sort_columns_size(desc.size()) {
- for (auto& column_desc : desc) {
- if (!column_desc.column_name.empty()) {
- LOG(FATAL)
- << "SortDesctiption should contain column position if
MergeSortCursor was "
- "used without header.";
- }
- }
- reset(columns, {});
- }
-
bool empty() const { return rows == 0; }
/// Set the cursor to the beginning of the new block.
@@ -216,7 +203,7 @@ struct MergeSortCursorImpl {
virtual Block* block_ptr() { return nullptr; }
};
-using BlockSupplier = std::function<Status(Block**)>;
+using BlockSupplier = std::function<Status(Block*, bool* eos)>;
struct BlockSupplierSortCursorImpl : public MergeSortCursorImpl {
BlockSupplierSortCursorImpl(const BlockSupplier& block_supplier,
@@ -240,21 +227,29 @@ struct BlockSupplierSortCursorImpl : public
MergeSortCursorImpl {
}
bool has_next_block() override {
- auto status = _block_supplier(&_block_ptr);
- if (status.ok() && _block_ptr != nullptr) {
+ _block.clear();
+ auto status = _block_supplier(&_block, &_is_eof);
+ // If status not ok, upper callers could not detect whether it is eof
or error.
+ // So that fatal here, and should throw exception in the future.
+ if (status.ok() && !_is_eof) {
if (_ordering_expr.size() > 0) {
for (int i = 0; status.ok() && i < desc.size(); ++i) {
- status = _ordering_expr[i]->execute(_block_ptr,
&desc[i].column_number);
+ // TODO yiguolei: throw exception if status not ok in the
future
+ status = _ordering_expr[i]->execute(&_block,
&desc[i].column_number);
}
}
- MergeSortCursorImpl::reset(*_block_ptr);
+ MergeSortCursorImpl::reset(_block);
return status.ok();
}
- _block_ptr = nullptr;
return false;
}
- Block* block_ptr() override { return _block_ptr; }
+ Block* block_ptr() override {
+ if (_is_eof) {
+ return nullptr;
+ }
+ return &_block;
+ }
size_t columns_num() const { return all_columns.size(); }
@@ -264,11 +259,11 @@ struct BlockSupplierSortCursorImpl : public
MergeSortCursorImpl {
for (size_t i = 0; i < num_columns; ++i) {
columns[i] = all_columns[i]->clone_empty();
}
- return _block_ptr->clone_with_columns(std::move(columns));
+ return _block.clone_with_columns(std::move(columns));
}
std::vector<VExprContext*> _ordering_expr;
- Block* _block_ptr = nullptr;
+ Block _block;
BlockSupplier _block_supplier {};
bool _is_eof = false;
};
diff --git a/be/src/vec/exec/vexchange_node.cpp
b/be/src/vec/exec/vexchange_node.cpp
index d31da2418c..c3732a6d9c 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -125,7 +125,7 @@ Status VExchangeNode::get_next(RuntimeState* state, Block*
block, bool* eos) {
block->clear();
}
auto status = _stream_recvr->get_next(block, eos);
- if (block != nullptr) {
+ if (!*eos) {
if (!_is_merging) {
if (_num_rows_skipped + block->rows() < _offset) {
_num_rows_skipped += block->rows();
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp
b/be/src/vec/runtime/vdata_stream_recvr.cpp
index f58e3d1652..a1e0ea4c32 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -36,7 +36,15 @@ VDataStreamRecvr::SenderQueue::SenderQueue(VDataStreamRecvr*
parent_recvr, int n
_num_remaining_senders(num_senders),
_received_first_batch(false) {}
-VDataStreamRecvr::SenderQueue::~SenderQueue() = default;
+VDataStreamRecvr::SenderQueue::~SenderQueue() {
+ // Check pending closures, if it is not empty, should clear it here. but
it should not happen.
+ // closure will delete itself during run method. If it is not called, brpc
will memory leak.
+ DCHECK(_pending_closures.empty());
+ for (auto closure_pair : _pending_closures) {
+ closure_pair.first->Run();
+ }
+ _pending_closures.clear();
+}
bool VDataStreamRecvr::SenderQueue::should_wait() {
DCHECK(false) << "VDataStreamRecvr::SenderQueue::should_wait execute";
@@ -44,7 +52,7 @@ bool VDataStreamRecvr::SenderQueue::should_wait() {
return !_is_cancelled && _block_queue.empty() && _num_remaining_senders >
0;
}
-Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
+Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
std::unique_lock<std::mutex> l(_lock);
// wait until something shows up or we know we're done
while (!_is_cancelled && _block_queue.empty() && _num_remaining_senders >
0) {
@@ -57,35 +65,30 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block**
next_block) {
&_is_cancelled);
_data_arrival_cv.wait(l);
}
- return _inner_get_batch(next_block);
+ return _inner_get_batch(block, eos);
}
-Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block** next_block) {
- // _cur_batch must be replaced with the returned batch.
- _current_block.reset();
- *next_block = nullptr;
+Status VDataStreamRecvr::SenderQueue::_inner_get_batch(Block* block, bool*
eos) {
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
}
if (_block_queue.empty()) {
DCHECK_EQ(_num_remaining_senders, 0);
+ *eos = true;
return Status::OK();
}
_received_first_batch = true;
DCHECK(!_block_queue.empty());
- Block* result = _block_queue.front().second;
- _recvr->_num_buffered_bytes -= _block_queue.front().first;
- _recvr->_blocks_memory_usage->add(-_block_queue.front().first);
- VLOG_ROW << "fetched #rows=" << result->rows();
+ BlockUPtr next_block = std::move(_block_queue.front());
+ auto block_byte_size = block->allocated_bytes();
+ _recvr->_num_buffered_bytes -= block_byte_size;
+ _recvr->_blocks_memory_usage->add(-block_byte_size);
_block_queue.pop_front();
_update_block_queue_empty();
- _current_block.reset(result);
- *next_block = _current_block.get();
-
if (!_pending_closures.empty()) {
auto closure_pair = _pending_closures.front();
closure_pair.first->Run();
@@ -94,7 +97,8 @@ Status
VDataStreamRecvr::SenderQueue::_inner_get_batch(Block** next_block) {
closure_pair.second.stop();
_recvr->_buffer_full_total_timer->update(closure_pair.second.elapsed_time());
}
-
+ block->swap(*next_block);
+ *eos = false;
return Status::OK();
}
@@ -134,11 +138,11 @@ void VDataStreamRecvr::SenderQueue::add_block(const
PBlock& pblock, int be_numbe
}
}
- Block* block = nullptr;
+ BlockUPtr block = nullptr;
int64_t deserialize_time = 0;
{
SCOPED_RAW_TIMER(&deserialize_time);
- block = new Block(pblock);
+ block.reset(new Block(pblock));
}
auto block_byte_size = block->allocated_bytes();
@@ -154,7 +158,7 @@ void VDataStreamRecvr::SenderQueue::add_block(const PBlock&
pblock, int be_numbe
COUNTER_UPDATE(_recvr->_decompress_bytes, block->get_decompressed_bytes());
_recvr->_blocks_memory_usage->add(block_byte_size);
- _block_queue.emplace_back(block_byte_size, block);
+ _block_queue.emplace_back(std::move(block));
_update_block_queue_empty();
// if done is nullptr, this function can't delay this response
if (done != nullptr && _recvr->exceeds_limit(block_byte_size)) {
@@ -180,7 +184,8 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block,
bool use_move) {
}
auto block_bytes_received = block->bytes();
- Block* nblock = new Block(block->get_columns_with_type_and_name());
+ // Has to use unique ptr here, because clone column may failed if allocate
memory failed.
+ BlockUPtr nblock =
std::make_unique<Block>(block->get_columns_with_type_and_name());
// local exchange should copy the block contented if use move == false
if (use_move) {
@@ -203,7 +208,7 @@ void VDataStreamRecvr::SenderQueue::add_block(Block* block,
bool use_move) {
COUNTER_UPDATE(_recvr->_local_bytes_received_counter,
block_bytes_received);
_recvr->_blocks_memory_usage->add(nblock->allocated_bytes());
- _block_queue.emplace_back(block_size, nblock);
+ _block_queue.emplace_back(std::move(nblock));
_update_block_queue_empty();
_data_arrival_cv.notify_one();
@@ -281,11 +286,7 @@ void VDataStreamRecvr::SenderQueue::close() {
}
// Delete any batches queued in _block_queue
- for (auto it = _block_queue.begin(); it != _block_queue.end(); ++it) {
- delete it->second;
- }
-
- _current_block.reset();
+ _block_queue.clear();
}
VDataStreamRecvr::VDataStreamRecvr(
@@ -359,7 +360,8 @@ Status VDataStreamRecvr::create_merger(const
std::vector<VExprContext*>& orderin
for (int i = 0; i < _sender_queues.size(); ++i) {
child_block_suppliers.emplace_back(std::bind(std::mem_fn(&SenderQueue::get_batch),
- _sender_queues[i],
std::placeholders::_1));
+ _sender_queues[i],
std::placeholders::_1,
+ std::placeholders::_2));
}
RETURN_IF_ERROR(_merger->prepare(child_block_suppliers));
return Status::OK();
@@ -388,19 +390,11 @@ bool VDataStreamRecvr::ready_to_read() {
Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
if (!_is_merging) {
- Block* res = nullptr;
- RETURN_IF_ERROR(_sender_queues[0]->get_batch(&res));
- if (res != nullptr) {
- block->swap(*res);
- } else {
- *eos = true;
- return Status::OK();
- }
+ block->clear();
+ return _sender_queues[0]->get_batch(block, eos);
} else {
- RETURN_IF_ERROR(_merger->get_next(block, eos));
+ return _merger->get_next(block, eos);
}
-
- return Status::OK();
}
void VDataStreamRecvr::remove_sender(int sender_id, int be_number) {
diff --git a/be/src/vec/runtime/vdata_stream_recvr.h
b/be/src/vec/runtime/vdata_stream_recvr.h
index 89af3c74ee..523d364fdf 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.h
+++ b/be/src/vec/runtime/vdata_stream_recvr.h
@@ -121,6 +121,7 @@ private:
std::atomic<int> _num_buffered_bytes;
std::unique_ptr<MemTracker> _mem_tracker;
+ // Managed by object pool
std::vector<SenderQueue*> _sender_queues;
std::unique_ptr<VSortedRunMerger> _merger;
@@ -160,7 +161,7 @@ public:
virtual bool should_wait();
- virtual Status get_batch(Block** next_block);
+ virtual Status get_batch(Block* next_block, bool* eos);
void add_block(const PBlock& pblock, int be_number, int64_t packet_seq,
::google::protobuf::Closure** done);
@@ -173,26 +174,20 @@ public:
void close();
- Block* current_block() const { return _current_block.get(); }
-
protected:
virtual void _update_block_queue_empty() {}
- Status _inner_get_batch(Block** next_block);
+ Status _inner_get_batch(Block* block, bool* eos);
+ // Not managed by this class
VDataStreamRecvr* _recvr;
std::mutex _lock;
std::atomic_bool _is_cancelled;
std::atomic_int _num_remaining_senders;
std::condition_variable _data_arrival_cv;
std::condition_variable _data_removal_cv;
-
- using VecBlockQueue = std::list<std::pair<int, Block*>>;
- VecBlockQueue _block_queue;
-
+ std::list<BlockUPtr> _block_queue;
std::atomic_bool _block_queue_empty = true;
- std::unique_ptr<Block> _current_block;
-
bool _received_first_batch;
// sender_id
std::unordered_set<int> _sender_eos_set;
@@ -213,12 +208,12 @@ public:
void _update_block_queue_empty() override { _block_queue_empty =
_block_queue.empty(); }
- Status get_batch(Block** next_block) override {
+ Status get_batch(Block* block, bool* eos) override {
CHECK(!should_wait()) << " _is_cancelled: " << _is_cancelled
<< ", _block_queue_empty: " << _block_queue_empty
<< ", _num_remaining_senders: " <<
_num_remaining_senders;
std::lock_guard<std::mutex> l(_lock); // protect _block_queue
- return _inner_get_batch(next_block);
+ return _inner_get_batch(block, eos);
}
void add_block(Block* block, bool use_move) override {
@@ -229,7 +224,7 @@ public:
if (_is_cancelled || !block->rows()) {
return;
}
- Block* nblock = new Block(block->get_columns_with_type_and_name());
+ BlockUPtr nblock =
std::make_unique<Block>(block->get_columns_with_type_and_name());
// local exchange should copy the block contented if use move == false
if (use_move) {
@@ -247,7 +242,7 @@ public:
auto block_mem_size = nblock->allocated_bytes();
{
std::unique_lock<std::mutex> l(_lock);
- _block_queue.emplace_back(block_size, nblock);
+ _block_queue.emplace_back(std::move(nblock));
}
COUNTER_UPDATE(_recvr->_local_bytes_received_counter, block_size);
_recvr->_blocks_memory_usage->add(block_mem_size);
diff --git a/be/test/vec/core/block_spill_test.cpp
b/be/test/vec/core/block_spill_test.cpp
index ad6481cec9..9cefafe934 100644
--- a/be/test/vec/core/block_spill_test.cpp
+++ b/be/test/vec/core/block_spill_test.cpp
@@ -111,23 +111,24 @@ TEST_F(TestBlockSpill, TestInt) {
vectorized::BlockSpillReaderUPtr spill_block_reader;
block_spill_manager->get_reader(spill_block_writer->get_id(),
spill_block_reader, profile_);
- vectorized::Block* block_read;
+ vectorized::Block block_read;
+ bool eos = false;
for (int i = 0; i < batch_num; ++i) {
- spill_block_reader->read(&block_read);
- EXPECT_EQ(block_read->rows(), batch_size);
- auto column = block_read->get_by_position(0).column;
+ spill_block_reader->read(&block_read, &eos);
+ EXPECT_EQ(block_read.rows(), batch_size);
+ auto column = block_read.get_by_position(0).column;
auto* real_column = (vectorized::ColumnVector<int>*)column.get();
for (size_t j = 0; j < batch_size; ++j) {
EXPECT_EQ(real_column->get_int(j), j + i * batch_size);
}
}
- spill_block_reader->read(&block_read);
+ spill_block_reader->read(&block_read, &eos);
spill_block_reader->close();
- EXPECT_EQ(block_read->rows(), 1);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), 1);
+ auto column = block_read.get_by_position(0).column;
auto* real_column = (vectorized::ColumnVector<int>*)column.get();
EXPECT_EQ(real_column->get_int(0), 0);
}
@@ -159,13 +160,14 @@ TEST_F(TestBlockSpill, TestIntNullable) {
vectorized::BlockSpillReaderUPtr spill_block_reader;
block_spill_manager->get_reader(spill_block_writer->get_id(),
spill_block_reader, profile_);
- vectorized::Block* block_read;
+ vectorized::Block block_read;
+ bool eos = false;
for (int i = 0; i < batch_num; ++i) {
- spill_block_reader->read(&block_read);
+ spill_block_reader->read(&block_read, &eos);
- EXPECT_EQ(block_read->rows(), batch_size);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), batch_size);
+ auto column = block_read.get_by_position(0).column;
auto* real_column = (vectorized::ColumnNullable*)column.get();
const auto& int_column =
(const
vectorized::ColumnVector<int>&)(real_column->get_nested_column());
@@ -178,11 +180,11 @@ TEST_F(TestBlockSpill, TestIntNullable) {
}
}
- spill_block_reader->read(&block_read);
+ spill_block_reader->read(&block_read, &eos);
spill_block_reader->close();
- EXPECT_EQ(block_read->rows(), 1);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), 1);
+ auto column = block_read.get_by_position(0).column;
auto* real_column = (vectorized::ColumnNullable*)column.get();
const auto& int_column =
(const
vectorized::ColumnVector<int>&)(real_column->get_nested_column());
@@ -211,25 +213,26 @@ TEST_F(TestBlockSpill, TestString) {
vectorized::BlockSpillReaderUPtr spill_block_reader;
block_spill_manager->get_reader(spill_block_writer->get_id(),
spill_block_reader, profile_);
- vectorized::Block* block_read;
+ vectorized::Block block_read;
+ bool eos = false;
for (int i = 0; i < batch_num; ++i) {
- st = spill_block_reader->read(&block_read);
+ st = spill_block_reader->read(&block_read, &eos);
EXPECT_TRUE(st.ok());
- EXPECT_EQ(block_read->rows(), batch_size);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), batch_size);
+ auto column = block_read.get_by_position(0).column;
auto* real_column = (vectorized::ColumnString*)column.get();
for (size_t j = 0; j < batch_size; ++j) {
EXPECT_EQ(real_column->get_data_at(j), StringRef(std::to_string(j
+ i * batch_size)));
}
}
- spill_block_reader->read(&block_read);
+ spill_block_reader->read(&block_read, &eos);
spill_block_reader->close();
- EXPECT_EQ(block_read->rows(), 1);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), 1);
+ auto column = block_read.get_by_position(0).column;
auto* real_column = (vectorized::ColumnString*)column.get();
EXPECT_EQ(real_column->get_data_at(0), StringRef(std::to_string(batch_size
* 3)));
}
@@ -262,14 +265,15 @@ TEST_F(TestBlockSpill, TestStringNullable) {
vectorized::BlockSpillReaderUPtr spill_block_reader;
block_spill_manager->get_reader(spill_block_writer->get_id(),
spill_block_reader, profile_);
- vectorized::Block* block_read;
+ vectorized::Block block_read;
+ bool eos = false;
for (int i = 0; i < batch_num; ++i) {
- st = spill_block_reader->read(&block_read);
+ st = spill_block_reader->read(&block_read, &eos);
EXPECT_TRUE(st.ok());
- EXPECT_EQ(block_read->rows(), batch_size);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), batch_size);
+ auto column = block_read.get_by_position(0).column;
auto* real_column = (vectorized::ColumnNullable*)column.get();
const auto& string_column =
(const
vectorized::ColumnString&)(real_column->get_nested_column());
@@ -283,12 +287,12 @@ TEST_F(TestBlockSpill, TestStringNullable) {
}
}
- st = spill_block_reader->read(&block_read);
+ st = spill_block_reader->read(&block_read, &eos);
spill_block_reader->close();
EXPECT_TRUE(st.ok());
- EXPECT_EQ(block_read->rows(), 1);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), 1);
+ auto column = block_read.get_by_position(0).column;
auto* real_column = (vectorized::ColumnNullable*)column.get();
const auto& string_column = (const
vectorized::ColumnString&)(real_column->get_nested_column());
EXPECT_EQ(string_column.get_data_at(0),
StringRef(std::to_string(batch_size * 3)));
@@ -320,14 +324,15 @@ TEST_F(TestBlockSpill, TestDecimal) {
vectorized::BlockSpillReaderUPtr spill_block_reader;
block_spill_manager->get_reader(spill_block_writer->get_id(),
spill_block_reader, profile_);
- vectorized::Block* block_read;
+ vectorized::Block block_read;
+ bool eos = false;
for (int i = 0; i < batch_num; ++i) {
- st = spill_block_reader->read(&block_read);
+ st = spill_block_reader->read(&block_read, &eos);
EXPECT_TRUE(st.ok());
- EXPECT_EQ(block_read->rows(), batch_size);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), batch_size);
+ auto column = block_read.get_by_position(0).column;
auto* real_column =
(vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)column.get();
for (size_t j = 0; j < batch_size; ++j) {
@@ -336,12 +341,12 @@ TEST_F(TestBlockSpill, TestDecimal) {
}
}
- st = spill_block_reader->read(&block_read);
+ st = spill_block_reader->read(&block_read, &eos);
spill_block_reader->close();
EXPECT_TRUE(st.ok());
- EXPECT_EQ(block_read->rows(), 1);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), 1);
+ auto column = block_read.get_by_position(0).column;
auto* real_column =
(vectorized::ColumnDecimal<vectorized::Decimal<vectorized::Int128>>*)column.get();
EXPECT_EQ(real_column->get_element(0), batch_size * 3 * (pow(10, 9) +
pow(10, 8)));
@@ -376,14 +381,15 @@ TEST_F(TestBlockSpill, TestDecimalNullable) {
vectorized::BlockSpillReaderUPtr spill_block_reader;
block_spill_manager->get_reader(spill_block_writer->get_id(),
spill_block_reader, profile_);
- vectorized::Block* block_read;
+ vectorized::Block block_read;
+ bool eos = false;
for (int i = 0; i < batch_num; ++i) {
- st = spill_block_reader->read(&block_read);
+ st = spill_block_reader->read(&block_read, &eos);
EXPECT_TRUE(st.ok());
- EXPECT_EQ(block_read->rows(), batch_size);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), batch_size);
+ auto column = block_read.get_by_position(0).column;
auto* real_column = (vectorized::ColumnNullable*)column.get();
const auto& decimal_col =
(vectorized::ColumnDecimal<vectorized::Decimal<
vectorized::Int128>>&)(real_column->get_nested_column());
@@ -397,12 +403,12 @@ TEST_F(TestBlockSpill, TestDecimalNullable) {
}
}
- st = spill_block_reader->read(&block_read);
+ st = spill_block_reader->read(&block_read, &eos);
spill_block_reader->close();
EXPECT_TRUE(st.ok());
- EXPECT_EQ(block_read->rows(), 1);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), 1);
+ auto column = block_read.get_by_position(0).column;
auto* real_column = (vectorized::ColumnNullable*)column.get();
const auto& decimal_col =
(vectorized::ColumnDecimal<
@@ -441,14 +447,15 @@ TEST_F(TestBlockSpill, TestBitmap) {
vectorized::BlockSpillReaderUPtr spill_block_reader;
block_spill_manager->get_reader(spill_block_writer->get_id(),
spill_block_reader, profile_);
- vectorized::Block* block_read;
+ vectorized::Block block_read;
+ bool eos = false;
for (int i = 0; i < batch_num; ++i) {
- st = spill_block_reader->read(&block_read);
+ st = spill_block_reader->read(&block_read, &eos);
EXPECT_TRUE(st.ok());
- EXPECT_EQ(block_read->rows(), batch_size);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), batch_size);
+ auto column = block_read.get_by_position(0).column;
auto* real_column =
(vectorized::ColumnComplexType<BitmapValue>*)column.get();
for (size_t j = 0; j < batch_size; ++j) {
auto bitmap_str =
convert_bitmap_to_string(real_column->get_element(j));
@@ -456,12 +463,12 @@ TEST_F(TestBlockSpill, TestBitmap) {
}
}
- st = spill_block_reader->read(&block_read);
+ st = spill_block_reader->read(&block_read, &eos);
spill_block_reader->close();
EXPECT_TRUE(st.ok());
- EXPECT_EQ(block_read->rows(), 1);
- auto column = block_read->get_by_position(0).column;
+ EXPECT_EQ(block_read.rows(), 1);
+ auto column = block_read.get_by_position(0).column;
auto* real_column =
(vectorized::ColumnComplexType<BitmapValue>*)column.get();
auto bitmap_str = convert_bitmap_to_string(real_column->get_element(0));
EXPECT_EQ(bitmap_str, expected_bitmap_str[3 * batch_size]);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]