This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit f1e9dd513fd2c7a121fc35673c6069662b3caeb2
Author: Gabriel <[email protected]>
AuthorDate: Thu May 19 16:35:15 2022 +0800

    [BUG] fix bug for vectorized compaction and some storage vectorization bug (#9610)
---
 be/src/olap/compaction.cpp             |  5 +++-
 be/src/vec/olap/block_reader.cpp       |  2 +-
 be/src/vec/olap/vcollect_iterator.cpp  | 45 ++++++++++++++++++++++++++++------
 be/src/vec/olap/vcollect_iterator.h    |  2 +-
 be/src/vec/olap/vgeneric_iterators.cpp |  5 +++-
 5 files changed, 47 insertions(+), 12 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 35fce9be5c..a2947cc515 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -81,7 +81,10 @@ OLAPStatus Compaction::do_compaction_impl(int64_t permits) {
     _output_version =
             Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version());
 
-    LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name()
+    auto use_vectorized_compaction = false;
+    string merge_type = use_vectorized_compaction ? "v" : "";
+
+    LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
               << ", output_version=" << _output_version << ", permits: " << permits;
 
     RETURN_NOT_OK(construct_output_rowset_writer());
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 1959fc70fa..e9da92f1ba 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -64,7 +64,7 @@ OLAPStatus BlockReader::_init_collect_iter(const ReaderParams& read_params,
         }
     }
 
-    _vcollect_iter.build_heap(*valid_rs_readers);
+    RETURN_NOT_OK(_vcollect_iter.build_heap(*valid_rs_readers));
     if (_vcollect_iter.is_merge()) {
         auto status = _vcollect_iter.current_row(&_next_row);
         _eof = status == OLAP_ERR_DATA_EOF;
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index 18d80225d4..411c96f4e5 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -25,6 +25,14 @@ namespace vectorized {
 
 VCollectIterator::~VCollectIterator() {}
 
+#define RETURN_IF_NOT_EOF_AND_OK(stmt)                                                  \
+    do {                                                                                \
+        OLAPStatus _status_ = (stmt);                                                   \
+        if (UNLIKELY(_status_ != OLAP_SUCCESS && _status_ != OLAP_ERR_DATA_EOF)) {      \
+            return _status_;                                                            \
+        }                                                                               \
+    } while (false)
+
 void VCollectIterator::init(TabletReader* reader) {
     _reader = reader;
     // when aggregate is enabled or key_type is DUP_KEYS, we don't merge
@@ -44,20 +52,24 @@ OLAPStatus VCollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
 // Build a merge heap. If _merge is true, a rowset with the max rownum
 // status will be used as the base rowset, and the other rowsets will be merged first and
 // then merged with the base rowset.
-void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers) {
+OLAPStatus VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers) {
     DCHECK(rs_readers.size() == _children.size());
     _skip_same = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS;
     if (_children.empty()) {
         _inner_iter.reset(nullptr);
-        return;
+        return OLAP_SUCCESS;
     } else if (_merge) {
         DCHECK(!rs_readers.empty());
         for (auto [c_iter, r_iter] = std::pair {_children.begin(), rs_readers.begin()};
              c_iter != _children.end();) {
-            if ((*c_iter)->init() != OLAP_SUCCESS) {
+            OLAPStatus s = (*c_iter)->init();
+            if (s != OLAP_SUCCESS) {
                 delete (*c_iter);
                 c_iter = _children.erase(c_iter);
                 r_iter = rs_readers.erase(r_iter);
+                if (s != OLAP_ERR_DATA_EOF) {
+                    return s;
+                }
             } else {
                 ++c_iter;
                 ++r_iter;
@@ -90,7 +102,7 @@ void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers
             }
             Level1Iterator* cumu_iter = new Level1Iterator(cumu_children, _reader,
                                                            cumu_children.size() > 1, _skip_same);
-            cumu_iter->init();
+            RETURN_IF_NOT_EOF_AND_OK(cumu_iter->init());
             std::list<LevelIterator*> children;
             children.push_back(*base_reader_child);
             children.push_back(cumu_iter);
@@ -102,9 +114,10 @@ void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers
     } else {
         _inner_iter.reset(new Level1Iterator(_children, _reader, _merge, _skip_same));
     }
-    _inner_iter->init();
+    RETURN_IF_NOT_EOF_AND_OK(_inner_iter->init());
     // Clear _children earlier to release any related references
     _children.clear();
+    return OLAP_SUCCESS;
 }
 
 bool VCollectIterator::LevelIteratorComparator::operator()(LevelIterator* lhs, LevelIterator* rhs) {
@@ -194,9 +207,13 @@ OLAPStatus VCollectIterator::Level0Iterator::_refresh_current_row() {
             _ref.row_pos = 0;
             _block->clear_column_data();
             auto res = _rs_reader->next_block(_block.get());
-            if (res != OLAP_SUCCESS) {
+            if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) {
                 return res;
             }
+            if (res == OLAP_ERR_DATA_EOF && _block->rows() == 0) {
+                _ref.row_pos = -1;
+                return OLAP_ERR_DATA_EOF;
+            }
         }
     } while (_block->rows() != 0);
     _ref.row_pos = -1;
@@ -206,13 +223,25 @@ OLAPStatus VCollectIterator::Level0Iterator::_refresh_current_row() {
 OLAPStatus VCollectIterator::Level0Iterator::next(IteratorRowRef* ref) {
     _ref.row_pos++;
     RETURN_NOT_OK(_refresh_current_row());
-
     *ref = _ref;
     return OLAP_SUCCESS;
 }
 
 OLAPStatus VCollectIterator::Level0Iterator::next(Block* block) {
-    return _rs_reader->next_block(block);
+    if (UNLIKELY(_ref.block->rows() > 0 && _ref.row_pos == 0)) {
+        block->swap(*_ref.block);
+        _ref.row_pos = -1;
+        return OLAP_SUCCESS;
+    } else {
+        auto res = _rs_reader->next_block(block);
+        if (res != OLAP_SUCCESS && res != OLAP_ERR_DATA_EOF) {
+            return res;
+        }
+        if (res == OLAP_ERR_DATA_EOF && _block->rows() == 0) {
+            return OLAP_ERR_DATA_EOF;
+        }
+        return OLAP_SUCCESS;
+    }
 }
 
 VCollectIterator::Level1Iterator::Level1Iterator(
diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h
index b5c0089498..a37eb5e2c4 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -49,7 +49,7 @@ public:
 
     OLAPStatus add_child(RowsetReaderSharedPtr rs_reader);
 
-    void build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers);
+    OLAPStatus build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers);
     // Get top row of the heap, nullptr if reach end.
     OLAPStatus current_row(IteratorRowRef* ref) const;
 
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index e63bb8d28a..19e7be78b0 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -346,7 +346,10 @@ Status VMergeIterator::next_batch(vectorized::Block* block) {
             delete ctx;
         }
     }
-
+    if (!_merge_heap.empty()) {
+        return Status::OK();
+    }
+    // Still last batch needs to be processed
     return Status::EndOfFile("no more data in segment");
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to