This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c1816d0bc695bcda500b4e15ea5c9c48e4275fef Author: huanghaibin <[email protected]> AuthorDate: Fri Jul 28 16:41:00 2023 +0800 (vertical compaction) fix vertical compaction core (#22275) * (vertical compaction) fix vertical compaction core co-author:@zhannngchen --- be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 25 ++++++++-- be/src/olap/rowset/vertical_beta_rowset_writer.h | 9 ++-- be/src/vec/olap/vertical_block_reader.cpp | 57 +++++++++++++++++++--- be/src/vec/olap/vertical_block_reader.h | 8 ++- be/src/vec/olap/vertical_merge_iterator.cpp | 31 +++++++++--- be/src/vec/olap/vertical_merge_iterator.h | 4 ++ be/test/vec/olap/vertical_compaction_test.cpp | 28 ++++++++--- 7 files changed, 132 insertions(+), 30 deletions(-) diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp index e7639a516f..4ae2ee017f 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp @@ -99,19 +99,33 @@ Status VerticalBetaRowsetWriter::add_columns(const vectorized::Block* block, uint32_t num_rows_written = _segment_writers[_cur_writer_idx]->num_rows_written(); VLOG_NOTICE << "num_rows_written: " << num_rows_written << ", _cur_writer_idx: " << _cur_writer_idx; + uint32_t num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count(); // init if it's first value column write in current segment if (_cur_writer_idx == 0 && num_rows_written == 0) { VLOG_NOTICE << "init first value column segment writer"; RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key)); } - if (num_rows_written > max_rows_per_segment) { + // when splitting segment, need to make rows align between key columns and value columns + size_t start_offset = 0, limit = num_rows; + if (num_rows_written + num_rows >= num_rows_key_group && + _cur_writer_idx < _segment_writers.size() - 1) { + RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block( + block, 0, num_rows_key_group - num_rows_written)); RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx])); - // switch to next writer + start_offset = num_rows_key_group - num_rows_written; + limit = num_rows - start_offset; ++_cur_writer_idx; - VLOG_NOTICE << "init next value column segment writer: " << _cur_writer_idx; + // switch to next writer RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key)); + num_rows_written = 0; + num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count(); + } + if (limit > 0) { + RETURN_IF_ERROR( + _segment_writers[_cur_writer_idx]->append_block(block, start_offset, limit)); + DCHECK(_segment_writers[_cur_writer_idx]->num_rows_written() <= + _segment_writers[_cur_writer_idx]->row_count()); } - RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows)); } if (is_key) { _num_rows_written += num_rows; @@ -126,6 +140,7 @@ Status VerticalBetaRowsetWriter::_flush_columns( RETURN_IF_ERROR((*segment_writer)->finalize_columns_data()); RETURN_IF_ERROR((*segment_writer)->finalize_columns_index(&index_size)); if (is_key) { + _total_key_group_rows += (*segment_writer)->row_count(); // record segment key bound KeyBoundsPB key_bounds; Slice min_key = (*segment_writer)->min_encoded_key(); @@ -147,7 +162,7 @@ Status VerticalBetaRowsetWriter::flush_columns(bool is_key) { return Status::OK(); } - DCHECK(_segment_writers[_cur_writer_idx]); + DCHECK(_cur_writer_idx < _segment_writers.size() && _segment_writers[_cur_writer_idx]); RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx], is_key)); _cur_writer_idx = 0; return Status::OK(); diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.h b/be/src/olap/rowset/vertical_beta_rowset_writer.h index 71b132b9b4..8e318686db 100644 --- a/be/src/olap/rowset/vertical_beta_rowset_writer.h +++ b/be/src/olap/rowset/vertical_beta_rowset_writer.h @@ -39,13 +39,15 @@ public: ~VerticalBetaRowsetWriter(); Status add_columns(const vectorized::Block* block, const std::vector<uint32_t>& col_ids, - bool is_key, uint32_t max_rows_per_segment); + bool is_key, uint32_t max_rows_per_segment) override; // flush last segment's column - Status flush_columns(bool is_key); + Status flush_columns(bool is_key) override; // flush when all column finished, flush column footer - Status final_flush(); + Status final_flush() override; + + int64_t num_rows() const override { return _total_key_group_rows; } private: // only key group will create segment writer @@ -58,6 +60,7 @@ private: private: std::vector<std::unique_ptr<segment_v2::SegmentWriter>> _segment_writers; size_t _cur_writer_idx = 0; + size_t _total_key_group_rows = 0; }; } // namespace doris \ No newline at end of file diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 3023f28408..5aac3034a2 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -42,6 +42,8 @@ namespace doris::vectorized { using namespace ErrorCode; +uint64_t VerticalBlockReader::nextId = 1; + VerticalBlockReader::~VerticalBlockReader() { for (int i = 0; i < _agg_functions.size(); ++i) { _agg_functions[i]->destroy(_agg_places[i]); @@ -399,7 +401,12 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) { // _vcollect_iter->next_batch(block) will fill row_source_buffer but delete sign is ignored // we calc delete sign column if it's base compaction and update row_sourece_buffer's agg flag // after we get current block - auto row_source_idx = _row_sources_buffer->buffered_size(); + VLOG_NOTICE << "reader id: " << _id + << ", buffer size: " << _row_sources_buffer->buffered_size(); + uint64_t row_source_idx = _row_sources_buffer->buffered_size(); + uint64_t row_buffer_size_start = row_source_idx; + uint64_t merged_rows_start = _vcollect_iter->merged_rows(); + uint64_t filtered_rows_start = _stats.rows_del_filtered; auto res = _vcollect_iter->next_batch(block); if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) { @@ -412,7 +419,20 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) { } DCHECK_EQ(_block_row_locations.size(), block->rows()); } - auto block_rows = block->rows(); + + if (_row_sources_buffer->buffered_size() < row_buffer_size_start) { + row_buffer_size_start = 0; + row_source_idx = 0; + } + + size_t merged_rows_in_rs_buffer = 0; + for (uint64_t i = row_buffer_size_start; i < _row_sources_buffer->buffered_size(); i++) { + if (_row_sources_buffer->get_agg_flag(i)) { + merged_rows_in_rs_buffer++; + } + } + + size_t block_rows = block->rows(); if (_filter_delete && block_rows > 0) { int ori_delete_sign_idx = _reader_context.tablet_schema->field_index(DELETE_SIGN); if (ori_delete_sign_idx < 0) { @@ -433,14 +453,23 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) { reinterpret_cast<ColumnInt8*>(target_columns[delete_sign_idx].get()) ->get_data() .data(); - for (int i = 0; i < block_rows; ++i) { - bool sign = (delete_data[i] == 0); - filter_data[i] = sign; + + int cur_row = 0; + while (cur_row < block_rows) { + if (_row_sources_buffer->get_agg_flag(row_source_idx)) { + row_source_idx++; + continue; + } + bool sign = (delete_data[cur_row] == 0); + filter_data[cur_row] = sign; if (UNLIKELY(!sign)) { _row_sources_buffer->set_agg_flag(row_source_idx, true); } - // skip same rows filtered in vertical_merge_iterator - row_source_idx += _row_sources_buffer->continuous_agg_count(row_source_idx); + cur_row++; + row_source_idx++; + } + while (row_source_idx < _row_sources_buffer->buffered_size()) { + row_source_idx++; } ColumnWithTypeAndName column_with_type_and_name {_delete_filter_column, @@ -451,6 +480,20 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) { _stats.rows_del_filtered += block_rows - block->rows(); DCHECK(block->try_get_by_name("__DORIS_COMPACTION_FILTER__") == nullptr); } + + size_t filtered_rows_in_rs_buffer = 0; + for (auto i = row_buffer_size_start; i < _row_sources_buffer->buffered_size(); i++) { + if (_row_sources_buffer->get_agg_flag(i)) { + filtered_rows_in_rs_buffer++; + } + } + filtered_rows_in_rs_buffer -= merged_rows_in_rs_buffer; + + auto merged_rows_cur_batch = _vcollect_iter->merged_rows() - merged_rows_start; + auto filtered_rows_cur_batch = _stats.rows_del_filtered - filtered_rows_start; + + DCHECK_EQ(merged_rows_in_rs_buffer, merged_rows_cur_batch); + DCHECK_EQ(filtered_rows_in_rs_buffer, filtered_rows_cur_batch); *eof = (res.is<END_OF_FILE>()); _eof = *eof; return Status::OK(); diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h index 2bc912193b..e945360129 100644 --- a/be/src/vec/olap/vertical_block_reader.h +++ b/be/src/vec/olap/vertical_block_reader.h @@ -49,7 +49,9 @@ class RowSourcesBuffer; class VerticalBlockReader final : public TabletReader { public: VerticalBlockReader(RowSourcesBuffer* row_sources_buffer) - : _row_sources_buffer(row_sources_buffer) {} + : _row_sources_buffer(row_sources_buffer) { + _id = nextId++; + } ~VerticalBlockReader() override; @@ -68,8 +70,11 @@ public: DCHECK(_vcollect_iter); return _vcollect_iter->merged_rows(); } + std::vector<RowLocation> current_block_row_locations() { return _block_row_locations; } + static uint64_t nextId; + private: // Directly read row from rowset and pass to upper caller. No need to do aggregation. // This is usually used for DUPLICATE KEY tables @@ -95,6 +100,7 @@ private: void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true); private: + size_t _id; std::shared_ptr<RowwiseIterator> _vcollect_iter; IteratorRowRef _next_row {{}, -1, false}; diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp index 7ace9b457c..27bcc4f8e0 100644 --- a/be/src/vec/olap/vertical_merge_iterator.cpp +++ b/be/src/vec/olap/vertical_merge_iterator.cpp @@ -68,10 +68,15 @@ uint16_t RowSource::data() const { Status RowSourcesBuffer::append(const std::vector<RowSource>& row_sources) { if (_buffer->allocated_bytes() + row_sources.size() * sizeof(UInt16) > config::vertical_compaction_max_row_source_memory_mb * 1024 * 1024) { - // serialize current buffer - RETURN_IF_ERROR(_create_buffer_file()); - RETURN_IF_ERROR(_serialize()); - _reset_buffer(); + if (_buffer->allocated_bytes() - _buffer->size() * sizeof(UInt16) < + row_sources.size() * sizeof(UInt16)) { + VLOG_NOTICE << "RowSourceBuffer is too large, serialize and reset buffer: " + << _buffer->allocated_bytes() << ", total size: " << _total_size; + // serialize current buffer + RETURN_IF_ERROR(_create_buffer_file()); + RETURN_IF_ERROR(_serialize()); + _reset_buffer(); + } } for (const auto& source : row_sources) { _buffer->insert_value(source.data()); @@ -116,6 +121,12 @@ void RowSourcesBuffer::set_agg_flag(uint64_t index, bool agg) { _buffer->get_data()[index] = ori.data(); } +bool RowSourcesBuffer::get_agg_flag(uint64_t index) { + DCHECK(index < _buffer->size()); + RowSource ori(_buffer->get_data()[index]); + return ori.agg_flag(); +} + size_t RowSourcesBuffer::continuous_agg_count(uint64_t index) { size_t result = 1; int start = index + 1; @@ -167,8 +178,9 @@ Status RowSourcesBuffer::_create_buffer_file() { LOG(INFO) << "Vertical compaction row sources buffer path: " << file_path; _fd = mkstemp(file_path.data()); if (_fd < 0) { - LOG(WARNING) << "failed to create tmp file, file_path=" << file_path; - return Status::InternalError("failed to create tmp file"); + LOG(WARNING) << "failed to create tmp file, file_path=" << file_path + << ", err: " << strerror(errno); + return Status::InternalError("failed to create tmp file "); } // file will be released after fd is close unlink(file_path.data()); @@ -638,6 +650,9 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) { // Except first row, we call advance first and than get cur row ctx->set_cur_row_ref(ref); ref->is_same = row_source.agg_flag(); + if (ref->is_same) { + _filtered_rows++; + } ctx->set_is_first_row(false); _row_sources_buf->advance(); @@ -646,6 +661,9 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) { RETURN_IF_ERROR(ctx->advance()); ctx->set_cur_row_ref(ref); ref->is_same = row_source.agg_flag(); + if (ref->is_same) { + _filtered_rows++; + } _row_sources_buf->advance(); return Status::OK(); @@ -679,6 +697,7 @@ Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef ctx->set_cur_row_ref(ref); return Status::OK(); } + _filtered_rows++; st = _row_sources_buf->has_remaining(); } diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h index a5443c72ae..b587b6f2cd 100644 --- a/be/src/vec/olap/vertical_merge_iterator.h +++ b/be/src/vec/olap/vertical_merge_iterator.h @@ -113,6 +113,7 @@ public: uint64_t total_size() const { return _total_size; } uint64_t buffered_size() { return _buffer->size(); } void set_agg_flag(uint64_t index, bool agg); + bool get_agg_flag(uint64_t index); Status has_remaining(); @@ -374,6 +375,8 @@ public: Status unique_key_next_row(IteratorRowRef* ref) override; + uint64_t merged_rows() const override { return _filtered_rows; } + private: int _get_size(Block* block) { return block->rows(); } @@ -388,6 +391,7 @@ private: const Schema* _schema = nullptr; int _block_row_max = 0; + size_t _filtered_rows = 0; RowSourcesBuffer* _row_sources_buf; StorageReadOptions _opts; }; diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp index 7446c4ffd5..14a202845a 100644 --- a/be/test/vec/olap/vertical_compaction_test.cpp +++ b/be/test/vec/olap/vertical_compaction_test.cpp @@ -78,6 +78,7 @@ static StorageEngine* k_engine = nullptr; class VerticalCompactionTest : public ::testing::Test { protected: void SetUp() override { + config::vertical_compaction_max_row_source_memory_mb = 1; char buffer[MAX_PATH_LEN]; EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); absolute_dir = std::string(buffer) + kTestDir; @@ -87,11 +88,15 @@ protected: ->create_directory(absolute_dir + "/tablet_path") .ok()); + _data_dir = new DataDir(absolute_dir, 100000000); + _data_dir->init(); + doris::EngineOptions options; k_engine = new StorageEngine(options); StorageEngine::_s_instance = k_engine; } void TearDown() override { + SAFE_DELETE(_data_dir); EXPECT_TRUE(io::global_local_filesystem()->delete_directory(absolute_dir).ok()); if (k_engine != nullptr) { k_engine->stop(); @@ -334,8 +339,14 @@ protected: UniqueId(1, 2), TTabletType::TABLET_TYPE_DISK, TCompressionType::LZ4F, 0, enable_unique_key_merge_on_write)); - TabletSharedPtr tablet(new Tablet(tablet_meta, nullptr)); + TabletSharedPtr tablet(new Tablet(tablet_meta, _data_dir)); tablet->init(); + bool exists = false; + auto res = io::global_local_filesystem()->exists(tablet->tablet_path(), &exists); + EXPECT_TRUE(res.ok() && !exists); + res = io::global_local_filesystem()->create_directory(tablet->tablet_path()); + EXPECT_TRUE(res.ok()); + if (has_delete_handler) { // delete data with key < 1000 std::vector<TCondition> conditions; @@ -392,6 +403,7 @@ protected: private: const std::string kTestDir = "/ut_dir/vertical_compaction_test"; string absolute_dir; + DataDir* _data_dir = nullptr; }; TEST_F(VerticalCompactionTest, TestRowSourcesBuffer) { @@ -454,7 +466,7 @@ TEST_F(VerticalCompactionTest, TestRowSourcesBuffer) { TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) { auto num_input_rowset = 2; auto num_segments = 2; - auto rows_per_segment = 100; + auto rows_per_segment = 2 * 100 * 1024; SegmentsOverlapPB overlap = NONOVERLAPPING; std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data; generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data); @@ -561,7 +573,7 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) { TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) { auto num_input_rowset = 2; auto num_segments = 2; - auto rows_per_segment = 100; + auto rows_per_segment = 2 * 100 * 1024; SegmentsOverlapPB overlap = NONOVERLAPPING; std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data; generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data); @@ -669,7 +681,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) { TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) { auto num_input_rowset = 2; auto num_segments = 2; - auto rows_per_segment = 100; + auto rows_per_segment = 2 * 100 * 1024; SegmentsOverlapPB overlap = NONOVERLAPPING; std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data; generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data); @@ -720,7 +732,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, &stats); + input_rs_readers, output_rs_writer.get(), 10000, &stats); EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset = output_rs_writer->build(); @@ -769,7 +781,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) { TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) { auto num_input_rowset = 2; auto num_segments = 2; - auto rows_per_segment = 100; + auto rows_per_segment = 2 * 100 * 1024; SegmentsOverlapPB overlap = NONOVERLAPPING; std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data; generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data); @@ -863,7 +875,7 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) { TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) { auto num_input_rowset = 2; auto num_segments = 2; - auto rows_per_segment = 100; + auto rows_per_segment = 2 * 100 * 1024; SegmentsOverlapPB overlap = NONOVERLAPPING; std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data; generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data); @@ -957,7 +969,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) { TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) { auto num_input_rowset = 2; auto num_segments = 2; - auto rows_per_segment = 100; + auto rows_per_segment = 2 * 100 * 1024; SegmentsOverlapPB overlap = NONOVERLAPPING; std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data; generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
