This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit f1e9dd513fd2c7a121fc35673c6069662b3caeb2 Author: Gabriel <[email protected]> AuthorDate: Thu May 19 16:35:15 2022 +0800 [BUG] fix bug for vectorized compaction and some storage vectorization bug (#9610) --- be/src/olap/compaction.cpp | 5 +++- be/src/vec/olap/block_reader.cpp | 2 +- be/src/vec/olap/vcollect_iterator.cpp | 45 ++++++++++++++++++++++++++++------ be/src/vec/olap/vcollect_iterator.h | 2 +- be/src/vec/olap/vgeneric_iterators.cpp | 5 +++- 5 files changed, 47 insertions(+), 12 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index 35fce9be5c..a2947cc515 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -81,7 +81,10 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) { _output_version = Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()); - LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name() + auto use_vectorized_compaction = false; + string merge_type = use_vectorized_compaction ? "v" : ""; + + LOG(INFO) << "start " << merge_type << compaction_name() << ". 
tablet=" << _tablet->full_name() << ", output_version=" << _output_version << ", permits: " << permits; RETURN_NOT_OK(construct_output_rowset_writer()); diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index 1959fc70fa..e9da92f1ba 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -64,7 +64,7 @@ OLAPStatus BlockReader::_init_collect_iter(const ReaderParams& read_params, } } - _vcollect_iter.build_heap(*valid_rs_readers); + RETURN_NOT_OK(_vcollect_iter.build_heap(*valid_rs_readers)); if (_vcollect_iter.is_merge()) { auto status = _vcollect_iter.current_row(&_next_row); _eof = status == OLAP_ERR_DATA_EOF; diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 18d80225d4..411c96f4e5 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -25,6 +25,14 @@ namespace vectorized { VCollectIterator::~VCollectIterator() {} +#define RETURN_IF_NOT_EOF_AND_OK(stmt) \ + do { \ + OLAPStatus _status_ = (stmt); \ + if (UNLIKELY(_status_ != OLAP_SUCCESS && _status_ != OLAP_ERR_DATA_EOF)) { \ + return _status_; \ + } \ + } while (false) + void VCollectIterator::init(TabletReader* reader) { _reader = reader; // when aggregate is enabled or key_type is DUP_KEYS, we don't merge @@ -44,20 +52,24 @@ OLAPStatus VCollectIterator::add_child(RowsetReaderSharedPtr rs_reader) { // Build a merge heap. If _merge is true, a rowset with the max rownum // status will be used as the base rowset, and the other rowsets will be merged first and // then merged with the base rowset. 
-void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers) { +OLAPStatus VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers) { DCHECK(rs_readers.size() == _children.size()); _skip_same = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS; if (_children.empty()) { _inner_iter.reset(nullptr); - return; + return OLAP_SUCCESS; } else if (_merge) { DCHECK(!rs_readers.empty()); for (auto [c_iter, r_iter] = std::pair {_children.begin(), rs_readers.begin()}; c_iter != _children.end();) { - if ((*c_iter)->init() != OLAP_SUCCESS) { + OLAPStatus s = (*c_iter)->init(); + if (s != OLAP_SUCCESS) { delete (*c_iter); c_iter = _children.erase(c_iter); r_iter = rs_readers.erase(r_iter); + if (s != OLAP_ERR_DATA_EOF) { + return s; + } } else { ++c_iter; ++r_iter; @@ -90,7 +102,7 @@ void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers } Level1Iterator* cumu_iter = new Level1Iterator(cumu_children, _reader, cumu_children.size() > 1, _skip_same); - cumu_iter->init(); + RETURN_IF_NOT_EOF_AND_OK(cumu_iter->init()); std::list<LevelIterator*> children; children.push_back(*base_reader_child); children.push_back(cumu_iter); @@ -102,9 +114,10 @@ void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers } else { _inner_iter.reset(new Level1Iterator(_children, _reader, _merge, _skip_same)); } - _inner_iter->init(); + RETURN_IF_NOT_EOF_AND_OK(_inner_iter->init()); // Clear _children earlier to release any related references _children.clear(); + return OLAP_SUCCESS; } bool VCollectIterator::LevelIteratorComparator::operator()(LevelIterator* lhs, LevelIterator* rhs) { @@ -194,9 +207,13 @@ OLAPStatus VCollectIterator::Level0Iterator::_refresh_current_row() { _ref.row_pos = 0; _block->clear_column_data(); auto res = _rs_reader->next_block(_block.get()); - if (res != OLAP_SUCCESS) { + if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) { return res; } + if (res == OLAP_ERR_DATA_EOF 
&& _block->rows() == 0) { + _ref.row_pos = -1; + return OLAP_ERR_DATA_EOF; + } } } while (_block->rows() != 0); _ref.row_pos = -1; @@ -206,13 +223,25 @@ OLAPStatus VCollectIterator::Level0Iterator::_refresh_current_row() { OLAPStatus VCollectIterator::Level0Iterator::next(IteratorRowRef* ref) { _ref.row_pos++; RETURN_NOT_OK(_refresh_current_row()); - *ref = _ref; return OLAP_SUCCESS; } OLAPStatus VCollectIterator::Level0Iterator::next(Block* block) { - return _rs_reader->next_block(block); + if (UNLIKELY(_ref.block->rows() > 0 && _ref.row_pos == 0)) { + block->swap(*_ref.block); + _ref.row_pos = -1; + return OLAP_SUCCESS; + } else { + auto res = _rs_reader->next_block(block); + if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) { + return res; + } + if (res == OLAP_ERR_DATA_EOF && _block->rows() == 0) { + return OLAP_ERR_DATA_EOF; + } + return OLAP_SUCCESS; + } } VCollectIterator::Level1Iterator::Level1Iterator( diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h index b5c0089498..a37eb5e2c4 100644 --- a/be/src/vec/olap/vcollect_iterator.h +++ b/be/src/vec/olap/vcollect_iterator.h @@ -49,7 +49,7 @@ public: OLAPStatus add_child(RowsetReaderSharedPtr rs_reader); - void build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers); + OLAPStatus build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers); // Get top row of the heap, nullptr if reach end. 
OLAPStatus current_row(IteratorRowRef* ref) const; diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index e63bb8d28a..19e7be78b0 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -346,7 +346,10 @@ Status VMergeIterator::next_batch(vectorized::Block* block) { delete ctx; } } - + if (!_merge_heap.empty()) { + return Status::OK(); + } + // Still last batch needs to be processed return Status::EndOfFile("no more data in segment"); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
