This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d17ed5e  [vectorization](storage)support seq column in storage layer 
(#8186)
d17ed5e is described below

commit d17ed5e27aee00f916bfb7c34ecba15df9006531
Author: wangbo <[email protected]>
AuthorDate: Wed Feb 23 12:23:31 2022 +0800

    [vectorization](storage)support seq column in storage layer (#8186)
    
    [vectorization](storage)support seq column in storage layer (#8186)
---
 be/src/olap/rowset/beta_rowset_reader.cpp    |   2 +-
 be/src/vec/core/block.h                      |   8 ++
 be/src/vec/olap/vgeneric_iterators.cpp       |  23 ++++--
 be/src/vec/olap/vgeneric_iterators.h         |   2 +-
 be/test/vec/exec/vgeneric_iterators_test.cpp | 118 ++++++++++++++++++++++++++-
 5 files changed, 143 insertions(+), 10 deletions(-)

diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp 
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 0134cb5..7ac05a5 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -135,7 +135,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* 
read_context) {
     }
 #else
     if (read_context->need_ordered_result && 
_rowset->rowset_meta()->is_segments_overlapping()) {
-        final_iterator = vectorized::new_merge_iterator(iterators, 
_parent_tracker);
+        final_iterator = vectorized::new_merge_iterator(iterators, 
_parent_tracker, read_context->sequence_id_idx);
     } else {
         final_iterator = vectorized::new_union_iterator(iterators, 
_parent_tracker);
     }
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 621dabf..816ed29 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -274,6 +274,14 @@ public:
         return 0;
     }
 
+    // NOTE(wb): No DCHECK here because this method is currently only called after compare_at, so the checks need not be repeated.
+    // If this method gets used in more places, add DCHECKs case by case.
+    int compare_column_at(size_t n, size_t m, size_t col_idx, const Block& 
rhs, int nan_direction_hint) const {
+        auto res = get_by_position(col_idx).column->compare_at(n, m, 
*(rhs.get_by_position(col_idx).column),
+                                                             
nan_direction_hint);
+        return res;
+    }
+
     doris::Tuple* deep_copy_tuple(const TupleDescriptor&, MemPool*, int, int, 
bool padding_char = false);
 
 private:
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp 
b/be/src/vec/olap/vgeneric_iterators.cpp
index f0f148d..8145cc3 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -121,7 +121,7 @@ Status VAutoIncrementIterator::init(const 
StorageReadOptions& opts) {
 //      }
 class VMergeIteratorContext {
 public:
-    VMergeIteratorContext(RowwiseIterator* iter) : _iter(iter) {}
+    VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx) : 
_iter(iter), _sequence_id_idx(sequence_id_idx) {}
     VMergeIteratorContext(const VMergeIteratorContext&) = delete;
     VMergeIteratorContext(VMergeIteratorContext&&) = delete;
     VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete;
@@ -166,6 +166,13 @@ public:
         if (cmp_res != 0) {
             return cmp_res > 0;
         }
+        
+        if (_sequence_id_idx != -1) {
+            int col_cmp_res = this->_block.compare_column_at(_index_in_block, 
rhs._index_in_block, _sequence_id_idx, rhs._block, -1);
+            if (col_cmp_res != 0) {
+                return col_cmp_res < 0;
+            }
+        }
         return this->data_id() < rhs.data_id();
     }
 
@@ -203,15 +210,15 @@ private:
     // Load next block into _block
     Status _load_next_block();
 
-private:
     RowwiseIterator* _iter;
 
-    // used to store data load from iteerator->next_batch(Vectorized::Block*)
+    // used to store data load from iterator->next_batch(Vectorized::Block*)
     vectorized::Block _block;
 
     bool _valid = false;
     size_t _index_in_block = -1;
     int _block_row_max = 4096;
+    int _sequence_id_idx = -1;
 };
 
 Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
