This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch dev-1.0.1 in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
commit f5386cb8e7b38471d6f19c56b3e51779dd6d3260 Author: HappenLee <[email protected]> AuthorDate: Tue May 31 12:29:16 2022 +0800 [Bug][Fix] One Rowset have same key output in unique table (#9858) Co-authored-by: lihaopeng <[email protected]> --- be/src/olap/collect_iterator.cpp | 2 +- be/src/olap/generic_iterators.cpp | 52 ++++++++++++++++-------- be/src/olap/generic_iterators.h | 2 +- be/src/olap/reader.cpp | 1 + be/src/olap/rowset/beta_rowset_reader.cpp | 4 +- be/src/olap/rowset/rowset_reader_context.h | 1 + be/src/olap/schema_change.cpp | 2 + be/src/olap/tuple_reader.cpp | 25 ++++++++---- be/src/vec/olap/vcollect_iterator.cpp | 8 +--- be/src/vec/olap/vgeneric_iterators.cpp | 44 +++++++++++++------- be/src/vec/olap/vgeneric_iterators.h | 2 +- be/test/olap/generic_iterators_test.cpp | 38 ++++++++++++++++- be/test/vec/exec/vgeneric_iterators_test.cpp | 61 ++++++++++++++++++++++------ 13 files changed, 175 insertions(+), 67 deletions(-) diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp index 404da7e35d..df1f3231a8 100644 --- a/be/src/olap/collect_iterator.cpp +++ b/be/src/olap/collect_iterator.cpp @@ -117,7 +117,7 @@ bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a return cmp_res > 0; } - // Second: If sequence_id_idx != 0 means we need to compare sequence. sequence only use + // Second: If _sequence_id_idx != 0 means we need to compare sequence. sequence only use // in unique key. so keep reverse order here if (_sequence_id_idx != -1) { auto seq_first_cell = first->cell(_sequence_id_idx); diff --git a/be/src/olap/generic_iterators.cpp b/be/src/olap/generic_iterators.cpp index 1b8f176637..ba4cc5559d 100644 --- a/be/src/olap/generic_iterators.cpp +++ b/be/src/olap/generic_iterators.cpp @@ -150,6 +150,10 @@ public: uint64_t data_id() const { return _iter->data_id(); } + bool need_skip() const { return _skip; } + + void set_skip(bool skip) const { _skip = skip; } + private: // Load next block into _block Status _load_next_block(); @@ -161,6 +165,7 @@ private: RowBlockV2 _block; bool _valid = false; + mutable bool _skip = false; size_t _index_in_block = -1; }; @@ -174,6 +179,7 @@ Status MergeIteratorContext::init(const StorageReadOptions& opts) { } Status MergeIteratorContext::advance() { + _skip = false; // NOTE: we increase _index_in_block directly to valid one check do { _index_in_block++; @@ -207,8 +213,8 @@ Status MergeIteratorContext::_load_next_block() { class MergeIterator : public RowwiseIterator { public: // MergeIterator takes the ownership of input iterators - MergeIterator(std::vector<RowwiseIterator*> iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx) - : _origin_iters(std::move(iters)), _sequence_id_idx(sequence_id_idx), _merge_heap(MergeContextComparator(_sequence_id_idx)) { + MergeIterator(std::vector<RowwiseIterator*> iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) + : _origin_iters(std::move(iters)), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), _merge_heap(MergeContextComparator(_sequence_id_idx, is_unique)) { // use for count the mem use of Block use in Merge _mem_tracker = MemTracker::CreateTracker(-1, "MergeIterator", std::move(parent), false); } @@ -232,11 +238,13 @@ private: std::vector<RowwiseIterator*> _origin_iters; int _sequence_id_idx; + bool _is_unique; std::unique_ptr<Schema> _schema; struct MergeContextComparator { - explicit MergeContextComparator(int idx) : sequence_id_idx(idx) {}; + MergeContextComparator(int idx, bool is_unique) + : _sequence_id_idx(idx), _is_unique(is_unique) {}; bool operator()(const MergeIteratorContext* lhs, const MergeIteratorContext* rhs) const { auto lhs_row = lhs->current_row(); @@ -245,23 +253,29 @@ private: if (cmp_res != 0) { return cmp_res > 0; } - - // Second: If sequence_id_idx != 0 means we need to compare sequence. sequence only use + auto res = 0; + // Second: If _sequence_id_idx != 0 means we need to compare sequence. sequence only use // in unique key. so keep reverse order of sequence id here - if (sequence_id_idx != -1) { - auto l_cell = lhs_row.cell(sequence_id_idx); - auto r_cell = rhs_row.cell(sequence_id_idx); - auto res = lhs_row.schema()->column(sequence_id_idx)->compare_cell(l_cell, r_cell); - if (res != 0) return res < 0; + if (_sequence_id_idx != -1) { + auto l_cell = lhs_row.cell(_sequence_id_idx); + auto r_cell = rhs_row.cell(_sequence_id_idx); + res = lhs_row.schema()->column(_sequence_id_idx)->compare_cell(l_cell, r_cell); } + // if row cursors equal, compare segment id. // here we sort segment id in reverse order, because of the row order in AGG_KEYS // dose no matter, but in UNIQUE_KEYS table we only read the latest is one, so we // return the row in reverse order of segment id - return lhs->data_id() < rhs->data_id(); + bool result = res == 0 ? lhs->data_id() < rhs->data_id() : res < 0; + if (_is_unique) { + result ? lhs->set_skip(true) : rhs->set_skip(true); + } + + return result; } - int sequence_id_idx; + int _sequence_id_idx; + bool _is_unique; }; using MergeHeap = std::priority_queue<MergeIteratorContext*, @@ -292,13 +306,15 @@ Status MergeIterator::init(const StorageReadOptions& opts) { Status MergeIterator::next_batch(RowBlockV2* block) { size_t row_idx = 0; - for (; row_idx < block->capacity() && !_merge_heap.empty(); ++row_idx) { + for (; row_idx < block->capacity() && !_merge_heap.empty();) { auto ctx = _merge_heap.top(); _merge_heap.pop(); - RowBlockRow dst_row = block->row(row_idx); - // copy current row to block - copy_row(&dst_row, ctx->current_row(), block->pool()); + if (!ctx->need_skip()) { + RowBlockRow dst_row = block->row(row_idx++); + // copy current row to block + copy_row(&dst_row, ctx->current_row(), block->pool()); + } RETURN_IF_ERROR(ctx->advance()); if (ctx->valid()) { @@ -374,11 +390,11 @@ Status UnionIterator::next_batch(RowBlockV2* block) { return Status::EndOfFile("End of UnionIterator"); } -RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx) { +RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) { if (inputs.size() == 1) { return *(inputs.begin()); } - return new MergeIterator(std::move(inputs), parent, sequence_id_idx); + return new MergeIterator(std::move(inputs), parent, sequence_id_idx, is_unique); } RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent) { diff --git a/be/src/olap/generic_iterators.h b/be/src/olap/generic_iterators.h index e8f4528885..1a6ca90030 100644 --- a/be/src/olap/generic_iterators.h +++ b/be/src/olap/generic_iterators.h @@ -25,7 +25,7 @@ namespace doris { // // Inputs iterators' ownership is taken by created merge iterator. And client // should delete returned iterator after usage. -RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx); +RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique); // Create a union iterator for input iterators. Union iterator will read // input iterators one by one. diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp index 4542ae3f65..819e84a9a1 100644 --- a/be/src/olap/reader.cpp +++ b/be/src/olap/reader.cpp @@ -223,6 +223,7 @@ OLAPStatus TabletReader::_capture_rs_readers(const ReaderParams& read_params, _reader_context.use_page_cache = read_params.use_page_cache; _reader_context.sequence_id_idx = _sequence_col_idx; _reader_context.batch_size = _batch_size; + _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS; *valid_rs_readers = *rs_readers; diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp index 4e475fa9b1..3148656dd6 100644 --- a/be/src/olap/rowset/beta_rowset_reader.cpp +++ b/be/src/olap/rowset/beta_rowset_reader.cpp @@ -120,13 +120,13 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) { RowwiseIterator* final_iterator; if (config::enable_storage_vectorization && read_context->is_vec) { if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) { - final_iterator = vectorized::new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx); + final_iterator = vectorized::new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique); } else { final_iterator = vectorized::new_union_iterator(iterators, _parent_tracker); } } else { if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) { - final_iterator = new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx); + final_iterator = new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique); } else { final_iterator = new_union_iterator(iterators, _parent_tracker); } diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h index 07d9340fdf..0ae42f6cf4 100644 --- a/be/src/olap/rowset/rowset_reader_context.h +++ b/be/src/olap/rowset/rowset_reader_context.h @@ -63,6 +63,7 @@ struct RowsetReaderContext { int sequence_id_idx = -1; int batch_size = 1024; bool is_vec = false; + bool is_unique = false; }; } // namespace doris diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 0cd9ae7567..b27fea5f71 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -1505,6 +1505,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe // for schema change, seek_columns is the same to return_columns reader_context.seek_columns = &return_columns; reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); + reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS; auto mem_tracker = MemTracker::CreateTracker(-1, "AlterTablet:" + std::to_string(base_tablet->tablet_id()) + "-" + std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, MemTrackerLevel::TASK); @@ -1724,6 +1725,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl reader_context.return_columns = &return_columns; reader_context.seek_columns = &return_columns; reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx(); + reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS; RowsetReaderSharedPtr rowset_reader; RETURN_NOT_OK((*base_rowset)->create_reader(_mem_tracker, &rowset_reader)); diff --git a/be/src/olap/tuple_reader.cpp b/be/src/olap/tuple_reader.cpp index 5c15c2b42f..33640777f5 100644 --- a/be/src/olap/tuple_reader.cpp +++ b/be/src/olap/tuple_reader.cpp @@ -185,15 +185,24 @@ OLAPStatus TupleReader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem // in UNIQUE_KEY highest version is the final result, there is no need to // merge the lower versions direct_copy_row(row_cursor, *_next_key); - // skip the lower version rows; - auto res = _collect_iter.next(&_next_key, &_next_delete_flag); - if (LIKELY(res != OLAP_ERR_DATA_EOF)) { - if (UNLIKELY(res != OLAP_SUCCESS)) { - LOG(WARNING) << "next failed: " << res; - return res; + while (_next_key) { + // skip the lower version rows; + auto res = _collect_iter.next(&_next_key, &_next_delete_flag); + if (LIKELY(res != OLAP_ERR_DATA_EOF)) { + if (UNLIKELY(res != OLAP_SUCCESS)) { + LOG(WARNING) << "next failed: " << res; + return res; + } + + if (!equal_row(_key_cids, *row_cursor, *_next_key)) { + agg_finalize_row(_value_cids, row_cursor, mem_pool); + break; + } + _merged_rows++; + cur_delete_flag = _next_delete_flag; + } else { + break; } - agg_finalize_row(_value_cids, row_cursor, mem_pool); - cur_delete_flag = _next_delete_flag; } // if reader needs to filter delete row and current delete_flag is true, diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index 411c96f4e5..0d351a55fb 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -142,14 +142,8 @@ bool VCollectIterator::LevelIteratorComparator::operator()(LevelIterator* lhs, L // for UNIQUE_KEYS just read the highest version and no need agg_update. // for AGG_KEYS if a version is deleted, the lower version no need to agg_update bool lower = (cmp_res != 0) ? (cmp_res < 0) : (lhs->version() < rhs->version()); + lower ? lhs->set_same(true) : rhs->set_same(true); - // if lhs or rhs set same is true, means some same value already output, so need to - // set another is same - if (lower) { - lhs->is_same() ? rhs->set_same(true) : lhs->set_same(true); - } else { - rhs->is_same() ? lhs->set_same(true) : rhs->set_same(true); - } return lower; } diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp index 19e7be78b0..2ffb4e224f 100644 --- a/be/src/vec/olap/vgeneric_iterators.cpp +++ b/be/src/vec/olap/vgeneric_iterators.cpp @@ -120,7 +120,7 @@ Status VAutoIncrementIterator::init(const StorageReadOptions& opts) { // } class VMergeIteratorContext { public: - VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx) : _iter(iter), _sequence_id_idx(sequence_id_idx) {} + VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool is_unique) : _iter(iter), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique) {} VMergeIteratorContext(const VMergeIteratorContext&) = delete; VMergeIteratorContext(VMergeIteratorContext&&) = delete; VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete; @@ -165,14 +165,18 @@ public: if (cmp_res != 0) { return cmp_res > 0; } - + + auto col_cmp_res = 0; if (_sequence_id_idx != -1) { - int col_cmp_res = this->_block.compare_column_at(_index_in_block, rhs._index_in_block, _sequence_id_idx, rhs._block, -1); - if (col_cmp_res != 0) { - return col_cmp_res < 0; - } + col_cmp_res = this->_block.compare_column_at(_index_in_block, rhs._index_in_block, + _sequence_id_idx, rhs._block, -1); } - return this->data_id() < rhs.data_id(); + auto result = col_cmp_res == 0 ? this->data_id() < rhs.data_id() : col_cmp_res < 0; + + if (_is_unique) { + result ? this->set_skip(true) : rhs.set_skip(true); + } + return result; } void copy_row(vectorized::Block* block) { @@ -203,6 +207,10 @@ public: uint64_t data_id() const { return _iter->data_id(); } + bool need_skip() const { return _skip; } + + void set_skip(bool skip) const { _skip = skip; } + private: // Load next block into _block Status _load_next_block(); @@ -212,10 +220,12 @@ private: // used to store data load from iterator->next_batch(Vectorized::Block*) vectorized::Block _block; + int _sequence_id_idx = -1; + bool _is_unique = false; bool _valid = false; + mutable bool _skip = false; size_t _index_in_block = -1; int _block_row_max = 4096; - int _sequence_id_idx = -1; }; Status VMergeIteratorContext::init(const StorageReadOptions& opts) { @@ -230,6 +240,7 @@ Status VMergeIteratorContext::init(const StorageReadOptions& opts) { } Status VMergeIteratorContext::advance() { + _skip = false; // NOTE: we increase _index_in_block directly to valid one check do { _index_in_block++; @@ -263,8 +274,8 @@ Status VMergeIteratorContext::_load_next_block() { class VMergeIterator : public RowwiseIterator { public: // VMergeIterator takes the ownership of input iterators - VMergeIterator(std::vector<RowwiseIterator*>& iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx) : - _origin_iters(iters),_sequence_id_idx(sequence_id_idx) { + VMergeIterator(std::vector<RowwiseIterator*>& iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) : + _origin_iters(iters),_sequence_id_idx(sequence_id_idx), _is_unique(is_unique) { // use for count the mem use of Block use in Merge _mem_tracker = MemTracker::CreateTracker(-1, "VMergeIterator", parent, false); } @@ -303,6 +314,7 @@ private: int block_row_max = 0; int _sequence_id_idx = -1; + bool _is_unique = false; }; Status VMergeIterator::init(const StorageReadOptions& opts) { @@ -312,7 +324,7 @@ Status VMergeIterator::init(const StorageReadOptions& opts) { _schema = &(*_origin_iters.begin())->schema(); for (auto iter : _origin_iters) { - auto ctx = std::make_unique<VMergeIteratorContext>(iter, _sequence_id_idx); + auto ctx = std::make_unique<VMergeIteratorContext>(iter, _sequence_id_idx, _is_unique); RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { continue; @@ -335,8 +347,10 @@ Status VMergeIterator::next_batch(vectorized::Block* block) { auto ctx = _merge_heap.top(); _merge_heap.pop(); - // copy current row to block - ctx->copy_row(block); + if (!ctx->need_skip()) { + // copy current row to block + ctx->copy_row(block); + } RETURN_IF_ERROR(ctx->advance()); if (ctx->valid()) { @@ -412,11 +426,11 @@ Status VUnionIterator::next_batch(vectorized::Block* block) { } -RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx) { +RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) { if (inputs.size() == 1) { return *(inputs.begin()); } - return new VMergeIterator(inputs, parent, sequence_id_idx); + return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique); } RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent) { diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h index af9733bf44..063d07da51 100644 --- a/be/src/vec/olap/vgeneric_iterators.h +++ b/be/src/vec/olap/vgeneric_iterators.h @@ -27,7 +27,7 @@ namespace vectorized { // // Inputs iterators' ownership is taken by created merge iterator. And client // should delete returned iterator after usage. -RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx); +RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique); // Create a union iterator for input iterators. Union iterator will read // input iterators one by one. diff --git a/be/test/olap/generic_iterators_test.cpp b/be/test/olap/generic_iterators_test.cpp index b73ad0271e..de25b071bb 100644 --- a/be/test/olap/generic_iterators_test.cpp +++ b/be/test/olap/generic_iterators_test.cpp @@ -115,7 +115,7 @@ TEST(GenericIteratorsTest, Union) { delete iter; } -TEST(GenericIteratorsTest, Merge) { +TEST(GenericIteratorsTest, MergeAgg) { auto schema = create_schema(); std::vector<RowwiseIterator*> inputs; @@ -124,7 +124,7 @@ TEST(GenericIteratorsTest, Merge) { inputs.push_back(new_auto_increment_iterator(schema, 300)); auto iter = new_merge_iterator( - std::move(inputs), MemTracker::CreateTracker(-1, "MergeIterator", nullptr, false), -1); + std::move(inputs), MemTracker::CreateTracker(-1, "MergeIterator", nullptr, false), -1, false); StorageReadOptions opts; auto st = iter->init(opts); ASSERT_TRUE(st.ok()); @@ -158,6 +158,40 @@ TEST(GenericIteratorsTest, Merge) { delete iter; } +TEST(GenericIteratorsTest, MergeUnique) { + auto schema = create_schema(); + std::vector<RowwiseIterator*> inputs; + + inputs.push_back(new_auto_increment_iterator(schema, 100)); + inputs.push_back(new_auto_increment_iterator(schema, 200)); + inputs.push_back(new_auto_increment_iterator(schema, 300)); + + auto iter = new_merge_iterator(std::move(inputs), -1, true); + StorageReadOptions opts; + auto st = iter->init(opts); + EXPECT_TRUE(st.ok()); + + RowBlockV2 block(schema, 128); + + size_t row_count = 0; + do { + block.clear(); + st = iter->next_batch(&block); + for (int i = 0; i < block.num_rows(); ++i) { + size_t base_value = row_count; + auto row = block.row(i); + EXPECT_EQ(base_value, *(int16_t*)row.cell_ptr(0)); + EXPECT_EQ(base_value + 1, *(int32_t*)row.cell_ptr(1)); + EXPECT_EQ(base_value + 2, *(int64_t*)row.cell_ptr(2)); + row_count++; + } + } while (st.ok()); + EXPECT_TRUE(st.is_end_of_file()); + EXPECT_EQ(300, row_count); + + delete iter; +} + } // namespace doris int main(int argc, char** argv) { diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp b/be/test/vec/exec/vgeneric_iterators_test.cpp index 405c9a9103..6189fdd821 100644 --- a/be/test/vec/exec/vgeneric_iterators_test.cpp +++ b/be/test/vec/exec/vgeneric_iterators_test.cpp @@ -140,8 +140,8 @@ TEST(VGenericIteratorsTest, Union) { delete iter; } -TEST(VGenericIteratorsTest, Merge) { - ASSERT_TRUE(1); +TEST(VGenericIteratorsTest, MergeAgg) { + EXPECT_TRUE(1); auto schema = create_schema(); std::vector<RowwiseIterator*> inputs; @@ -149,7 +149,7 @@ TEST(VGenericIteratorsTest, Merge) { inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200)); inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300)); - auto iter = vectorized::new_merge_iterator(inputs, MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), -1); + auto iter = vectorized::new_merge_iterator(inputs, MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), -1, false); StorageReadOptions opts; auto st = iter->init(opts); ASSERT_TRUE(st.ok()); @@ -189,6 +189,47 @@ TEST(VGenericIteratorsTest, Merge) { delete iter; } +TEST(VGenericIteratorsTest, MergeUnique) { + EXPECT_TRUE(1); + auto schema = create_schema(); + std::vector<RowwiseIterator*> inputs; + + inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100)); + inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200)); + inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300)); + + auto iter = vectorized::new_merge_iterator(inputs, -1, true); + StorageReadOptions opts; + auto st = iter->init(opts); + EXPECT_TRUE(st.ok()); + + vectorized::Block block; + create_block(schema, block); + + do { + st = iter->next_batch(&block); + } while (st.ok()); + + EXPECT_TRUE(st.is_end_of_file()); + EXPECT_EQ(block.rows(), 300); + + auto c0 = block.get_by_position(0).column; + auto c1 = block.get_by_position(1).column; + auto c2 = block.get_by_position(2).column; + + size_t row_count = 0; + for (size_t i = 0; i < block.rows(); ++i) { + size_t base_value = row_count; + + EXPECT_EQ(base_value, (*c0)[i].get<int>()); + EXPECT_EQ(base_value + 1, (*c1)[i].get<int>()); + EXPECT_EQ(base_value + 2, (*c2)[i].get<int>()); + row_count++; + } + + delete iter; +} + // only used for Seq Column UT class SeqColumnUtIterator : public RowwiseIterator { public: @@ -276,7 +317,7 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) { inputs.push_back(new SeqColumnUtIterator(schema, num_rows, rows_begin, seq_column_id, seq_id_in_every_file)); } - auto iter = vectorized::new_merge_iterator(inputs, MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), seq_column_id); + auto iter = vectorized::new_merge_iterator(inputs, MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), seq_column_id, true); StorageReadOptions opts; auto st = iter->init(opts); ASSERT_TRUE(st.ok()); @@ -288,18 +329,14 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) { st = iter->next_batch(&block); } while (st.ok()); - ASSERT_TRUE(st.is_end_of_file()); - ASSERT_EQ(block.rows(), seg_iter_num); + EXPECT_TRUE(st.is_end_of_file()); + EXPECT_EQ(block.rows(), 1); auto col0 = block.get_by_position(0).column; auto col1 = block.get_by_position(1).column; auto seq_col = block.get_by_position(seq_column_id).column; - - for (size_t i = 0; i < seg_iter_num; i++) { - size_t expected_value = seg_iter_num - i - 1; // in Descending - size_t actual_value = (*seq_col)[i].get<int>(); - ASSERT_EQ(expected_value, actual_value); - } + size_t actual_value = (*seq_col)[0].get<int>(); + EXPECT_EQ(seg_iter_num - 1, actual_value); delete iter; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
