This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new d17ed5e [vectorization](storage)support seq column in storage layer (#8186)
d17ed5e is described below
commit d17ed5e27aee00f916bfb7c34ecba15df9006531
Author: wangbo <[email protected]>
AuthorDate: Wed Feb 23 12:23:31 2022 +0800
[vectorization](storage)support seq column in storage layer (#8186)
[vectorization](storage)support seq column in storage layer (#8186)
---
be/src/olap/rowset/beta_rowset_reader.cpp | 2 +-
be/src/vec/core/block.h | 8 ++
be/src/vec/olap/vgeneric_iterators.cpp | 23 ++++--
be/src/vec/olap/vgeneric_iterators.h | 2 +-
be/test/vec/exec/vgeneric_iterators_test.cpp | 118 ++++++++++++++++++++++++++-
5 files changed, 143 insertions(+), 10 deletions(-)
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 0134cb5..7ac05a5 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -135,7 +135,7 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext*
read_context) {
}
#else
if (read_context->need_ordered_result &&
_rowset->rowset_meta()->is_segments_overlapping()) {
- final_iterator = vectorized::new_merge_iterator(iterators,
_parent_tracker);
+ final_iterator = vectorized::new_merge_iterator(iterators,
_parent_tracker, read_context->sequence_id_idx);
} else {
final_iterator = vectorized::new_union_iterator(iterators,
_parent_tracker);
}
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 621dabf..816ed29 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -274,6 +274,14 @@ public:
return 0;
}
+ //note(wb) no DCHECK here, because this method is only used after
compare_at now, so no need to repeat check here.
+ // If this method is used in more places, you can add DCHECK case by case.
+ int compare_column_at(size_t n, size_t m, size_t col_idx, const Block&
rhs, int nan_direction_hint) const {
+ auto res = get_by_position(col_idx).column->compare_at(n, m,
*(rhs.get_by_position(col_idx).column),
+
nan_direction_hint);
+ return res;
+ }
+
doris::Tuple* deep_copy_tuple(const TupleDescriptor&, MemPool*, int, int,
bool padding_char = false);
private:
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp
b/be/src/vec/olap/vgeneric_iterators.cpp
index f0f148d..8145cc3 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -121,7 +121,7 @@ Status VAutoIncrementIterator::init(const
StorageReadOptions& opts) {
// }
class VMergeIteratorContext {
public:
- VMergeIteratorContext(RowwiseIterator* iter) : _iter(iter) {}
+ VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx) :
_iter(iter), _sequence_id_idx(sequence_id_idx) {}
VMergeIteratorContext(const VMergeIteratorContext&) = delete;
VMergeIteratorContext(VMergeIteratorContext&&) = delete;
VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete;
@@ -166,6 +166,13 @@ public:
if (cmp_res != 0) {
return cmp_res > 0;
}
+
+ if (_sequence_id_idx != -1) {
+ int col_cmp_res = this->_block.compare_column_at(_index_in_block,
rhs._index_in_block, _sequence_id_idx, rhs._block, -1);
+ if (col_cmp_res != 0) {
+ return col_cmp_res < 0;
+ }
+ }
return this->data_id() < rhs.data_id();
}
@@ -203,15 +210,15 @@ private:
// Load next block into _block
Status _load_next_block();
-private:
RowwiseIterator* _iter;
- // used to store data load from iteerator->next_batch(Vectorized::Block*)
+ // used to store data load from iterator->next_batch(Vectorized::Block*)
vectorized::Block _block;
bool _valid = false;
size_t _index_in_block = -1;
int _block_row_max = 4096;
+ int _sequence_id_idx = -1;
};
Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
@@ -259,7 +266,8 @@ Status VMergeIteratorContext::_load_next_block() {
class VMergeIterator : public RowwiseIterator {
public:
// VMergeIterator takes the ownership of input iterators
- VMergeIterator(std::vector<RowwiseIterator*>& iters,
std::shared_ptr<MemTracker> parent) : _origin_iters(iters) {
+ VMergeIterator(std::vector<RowwiseIterator*>& iters,
std::shared_ptr<MemTracker> parent, int sequence_id_idx) :
+ _origin_iters(iters),_sequence_id_idx(sequence_id_idx) {
// use for count the mem use of Block use in Merge
_mem_tracker = MemTracker::CreateTracker(-1, "VMergeIterator", parent,
false);
}
@@ -297,6 +305,7 @@ private:
VMergeHeap _merge_heap;
int block_row_max = 0;
+ int _sequence_id_idx = -1;
};
Status VMergeIterator::init(const StorageReadOptions& opts) {
@@ -306,7 +315,7 @@ Status VMergeIterator::init(const StorageReadOptions& opts)
{
_schema = &(*_origin_iters.begin())->schema();
for (auto iter : _origin_iters) {
- auto ctx = std::make_unique<VMergeIteratorContext>(iter);
+ auto ctx = std::make_unique<VMergeIteratorContext>(iter,
_sequence_id_idx);
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
continue;
@@ -403,11 +412,11 @@ Status VUnionIterator::next_batch(vectorized::Block*
block) {
}
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs,
std::shared_ptr<MemTracker> parent) {
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs,
std::shared_ptr<MemTracker> parent, int sequence_id_idx) {
if (inputs.size() == 1) {
return *(inputs.begin());
}
- return new VMergeIterator(inputs, parent);
+ return new VMergeIterator(inputs, parent, sequence_id_idx);
}
RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs,
std::shared_ptr<MemTracker> parent) {
diff --git a/be/src/vec/olap/vgeneric_iterators.h
b/be/src/vec/olap/vgeneric_iterators.h
index 8177a63..af9733b 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -27,7 +27,7 @@ namespace vectorized {
//
// Inputs iterators' ownership is taken by created merge iterator. And client
// should delete returned iterator after usage.
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs,
std::shared_ptr<MemTracker> parent);
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs,
std::shared_ptr<MemTracker> parent, int sequence_id_idx);
// Create a union iterator for input iterators. Union iterator will read
// input iterators one by one.
diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp
b/be/test/vec/exec/vgeneric_iterators_test.cpp
index bfe3511..405c9a9 100644
--- a/be/test/vec/exec/vgeneric_iterators_test.cpp
+++ b/be/test/vec/exec/vgeneric_iterators_test.cpp
@@ -149,7 +149,7 @@ TEST(VGenericIteratorsTest, Merge) {
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
- auto iter = vectorized::new_merge_iterator(inputs,
MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false));
+ auto iter = vectorized::new_merge_iterator(inputs,
MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), -1);
StorageReadOptions opts;
auto st = iter->init(opts);
ASSERT_TRUE(st.ok());
@@ -189,6 +189,122 @@ TEST(VGenericIteratorsTest, Merge) {
delete iter;
}
+// only used for Seq Column UT
+class SeqColumnUtIterator : public RowwiseIterator {
+public:
+ // Will generate num_rows rows in total
+ SeqColumnUtIterator(const Schema& schema, size_t num_rows, size_t
rows_returned, size_t seq_col_idx, size_t seq_col_rows_returned)
+ : _schema(schema), _num_rows(num_rows),
_rows_returned(rows_returned), _seq_col_idx(seq_col_idx),
_seq_col_rows_returned(seq_col_rows_returned) {}
+ ~SeqColumnUtIterator() override {}
+
+ // NOTE: Currently, this function will ignore StorageReadOptions
+ Status init(const StorageReadOptions& opts) override {
+ return Status::OK();
+ };
+
+ Status next_batch(vectorized::Block* block) override {
+ int row_idx = 0;
+ while (_rows_returned < _num_rows) {
+ for (int j = 0; j < _schema.num_columns(); ++j) {
+ vectorized::ColumnWithTypeAndName& vc =
block->get_by_position(j);
+ vectorized::IColumn& vi = (vectorized::IColumn&)(*vc.column);
+
+ char data[16] = {};
+ size_t data_len = 0;
+ const auto* col_schema = _schema.column(j);
+ switch (col_schema->type()) {
+ case OLAP_FIELD_TYPE_SMALLINT:
+ *(int16_t*)data = j == _seq_col_idx ?
_seq_col_rows_returned : 1;
+ data_len = sizeof(int16_t);
+ break;
+ case OLAP_FIELD_TYPE_INT:
+ *(int32_t*)data = j == _seq_col_idx ?
_seq_col_rows_returned : 1;
+ data_len = sizeof(int32_t);
+ break;
+ case OLAP_FIELD_TYPE_BIGINT:
+ *(int64_t*)data = j == _seq_col_idx ?
_seq_col_rows_returned : 1;
+ data_len = sizeof(int64_t);
+ break;
+ case OLAP_FIELD_TYPE_FLOAT:
+ *(float*)data = j == _seq_col_idx ?
_seq_col_rows_returned : 1;
+ data_len = sizeof(float);
+ break;
+ case OLAP_FIELD_TYPE_DOUBLE:
+ *(double*)data = j == _seq_col_idx ?
_seq_col_rows_returned : 1;
+ data_len = sizeof(double);
+ break;
+ default:
+ break;
+ }
+
+ vi.insert_data(data, data_len);
+ }
+
+ ++_rows_returned;
+ _seq_col_rows_returned++;
+ row_idx++;
+ }
+
+ if (row_idx > 0)
+ return Status::OK();
+ return Status::EndOfFile("End of VAutoIncrementIterator");
+ }
+
+ const Schema& schema() const override { return _schema; }
+
+ const Schema& _schema;
+ size_t _num_rows;
+ size_t _rows_returned;
+ int _seq_col_idx = -1;
+ int _seq_col_rows_returned = -1;
+};
+
+TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
+ ASSERT_TRUE(1);
+ auto schema = create_schema();
+ std::vector<RowwiseIterator*> inputs;
+
+ int seq_column_id = 2;
+ int seg_iter_num = 10;
+ int num_rows = 1;
+ int rows_begin = 0;
+ // The same key in each file will only keep one with the largest seq id
+ // keep the key columns all the same, but seq column value different
+ // input seg file in Ascending, expect output seq column in Descending
+ for (int i = 0; i < seg_iter_num; i++) {
+ int seq_id_in_every_file = i;
+ inputs.push_back(new SeqColumnUtIterator(schema, num_rows, rows_begin,
seq_column_id, seq_id_in_every_file));
+ }
+
+ auto iter = vectorized::new_merge_iterator(inputs,
MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), seq_column_id);
+ StorageReadOptions opts;
+ auto st = iter->init(opts);
+ ASSERT_TRUE(st.ok());
+
+ vectorized::Block block;
+ create_block(schema, block);
+
+ do {
+ st = iter->next_batch(&block);
+ } while (st.ok());
+
+ ASSERT_TRUE(st.is_end_of_file());
+ ASSERT_EQ(block.rows(), seg_iter_num);
+
+ auto col0 = block.get_by_position(0).column;
+ auto col1 = block.get_by_position(1).column;
+ auto seq_col = block.get_by_position(seq_column_id).column;
+
+ for (size_t i = 0; i < seg_iter_num; i++) {
+ size_t expected_value = seg_iter_num - i - 1; // in Descending
+ size_t actual_value = (*seq_col)[i].get<int>();
+ ASSERT_EQ(expected_value, actual_value);
+ }
+
+ delete iter;
+}
+
+
} // namespace vectorized
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]