@@ -259,7 +266,8 @@ Status VMergeIteratorContext::_load_next_block() {
 class VMergeIterator : public RowwiseIterator {
 public:
     // VMergeIterator takes the ownership of input iterators
-    VMergeIterator(std::vector<RowwiseIterator*>& iters, 
std::shared_ptr<MemTracker> parent) : _origin_iters(iters) {
+    VMergeIterator(std::vector<RowwiseIterator*>& iters, 
std::shared_ptr<MemTracker> parent, int sequence_id_idx) : 
+        _origin_iters(iters),_sequence_id_idx(sequence_id_idx) {
         // use for count the mem use of Block use in Merge
         _mem_tracker = MemTracker::CreateTracker(-1, "VMergeIterator", parent, 
false);
     }
@@ -297,6 +305,7 @@ private:
     VMergeHeap _merge_heap;
 
     int block_row_max = 0;
+    int _sequence_id_idx = -1;
 };
 
 Status VMergeIterator::init(const StorageReadOptions& opts) {
@@ -306,7 +315,7 @@ Status VMergeIterator::init(const StorageReadOptions& opts) 
{
     _schema = &(*_origin_iters.begin())->schema();
 
     for (auto iter : _origin_iters) {
-        auto ctx = std::make_unique<VMergeIteratorContext>(iter);
+        auto ctx = std::make_unique<VMergeIteratorContext>(iter, 
_sequence_id_idx);
         RETURN_IF_ERROR(ctx->init(opts));
         if (!ctx->valid()) {
             continue;
@@ -403,11 +412,11 @@ Status VUnionIterator::next_batch(vectorized::Block* 
block) {
 }
 
 
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, 
std::shared_ptr<MemTracker> parent) {
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, 
std::shared_ptr<MemTracker> parent, int sequence_id_idx) {
     if (inputs.size() == 1) {
         return *(inputs.begin());
     }
-    return new VMergeIterator(inputs, parent);
+    return new VMergeIterator(inputs, parent, sequence_id_idx);
 }
 
 RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs, 
std::shared_ptr<MemTracker> parent) {
diff --git a/be/src/vec/olap/vgeneric_iterators.h 
b/be/src/vec/olap/vgeneric_iterators.h
index 8177a63..af9733b 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -27,7 +27,7 @@ namespace vectorized {
 //
 // Inputs iterators' ownership is taken by created merge iterator. And client
 // should delete returned iterator after usage.
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, 
std::shared_ptr<MemTracker> parent);
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, 
std::shared_ptr<MemTracker> parent, int sequence_id_idx);
 
 // Create a union iterator for input iterators. Union iterator will read
 // input iterators one by one.
diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp 
b/be/test/vec/exec/vgeneric_iterators_test.cpp
index bfe3511..405c9a9 100644
--- a/be/test/vec/exec/vgeneric_iterators_test.cpp
+++ b/be/test/vec/exec/vgeneric_iterators_test.cpp
@@ -149,7 +149,7 @@ TEST(VGenericIteratorsTest, Merge) {
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
 
-    auto iter = vectorized::new_merge_iterator(inputs, 
MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false));
+    auto iter = vectorized::new_merge_iterator(inputs, 
MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), -1);
     StorageReadOptions opts;
     auto st = iter->init(opts);
     ASSERT_TRUE(st.ok());
@@ -189,6 +189,122 @@ TEST(VGenericIteratorsTest, Merge) {
     delete iter;
 }
 
+// Test-only iterator used by the sequence-column unit test (MergeWithSeqColumn).
+class SeqColumnUtIterator : public RowwiseIterator {
+public:
+    // Generates rows until rows_returned reaches num_rows (num_rows - rows_returned rows in total)
+    SeqColumnUtIterator(const Schema& schema, size_t num_rows, size_t 
rows_returned, size_t seq_col_idx, size_t seq_col_rows_returned)
+            : _schema(schema), _num_rows(num_rows), 
_rows_returned(rows_returned), _seq_col_idx(seq_col_idx), 
_seq_col_rows_returned(seq_col_rows_returned) {}
+    ~SeqColumnUtIterator() override {}
+
+    // NOTE: Currently, this function will ignore StorageReadOptions
+    Status init(const StorageReadOptions& opts) override {
+        return Status::OK();
+    };
+
+    Status next_batch(vectorized::Block* block) override {
+        int row_idx = 0;
+        while (_rows_returned < _num_rows) {
+            for (int j = 0; j < _schema.num_columns(); ++j) {
+                vectorized::ColumnWithTypeAndName& vc = 
block->get_by_position(j);
+                vectorized::IColumn& vi = (vectorized::IColumn&)(*vc.column);
+
+                char data[16] = {};
+                size_t data_len = 0;
+                const auto* col_schema = _schema.column(j);
+                switch (col_schema->type()) {
+                    case OLAP_FIELD_TYPE_SMALLINT:
+                        *(int16_t*)data = j == _seq_col_idx ? 
_seq_col_rows_returned : 1;
+                        data_len = sizeof(int16_t);
+                        break; 
+                    case OLAP_FIELD_TYPE_INT:
+                        *(int32_t*)data = j == _seq_col_idx ? 
_seq_col_rows_returned : 1;
+                        data_len = sizeof(int32_t);
+                        break;
+                    case OLAP_FIELD_TYPE_BIGINT:
+                        *(int64_t*)data = j == _seq_col_idx ? 
_seq_col_rows_returned : 1;
+                        data_len = sizeof(int64_t);
+                        break;
+                    case OLAP_FIELD_TYPE_FLOAT: 
+                        *(float*)data = j == _seq_col_idx ? 
_seq_col_rows_returned : 1;
+                        data_len = sizeof(float);
+                        break;
+                    case OLAP_FIELD_TYPE_DOUBLE: 
+                        *(double*)data = j == _seq_col_idx ? 
_seq_col_rows_returned : 1;
+                        data_len = sizeof(double);
+                        break;
+                    default:
+                        break;
+                }
+
+                vi.insert_data(data, data_len);
+            }
+
+            ++_rows_returned;
+            _seq_col_rows_returned++;
+            row_idx++;
+        }
+        
+        if (row_idx > 0)
+            return Status::OK();
+        return Status::EndOfFile("End of VAutoIncrementIterator");
+    }
+
+    const Schema& schema() const override { return _schema; }
+
+    const Schema& _schema;
+    size_t _num_rows;
+    size_t _rows_returned;
+    int _seq_col_idx = -1;
+    int _seq_col_rows_returned = -1;
+};
+
+TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
+    ASSERT_TRUE(1);
+    auto schema = create_schema();
+    std::vector<RowwiseIterator*> inputs;
+
+    int seq_column_id = 2;
+    int seg_iter_num = 10;
+    int num_rows = 1;
+    int rows_begin = 0;
+    // For rows with the same key, only the one with the largest seq id is ultimately kept.
+    // Keep all key columns identical across inputs and vary only the seq column value.
+    // Inputs are created in ascending seq order; the merged output is expected in descending seq order.
+    for (int i = 0; i < seg_iter_num; i++) {
+        int seq_id_in_every_file = i;
+        inputs.push_back(new SeqColumnUtIterator(schema, num_rows, rows_begin, 
seq_column_id, seq_id_in_every_file));
+    }
+
+    auto iter = vectorized::new_merge_iterator(inputs, 
MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), seq_column_id);
+    StorageReadOptions opts;
+    auto st = iter->init(opts);
+    ASSERT_TRUE(st.ok());
+
+    vectorized::Block block;
+    create_block(schema, block);
+
+    do {
+        st = iter->next_batch(&block);
+    } while (st.ok());
+
+    ASSERT_TRUE(st.is_end_of_file());
+    ASSERT_EQ(block.rows(), seg_iter_num);
+
+    auto col0 = block.get_by_position(0).column;
+    auto col1 = block.get_by_position(1).column;
+    auto seq_col = block.get_by_position(seq_column_id).column;
+
+    for (size_t i = 0; i < seg_iter_num; i++) {
+        size_t expected_value = seg_iter_num - i - 1; // in Descending 
+        size_t actual_value = (*seq_col)[i].get<int>();
+        ASSERT_EQ(expected_value, actual_value);
+    }
+
+    delete iter;
+}
+
+
 } // namespace vectorized
 
 } // namespace doris

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to