This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c1816d0bc695bcda500b4e15ea5c9c48e4275fef
Author: huanghaibin <[email protected]>
AuthorDate: Fri Jul 28 16:41:00 2023 +0800

    (vertical compaction) fix vertical compaction core (#22275)
    
    * (vertical compaction) fix vertical compaction core
    Co-authored-by: @zhannngchen
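    
    In brief: in vertical compaction the key column group is written first
    and fixes the row count of every output segment; value column groups
    are written afterwards and must split their blocks at exactly those
    row boundaries. The old split check against max_rows_per_segment could
    cut value columns at different points than the key group, letting key
    and value rows drift out of alignment and crashing the BE. A minimal,
    compilable sketch of the corrected split rule (hypothetical helper,
    illustration only -- names are not from the patch):
    
        #include <cstdint>
        #include <cstdio>
        #include <utility>
        
        // Given how many value rows the current segment already holds
        // (written), the row count fixed by its key group (key_rows), and
        // an incoming block of n rows, return {rows appended to this
        // segment, rows carried over to the next segment's writer}.
        static std::pair<uint32_t, uint32_t> split_at_key_boundary(
                uint32_t written, uint32_t key_rows, uint32_t n) {
            if (written + n < key_rows) {
                return {n, 0}; // block fits entirely in this segment
            }
            uint32_t here = key_rows - written; // fill up to the boundary
            return {here, n - here};            // remainder opens the next
        }
        
        int main() {
            // Key group holds 100 rows, 90 value rows already written,
            // a 25-row block arrives: 10 rows stay, 15 carry over.
            auto [here, next] = split_at_key_boundary(90, 100, 25);
            std::printf("append %u here, %u next\n", (unsigned)here,
                        (unsigned)next);
            return 0;
        }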
---
 be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 25 ++++++++--
 be/src/olap/rowset/vertical_beta_rowset_writer.h   |  9 ++--
 be/src/vec/olap/vertical_block_reader.cpp          | 57 +++++++++++++++++++---
 be/src/vec/olap/vertical_block_reader.h            |  8 ++-
 be/src/vec/olap/vertical_merge_iterator.cpp        | 31 +++++++++---
 be/src/vec/olap/vertical_merge_iterator.h          |  4 ++
 be/test/vec/olap/vertical_compaction_test.cpp      | 28 ++++++++---
 7 files changed, 132 insertions(+), 30 deletions(-)

diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index e7639a516f..4ae2ee017f 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -99,19 +99,33 @@ Status VerticalBetaRowsetWriter::add_columns(const vectorized::Block* block,
         uint32_t num_rows_written = _segment_writers[_cur_writer_idx]->num_rows_written();
         VLOG_NOTICE << "num_rows_written: " << num_rows_written
                     << ", _cur_writer_idx: " << _cur_writer_idx;
+        uint32_t num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count();
         // init if it's first value column write in current segment
         if (_cur_writer_idx == 0 && num_rows_written == 0) {
             VLOG_NOTICE << "init first value column segment writer";
             RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
         }
-        if (num_rows_written > max_rows_per_segment) {
+        // when splitting segment, need to make rows align between key columns and value columns
+        size_t start_offset = 0, limit = num_rows;
+        if (num_rows_written + num_rows >= num_rows_key_group &&
+            _cur_writer_idx < _segment_writers.size() - 1) {
+            RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(
+                    block, 0, num_rows_key_group - num_rows_written));
             RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx]));
-            // switch to next writer
+            start_offset = num_rows_key_group - num_rows_written;
+            limit = num_rows - start_offset;
             ++_cur_writer_idx;
-            VLOG_NOTICE << "init next value column segment writer: " << _cur_writer_idx;
+            // switch to next writer
             RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->init(col_ids, is_key));
+            num_rows_written = 0;
+            num_rows_key_group = _segment_writers[_cur_writer_idx]->row_count();
+        }
+        if (limit > 0) {
+            RETURN_IF_ERROR(
+                    _segment_writers[_cur_writer_idx]->append_block(block, start_offset, limit));
+            DCHECK(_segment_writers[_cur_writer_idx]->num_rows_written() <=
+                   _segment_writers[_cur_writer_idx]->row_count());
         }
-        RETURN_IF_ERROR(_segment_writers[_cur_writer_idx]->append_block(block, 0, num_rows));
     }
     if (is_key) {
         _num_rows_written += num_rows;
@@ -126,6 +140,7 @@ Status VerticalBetaRowsetWriter::_flush_columns(
     RETURN_IF_ERROR((*segment_writer)->finalize_columns_data());
     RETURN_IF_ERROR((*segment_writer)->finalize_columns_index(&index_size));
     if (is_key) {
+        _total_key_group_rows += (*segment_writer)->row_count();
         // record segment key bound
         KeyBoundsPB key_bounds;
         Slice min_key = (*segment_writer)->min_encoded_key();
@@ -147,7 +162,7 @@ Status VerticalBetaRowsetWriter::flush_columns(bool is_key) {
         return Status::OK();
     }
 
-    DCHECK(_segment_writers[_cur_writer_idx]);
+    DCHECK(_cur_writer_idx < _segment_writers.size() && _segment_writers[_cur_writer_idx]);
     RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx], is_key));
     _cur_writer_idx = 0;
     return Status::OK();
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.h b/be/src/olap/rowset/vertical_beta_rowset_writer.h
index 71b132b9b4..8e318686db 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.h
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.h
@@ -39,13 +39,15 @@ public:
     ~VerticalBetaRowsetWriter();
 
     Status add_columns(const vectorized::Block* block, const std::vector<uint32_t>& col_ids,
-                       bool is_key, uint32_t max_rows_per_segment);
+                       bool is_key, uint32_t max_rows_per_segment) override;
 
     // flush last segment's column
-    Status flush_columns(bool is_key);
+    Status flush_columns(bool is_key) override;
 
     // flush when all column finished, flush column footer
-    Status final_flush();
+    Status final_flush() override;
+
+    int64_t num_rows() const override { return _total_key_group_rows; }
 
 private:
     // only key group will create segment writer
@@ -58,6 +60,7 @@ private:
 private:
     std::vector<std::unique_ptr<segment_v2::SegmentWriter>> _segment_writers;
     size_t _cur_writer_idx = 0;
+    size_t _total_key_group_rows = 0;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp
index 3023f28408..5aac3034a2 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -42,6 +42,8 @@
 namespace doris::vectorized {
 using namespace ErrorCode;
 
+uint64_t VerticalBlockReader::nextId = 1;
+
 VerticalBlockReader::~VerticalBlockReader() {
     for (int i = 0; i < _agg_functions.size(); ++i) {
         _agg_functions[i]->destroy(_agg_places[i]);
@@ -399,7 +401,12 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) {
         // _vcollect_iter->next_batch(block) will fill row_source_buffer but delete sign is ignored
         // we calc delete sign column if it's base compaction and update row_source_buffer's agg flag
         // after we get current block
-        auto row_source_idx = _row_sources_buffer->buffered_size();
+        VLOG_NOTICE << "reader id: " << _id
+                    << ", buffer size: " << 
_row_sources_buffer->buffered_size();
+        uint64_t row_source_idx = _row_sources_buffer->buffered_size();
+        uint64_t row_buffer_size_start = row_source_idx;
+        uint64_t merged_rows_start = _vcollect_iter->merged_rows();
+        uint64_t filtered_rows_start = _stats.rows_del_filtered;
 
         auto res = _vcollect_iter->next_batch(block);
         if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
@@ -412,7 +419,20 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) {
             }
             DCHECK_EQ(_block_row_locations.size(), block->rows());
         }
-        auto block_rows = block->rows();
+
+        if (_row_sources_buffer->buffered_size() < row_buffer_size_start) {
+            row_buffer_size_start = 0;
+            row_source_idx = 0;
+        }
+
+        size_t merged_rows_in_rs_buffer = 0;
+        for (uint64_t i = row_buffer_size_start; i < _row_sources_buffer->buffered_size(); i++) {
+            if (_row_sources_buffer->get_agg_flag(i)) {
+                merged_rows_in_rs_buffer++;
+            }
+        }
+
+        size_t block_rows = block->rows();
         if (_filter_delete && block_rows > 0) {
             int ori_delete_sign_idx = _reader_context.tablet_schema->field_index(DELETE_SIGN);
             if (ori_delete_sign_idx < 0) {
@@ -433,14 +453,23 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) {
                     reinterpret_cast<ColumnInt8*>(target_columns[delete_sign_idx].get())
                             ->get_data()
                             .data();
-            for (int i = 0; i < block_rows; ++i) {
-                bool sign = (delete_data[i] == 0);
-                filter_data[i] = sign;
+
+            int cur_row = 0;
+            while (cur_row < block_rows) {
+                if (_row_sources_buffer->get_agg_flag(row_source_idx)) {
+                    row_source_idx++;
+                    continue;
+                }
+                bool sign = (delete_data[cur_row] == 0);
+                filter_data[cur_row] = sign;
                 if (UNLIKELY(!sign)) {
                     _row_sources_buffer->set_agg_flag(row_source_idx, true);
                 }
-                // skip same rows filtered in vertical_merge_iterator
-                row_source_idx += _row_sources_buffer->continuous_agg_count(row_source_idx);
+                cur_row++;
+                row_source_idx++;
+            }
+            while (row_source_idx < _row_sources_buffer->buffered_size()) {
+                row_source_idx++;
             }
 
             ColumnWithTypeAndName column_with_type_and_name {_delete_filter_column,
@@ -451,6 +480,20 @@ Status VerticalBlockReader::_unique_key_next_block(Block* block, bool* eof) {
             _stats.rows_del_filtered += block_rows - block->rows();
             DCHECK(block->try_get_by_name("__DORIS_COMPACTION_FILTER__") == nullptr);
         }
+
+        size_t filtered_rows_in_rs_buffer = 0;
+        for (auto i = row_buffer_size_start; i < _row_sources_buffer->buffered_size(); i++) {
+            if (_row_sources_buffer->get_agg_flag(i)) {
+                filtered_rows_in_rs_buffer++;
+            }
+        }
+        filtered_rows_in_rs_buffer -= merged_rows_in_rs_buffer;
+
+        auto merged_rows_cur_batch = _vcollect_iter->merged_rows() - merged_rows_start;
+        auto filtered_rows_cur_batch = _stats.rows_del_filtered - filtered_rows_start;
+
+        DCHECK_EQ(merged_rows_in_rs_buffer, merged_rows_cur_batch);
+        DCHECK_EQ(filtered_rows_in_rs_buffer, filtered_rows_cur_batch);
         *eof = (res.is<END_OF_FILE>());
         _eof = *eof;
         return Status::OK();
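
Note on the reader change above: _unique_key_next_block() now advances
row_source_idx through the row-sources buffer together with the block,
skipping entries the merge iterator already agg-flagged (those rows were
merged away and have no counterpart row in the output block), and
agg-flags newly delete-filtered rows so value-column readers drop the
same rows; the new DCHECKs cross-check both per-batch counts against
merged_rows() and rows_del_filtered. A standalone sketch of that walk,
with hypothetical types (illustration only, not the patch itself):

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // agg_flag mirrors the per-source-row agg flags of RowSourcesBuffer;
    // delete_sign holds one entry per row of the output block.
    static void mark_deleted_rows(std::vector<bool>& agg_flag,
                                  const std::vector<int8_t>& delete_sign,
                                  size_t row_source_idx) {
        size_t cur_row = 0;
        while (cur_row < delete_sign.size()) {
            if (agg_flag[row_source_idx]) {
                ++row_source_idx; // merged row: no block row to inspect
                continue;
            }
            if (delete_sign[cur_row] != 0) {
                agg_flag[row_source_idx] = true; // drop in value groups too
            }
            ++cur_row;
            ++row_source_idx;
        }
    }

    int main() {
        std::vector<bool> agg = {false, true, false, false}; // row 1 merged
        std::vector<int8_t> del = {0, 1, 0};                 // 3 block rows
        mark_deleted_rows(agg, del, 0);
        for (bool f : agg) std::printf("%d ", f ? 1 : 0);    // 0 1 1 0
        std::printf("\n");
        return 0;
    }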
diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h
index 2bc912193b..e945360129 100644
--- a/be/src/vec/olap/vertical_block_reader.h
+++ b/be/src/vec/olap/vertical_block_reader.h
@@ -49,7 +49,9 @@ class RowSourcesBuffer;
 class VerticalBlockReader final : public TabletReader {
 public:
     VerticalBlockReader(RowSourcesBuffer* row_sources_buffer)
-            : _row_sources_buffer(row_sources_buffer) {}
+            : _row_sources_buffer(row_sources_buffer) {
+        _id = nextId++;
+    }
 
     ~VerticalBlockReader() override;
 
@@ -68,8 +70,11 @@ public:
         DCHECK(_vcollect_iter);
         return _vcollect_iter->merged_rows();
     }
+
     std::vector<RowLocation> current_block_row_locations() { return _block_row_locations; }
 
+    static uint64_t nextId;
+
 private:
     // Directly read row from rowset and pass to upper caller. No need to do aggregation.
     // This is usually used for DUPLICATE KEY tables
@@ -95,6 +100,7 @@ private:
     void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true);
 
 private:
+    size_t _id;
     std::shared_ptr<RowwiseIterator> _vcollect_iter;
     IteratorRowRef _next_row {{}, -1, false};
 
diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp
index 7ace9b457c..27bcc4f8e0 100644
--- a/be/src/vec/olap/vertical_merge_iterator.cpp
+++ b/be/src/vec/olap/vertical_merge_iterator.cpp
@@ -68,10 +68,15 @@ uint16_t RowSource::data() const {
 Status RowSourcesBuffer::append(const std::vector<RowSource>& row_sources) {
     if (_buffer->allocated_bytes() + row_sources.size() * sizeof(UInt16) >
         config::vertical_compaction_max_row_source_memory_mb * 1024 * 1024) {
-        // serialize current buffer
-        RETURN_IF_ERROR(_create_buffer_file());
-        RETURN_IF_ERROR(_serialize());
-        _reset_buffer();
+        if (_buffer->allocated_bytes() - _buffer->size() * sizeof(UInt16) <
+            row_sources.size() * sizeof(UInt16)) {
+            VLOG_NOTICE << "RowSourceBuffer is too large, serialize and reset 
buffer: "
+                        << _buffer->allocated_bytes() << ", total size: " << 
_total_size;
+            // serialize current buffer
+            RETURN_IF_ERROR(_create_buffer_file());
+            RETURN_IF_ERROR(_serialize());
+            _reset_buffer();
+        }
     }
     for (const auto& source : row_sources) {
         _buffer->insert_value(source.data());
@@ -116,6 +121,12 @@ void RowSourcesBuffer::set_agg_flag(uint64_t index, bool agg) {
     _buffer->get_data()[index] = ori.data();
 }
 
+bool RowSourcesBuffer::get_agg_flag(uint64_t index) {
+    DCHECK(index < _buffer->size());
+    RowSource ori(_buffer->get_data()[index]);
+    return ori.agg_flag();
+}
+
 size_t RowSourcesBuffer::continuous_agg_count(uint64_t index) {
     size_t result = 1;
     int start = index + 1;
@@ -167,8 +178,9 @@ Status RowSourcesBuffer::_create_buffer_file() {
     LOG(INFO) << "Vertical compaction row sources buffer path: " << file_path;
     _fd = mkstemp(file_path.data());
     if (_fd < 0) {
-        LOG(WARNING) << "failed to create tmp file, file_path=" << file_path;
-        return Status::InternalError("failed to create tmp file");
+        LOG(WARNING) << "failed to create tmp file, file_path=" << file_path
+                     << ", err: " << strerror(errno);
+        return Status::InternalError("failed to create tmp file ");
     }
     // file will be released after fd is close
     unlink(file_path.data());
@@ -638,6 +650,9 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) {
         // Except first row, we call advance first and then get cur row
         ctx->set_cur_row_ref(ref);
         ref->is_same = row_source.agg_flag();
+        if (ref->is_same) {
+            _filtered_rows++;
+        }
 
         ctx->set_is_first_row(false);
         _row_sources_buf->advance();
@@ -646,6 +661,9 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) {
     RETURN_IF_ERROR(ctx->advance());
     ctx->set_cur_row_ref(ref);
     ref->is_same = row_source.agg_flag();
+    if (ref->is_same) {
+        _filtered_rows++;
+    }
 
     _row_sources_buf->advance();
     return Status::OK();
@@ -679,6 +697,7 @@ Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef* ref) {
             ctx->set_cur_row_ref(ref);
             return Status::OK();
         }
+        _filtered_rows++;
         st = _row_sources_buf->has_remaining();
     }
 
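
Note on RowSourcesBuffer::append() above: the buffer now serializes to its
temp file only when the already-allocated headroom cannot absorb the
incoming batch, instead of on every breach of the memory cap; the tests
below set vertical_compaction_max_row_source_memory_mb = 1 and enlarge
rows_per_segment so this spill path is actually exercised. A compilable
sketch of the condition (names and constants illustrative):

    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    // Spill only if (a) appending would exceed the cap AND (b) the free
    // headroom inside the allocation cannot hold the incoming rows.
    static bool should_spill(size_t allocated_bytes, size_t buffered_rows,
                             size_t incoming_rows, size_t cap_bytes) {
        const size_t row_bytes = sizeof(uint16_t); // one RowSource per row
        if (allocated_bytes + incoming_rows * row_bytes <= cap_bytes) {
            return false; // still under the memory cap
        }
        size_t free_bytes = allocated_bytes - buffered_rows * row_bytes;
        return free_bytes < incoming_rows * row_bytes;
    }

    int main() {
        // 1 MiB allocated, 400 KiB used, 16 KiB incoming, 1 MiB cap:
        // the cap is breached, but the headroom absorbs it -- no spill.
        bool spill = should_spill(1 << 20, (400 << 10) / 2,
                                  (16 << 10) / 2, 1 << 20);
        std::printf("%d\n", spill ? 1 : 0);
        return 0;
    }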
diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h
index a5443c72ae..b587b6f2cd 100644
--- a/be/src/vec/olap/vertical_merge_iterator.h
+++ b/be/src/vec/olap/vertical_merge_iterator.h
@@ -113,6 +113,7 @@ public:
     uint64_t total_size() const { return _total_size; }
     uint64_t buffered_size() { return _buffer->size(); }
     void set_agg_flag(uint64_t index, bool agg);
+    bool get_agg_flag(uint64_t index);
 
     Status has_remaining();
 
@@ -374,6 +375,8 @@ public:
 
     Status unique_key_next_row(IteratorRowRef* ref) override;
 
+    uint64_t merged_rows() const override { return _filtered_rows; }
+
 private:
     int _get_size(Block* block) { return block->rows(); }
 
@@ -388,6 +391,7 @@ private:
     const Schema* _schema = nullptr;
 
     int _block_row_max = 0;
+    size_t _filtered_rows = 0;
     RowSourcesBuffer* _row_sources_buf;
     StorageReadOptions _opts;
 };
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp
index 7446c4ffd5..14a202845a 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -78,6 +78,7 @@ static StorageEngine* k_engine = nullptr;
 class VerticalCompactionTest : public ::testing::Test {
 protected:
     void SetUp() override {
+        config::vertical_compaction_max_row_source_memory_mb = 1;
         char buffer[MAX_PATH_LEN];
         EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
         absolute_dir = std::string(buffer) + kTestDir;
@@ -87,11 +88,15 @@ protected:
                             ->create_directory(absolute_dir + "/tablet_path")
                             .ok());
 
+        _data_dir = new DataDir(absolute_dir, 100000000);
+        _data_dir->init();
+
         doris::EngineOptions options;
         k_engine = new StorageEngine(options);
         StorageEngine::_s_instance = k_engine;
     }
     void TearDown() override {
+        SAFE_DELETE(_data_dir);
         EXPECT_TRUE(io::global_local_filesystem()->delete_directory(absolute_dir).ok());
         if (k_engine != nullptr) {
             k_engine->stop();
@@ -334,8 +339,14 @@ protected:
                                UniqueId(1, 2), TTabletType::TABLET_TYPE_DISK,
                                TCompressionType::LZ4F, 0, enable_unique_key_merge_on_write));
 
-        TabletSharedPtr tablet(new Tablet(tablet_meta, nullptr));
+        TabletSharedPtr tablet(new Tablet(tablet_meta, _data_dir));
         tablet->init();
+        bool exists = false;
+        auto res = io::global_local_filesystem()->exists(tablet->tablet_path(), &exists);
+        EXPECT_TRUE(res.ok() && !exists);
+        res = io::global_local_filesystem()->create_directory(tablet->tablet_path());
+        EXPECT_TRUE(res.ok());
+
         if (has_delete_handler) {
             // delete data with key < 1000
             std::vector<TCondition> conditions;
@@ -392,6 +403,7 @@ protected:
 private:
     const std::string kTestDir = "/ut_dir/vertical_compaction_test";
     string absolute_dir;
+    DataDir* _data_dir = nullptr;
 };
 
 TEST_F(VerticalCompactionTest, TestRowSourcesBuffer) {
@@ -454,7 +466,7 @@ TEST_F(VerticalCompactionTest, TestRowSourcesBuffer) {
 TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
     auto num_input_rowset = 2;
     auto num_segments = 2;
-    auto rows_per_segment = 100;
+    auto rows_per_segment = 2 * 100 * 1024;
     SegmentsOverlapPB overlap = NONOVERLAPPING;
     std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data;
     generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data);
@@ -561,7 +573,7 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
 TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) {
     auto num_input_rowset = 2;
     auto num_segments = 2;
-    auto rows_per_segment = 100;
+    auto rows_per_segment = 2 * 100 * 1024;
     SegmentsOverlapPB overlap = NONOVERLAPPING;
     std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data;
     generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data);
@@ -669,7 +681,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) {
 TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
     auto num_input_rowset = 2;
     auto num_segments = 2;
-    auto rows_per_segment = 100;
+    auto rows_per_segment = 2 * 100 * 1024;
     SegmentsOverlapPB overlap = NONOVERLAPPING;
     std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data;
     generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data);
@@ -720,7 +732,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
     RowIdConversion rowid_conversion;
     stats.rowid_conversion = &rowid_conversion;
     s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, tablet_schema,
-                                       input_rs_readers, output_rs_writer.get(), 100, &stats);
+                                       input_rs_readers, output_rs_writer.get(), 10000, &stats);
     EXPECT_TRUE(s.ok());
     RowsetSharedPtr out_rowset = output_rs_writer->build();
 
@@ -769,7 +781,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
 TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) {
     auto num_input_rowset = 2;
     auto num_segments = 2;
-    auto rows_per_segment = 100;
+    auto rows_per_segment = 2 * 100 * 1024;
     SegmentsOverlapPB overlap = NONOVERLAPPING;
     std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data;
     generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data);
@@ -863,7 +875,7 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) {
 TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) {
     auto num_input_rowset = 2;
     auto num_segments = 2;
-    auto rows_per_segment = 100;
+    auto rows_per_segment = 2 * 100 * 1024;
     SegmentsOverlapPB overlap = NONOVERLAPPING;
     std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data;
     generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data);
@@ -957,7 +969,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) {
 TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
     auto num_input_rowset = 2;
     auto num_segments = 2;
-    auto rows_per_segment = 100;
+    auto rows_per_segment = 2 * 100 * 1024;
     SegmentsOverlapPB overlap = NONOVERLAPPING;
     std::vector<std::vector<std::vector<std::tuple<int64_t, int64_t>>>> input_data;
     generate_input_data(num_input_rowset, num_segments, rows_per_segment, overlap, input_data);

