This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 0e694f19dbb [fix](merge-on-write) segcompaction should process delete
bitmap if necessary (#38369) (#39707)
0e694f19dbb is described below
commit 0e694f19dbbde6f75dea1119da8a13e5b9c18161
Author: zhannngchen <[email protected]>
AuthorDate: Thu Aug 22 00:42:56 2024 +0800
[fix](merge-on-write) segcompaction should process delete bitmap if
necessary (#38369) (#39707)
## Proposed changes
Issue Number: close #xxx
cherry-pick #38369 and #38800
---
be/src/olap/merger.cpp | 19 +-
be/src/olap/merger.h | 6 +-
be/src/olap/rowset/beta_rowset_writer.cpp | 35 +-
be/src/olap/rowset/beta_rowset_writer.h | 3 +-
be/src/olap/rowset/segcompaction.cpp | 72 ++-
be/src/olap/rowset/segcompaction.h | 13 +
be/src/olap/simple_rowid_conversion.h | 84 +++
be/src/vec/olap/vertical_block_reader.cpp | 1 +
be/test/common/status_test.cpp | 2 -
be/test/olap/segcompaction_mow_test.cpp | 908 ++++++++++++++++++++++++++++++
10 files changed, 1124 insertions(+), 19 deletions(-)
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index b2e789b5b5a..216a7911067 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -307,16 +307,12 @@ Status Merger::vertical_compact_one_group(
}
// for segcompaction
-Status Merger::vertical_compact_one_group(TabletSharedPtr tablet, ReaderType
reader_type,
- TabletSchemaSPtr tablet_schema, bool
is_key,
- const std::vector<uint32_t>&
column_group,
- vectorized::RowSourcesBuffer*
row_source_buf,
- vectorized::VerticalBlockReader&
src_block_reader,
- segment_v2::SegmentWriter&
dst_segment_writer,
- int64_t max_rows_per_segment,
Statistics* stats_output,
- uint64_t* index_size, KeyBoundsPB&
key_bounds) {
- // build tablet reader
- VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" <<
max_rows_per_segment;
+Status Merger::vertical_compact_one_group(
+ TabletSharedPtr tablet, ReaderType reader_type, TabletSchemaSPtr
tablet_schema, bool is_key,
+ const std::vector<uint32_t>& column_group,
vectorized::RowSourcesBuffer* row_source_buf,
+ vectorized::VerticalBlockReader& src_block_reader,
+ segment_v2::SegmentWriter& dst_segment_writer, Statistics*
stats_output,
+ uint64_t* index_size, KeyBoundsPB& key_bounds, SimpleRowIdConversion*
rowid_conversion) {
// TODO: record_rowids
vectorized::Block block = tablet_schema->create_block(column_group);
size_t output_rows = 0;
@@ -333,6 +329,9 @@ Status Merger::vertical_compact_one_group(TabletSharedPtr
tablet, ReaderType rea
"failed to write block when merging
rowsets of tablet " +
std::to_string(tablet->tablet_id()));
+ if (is_key && rowid_conversion != nullptr) {
+
rowid_conversion->add(src_block_reader.current_block_row_locations());
+ }
output_rows += block.rows();
block.clear_column_data();
}
diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h
index 49ca1e5227f..e5d35001eea 100644
--- a/be/src/olap/merger.h
+++ b/be/src/olap/merger.h
@@ -25,6 +25,7 @@
#include "io/io_common.h"
#include "olap/iterators.h"
#include "olap/rowset/rowset_reader.h"
+#include "olap/simple_rowid_conversion.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
@@ -86,8 +87,9 @@ public:
vectorized::RowSourcesBuffer*
row_source_buf,
vectorized::VerticalBlockReader&
src_block_reader,
segment_v2::SegmentWriter&
dst_segment_writer,
- int64_t max_rows_per_segment,
Statistics* stats_output,
- uint64_t* index_size,
KeyBoundsPB& key_bounds);
+ Statistics* stats_output,
uint64_t* index_size,
+ KeyBoundsPB& key_bounds,
+ SimpleRowIdConversion*
rowid_conversion);
// for mow with cluster key table, the key group also contains cluster key
columns.
// the `key_group_cluster_key_idxes` marks the positions of cluster key
columns in key group.
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 101571f0256..7824da38e1f 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -245,7 +245,12 @@ Status
BetaRowsetWriter::_find_longest_consecutive_small_segment(
if (is_large_segment) {
if (segid == _segcompacted_point) {
// skip large segments at the front
+ auto dst_seg_id = _num_segcompacted.load();
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+ if (_segcompaction_worker->need_convert_delete_bitmap()) {
+ _segcompaction_worker->convert_segment_delete_bitmap(
+ _context.mow_context->delete_bitmap, segid,
dst_seg_id);
+ }
continue;
} else {
// stop because we need consecutive segments
@@ -270,7 +275,13 @@ Status
BetaRowsetWriter::_find_longest_consecutive_small_segment(
}
if (s == 1) { // poor bachelor, let it go
VLOG_DEBUG << "only one candidate segment";
+ auto src_seg_id = _segcompacted_point.load();
+ auto dst_seg_id = _num_segcompacted.load();
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+ if (_segcompaction_worker->need_convert_delete_bitmap()) {
+ _segcompaction_worker->convert_segment_delete_bitmap(
+ _context.mow_context->delete_bitmap, src_seg_id,
dst_seg_id);
+ }
segments->clear();
return Status::OK();
}
@@ -454,7 +465,7 @@ Status
BetaRowsetWriter::_segcompaction_rename_last_segments() {
"code: {}",
_segcompaction_status.load());
}
- if (!_is_segcompacted() || _segcompacted_point == _num_segment) {
+ if (!is_segcompacted() || _segcompacted_point == _num_segment) {
// no need if never segcompact before or all segcompacted
return Status::OK();
}
@@ -462,7 +473,12 @@ Status
BetaRowsetWriter::_segcompaction_rename_last_segments() {
// so that transaction can be committed ASAP
VLOG_DEBUG << "segcompaction last few segments";
for (int32_t segid = _segcompacted_point; segid < _num_segment; segid++) {
+ auto dst_segid = _num_segcompacted.load();
RETURN_IF_ERROR(_rename_compacted_segment_plain(_segcompacted_point++));
+ if (_segcompaction_worker->need_convert_delete_bitmap()) {
+ _segcompaction_worker->convert_segment_delete_bitmap(
+ _context.mow_context->delete_bitmap, segid, dst_segid);
+ }
}
return Status::OK();
}
@@ -584,6 +600,21 @@ Status BetaRowsetWriter::_close_file_writers() {
RETURN_NOT_OK_STATUS_WITH_WARN(_segcompaction_worker->get_file_writer()->close(),
"close segment compaction worker
failed");
}
+ // process delete bitmap for mow table
+ if (is_segcompacted() &&
_segcompaction_worker->need_convert_delete_bitmap()) {
+ auto converted_delete_bitmap =
_segcompaction_worker->get_converted_delete_bitmap();
+ // a non-null converted bitmap means segment compaction was actually triggered
+ if (converted_delete_bitmap != nullptr) {
+ RowsetIdUnorderedSet rowsetids;
+ rowsetids.insert(rowset_id());
+ auto tablet = static_cast<Tablet*>(_context.tablet.get());
+
tablet->add_sentinel_mark_to_delete_bitmap(converted_delete_bitmap.get(),
+ rowsetids);
+ context().mow_context->delete_bitmap->remove({rowset_id(), 0,
0},
+ {rowset_id(),
UINT32_MAX, INT64_MAX});
+
context().mow_context->delete_bitmap->merge(*converted_delete_bitmap);
+ }
+ }
}
return Status::OK();
}
@@ -616,7 +647,7 @@ int64_t BaseBetaRowsetWriter::_num_seg() const {
}
int64_t BetaRowsetWriter::_num_seg() const {
- return _is_segcompacted() ? _num_segcompacted : _num_segment;
+ return is_segcompacted() ? _num_segcompacted : _num_segment;
}
// update tablet schema when meet variant columns, before commit_txn
diff --git a/be/src/olap/rowset/beta_rowset_writer.h
b/be/src/olap/rowset/beta_rowset_writer.h
index 3e285c7e508..b897d96c9b2 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -205,6 +205,8 @@ public:
std::unique_ptr<segment_v2::SegmentWriter>* writer, uint64_t
index_size,
KeyBoundsPB& key_bounds);
+ bool is_segcompacted() const { return _num_segcompacted > 0; }
+
private:
Status _generate_delete_bitmap(int32_t segment_id) override;
@@ -220,7 +222,6 @@ private:
Status _segcompaction_rename_last_segments();
Status _load_noncompacted_segment(segment_v2::SegmentSharedPtr& segment,
int32_t segment_id);
Status
_find_longest_consecutive_small_segment(SegCompactionCandidatesSharedPtr&
segments);
- bool _is_segcompacted() const { return _num_segcompacted > 0; }
bool _check_and_set_is_doing_segcompaction();
Status _rename_compacted_segments(int64_t begin, int64_t end);
Status _rename_compacted_segment_plain(uint64_t seg_id);
diff --git a/be/src/olap/rowset/segcompaction.cpp
b/be/src/olap/rowset/segcompaction.cpp
index 1c152c75c0f..cd76409c822 100644
--- a/be/src/olap/rowset/segcompaction.cpp
+++ b/be/src/olap/rowset/segcompaction.cpp
@@ -76,11 +76,14 @@ Status SegcompactionWorker::_get_segcompaction_reader(
std::vector<uint32_t>& return_columns,
std::unique_ptr<vectorized::VerticalBlockReader>* reader) {
auto ctx = _writer->_context;
+ bool record_rowids = need_convert_delete_bitmap() && is_key;
StorageReadOptions read_options;
read_options.stats = stat;
read_options.use_page_cache = false;
read_options.tablet_schema = ctx.tablet_schema;
+ read_options.record_rowids = record_rowids;
std::vector<std::unique_ptr<RowwiseIterator>> seg_iterators;
+ std::map<uint32_t, uint32_t> segment_rows;
for (auto& seg_ptr : *segments) {
std::unique_ptr<RowwiseIterator> iter;
auto s = seg_ptr->new_iterator(schema, read_options, &iter);
@@ -89,6 +92,10 @@ Status SegcompactionWorker::_get_segcompaction_reader(
s.to_string());
}
seg_iterators.push_back(std::move(iter));
+ segment_rows.emplace(seg_ptr->id(), seg_ptr->num_rows());
+ }
+ if (record_rowids && _rowid_conversion != nullptr) {
+ _rowid_conversion->reset_segment_map(segment_rows);
}
*reader = std::unique_ptr<vectorized::VerticalBlockReader> {
@@ -103,6 +110,7 @@ Status SegcompactionWorker::_get_segcompaction_reader(
reader_params.return_columns = return_columns;
reader_params.is_key_column_group = is_key;
reader_params.use_page_cache = false;
+ reader_params.record_rowids = record_rowids;
return (*reader)->init(reader_params, nullptr);
}
@@ -232,6 +240,9 @@ Status
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
DCHECK(ctx.tablet);
auto tablet = std::static_pointer_cast<Tablet>(ctx.tablet);
+ if (need_convert_delete_bitmap() && _rowid_conversion == nullptr) {
+ _rowid_conversion =
std::make_unique<SimpleRowIdConversion>(_writer->rowset_id());
+ }
std::vector<std::vector<uint32_t>> column_groups;
Merger::vertical_split_columns(ctx.tablet_schema, &column_groups);
@@ -262,8 +273,8 @@ Status
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
Merger::Statistics merger_stats;
RETURN_IF_ERROR(Merger::vertical_compact_one_group(
tablet, ReaderType::READER_SEGMENT_COMPACTION,
ctx.tablet_schema, is_key,
- column_ids, &row_sources_buf, *reader, *writer, INT_MAX,
&merger_stats, &index_size,
- key_bounds));
+ column_ids, &row_sources_buf, *reader, *writer, &merger_stats,
&index_size,
+ key_bounds, _rowid_conversion.get()));
total_index_size += index_size;
if (is_key) {
RETURN_IF_ERROR(row_sources_buf.flush());
@@ -289,6 +300,10 @@ Status
SegcompactionWorker::_do_compact_segments(SegCompactionCandidatesSharedPt
}
RETURN_IF_ERROR(_delete_original_segments(begin, end));
+ if (_rowid_conversion != nullptr) {
+ convert_segment_delete_bitmap(ctx.mow_context->delete_bitmap, begin,
end,
+ _writer->_num_segcompacted);
+ }
RETURN_IF_ERROR(_writer->_rename_compacted_segments(begin, end));
if (VLOG_DEBUG_IS_ON) {
@@ -349,6 +364,59 @@ void
SegcompactionWorker::compact_segments(SegCompactionCandidatesSharedPtr segm
_is_compacting_state_mutable = true;
}
+bool SegcompactionWorker::need_convert_delete_bitmap() { // does segcompaction have to rewrite the delete bitmap?
+ if (_writer == nullptr) { // no owning writer, so there is no tablet to inspect
+ return false;
+ }
+ auto tablet = _writer->context().tablet;
+ return tablet != nullptr && tablet->keys_type() == KeysType::UNIQUE_KEYS && // unique-key table,
+ tablet->enable_unique_key_merge_on_write() && // with merge-on-write enabled,
+ tablet->tablet_schema()->has_sequence_col(); // and a sequence column in the schema
+}
+
+void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr
src_delete_bitmap,
+ uint32_t src_seg_id,
uint32_t dest_seg_id) {
+ // lazy init: the converted bitmap is only created once a conversion is requested
+ if (nullptr == _converted_delete_bitmap) {
+ _converted_delete_bitmap =
std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
+ }
+ auto rowset_id = _writer->context().rowset_id;
+ const auto* seg_map =
+ src_delete_bitmap->get({rowset_id, src_seg_id,
DeleteBitmap::TEMP_VERSION_COMMON});
+ if (seg_map != nullptr) { // source segment has an entry: store it unchanged under dest_seg_id
+ _converted_delete_bitmap->set({rowset_id, dest_seg_id,
DeleteBitmap::TEMP_VERSION_COMMON},
+ *seg_map);
+ }
+}
+
+void SegcompactionWorker::convert_segment_delete_bitmap(DeleteBitmapPtr
src_delete_bitmap,
+ uint32_t src_begin,
uint32_t src_end,
+ uint32_t dst_seg_id) {
+ // lazy init: the converted bitmap is only created once a conversion is requested
+ if (nullptr == _converted_delete_bitmap) {
+ _converted_delete_bitmap =
std::make_shared<DeleteBitmap>(_writer->context().tablet_id);
+ }
+ auto rowset_id = _writer->context().rowset_id;
+ RowLocation src(rowset_id, 0, 0);
+ for (uint32_t seg_id = src_begin; seg_id <= src_end; seg_id++) { // src_end is inclusive
+ const auto* seg_map =
+ src_delete_bitmap->get({rowset_id, seg_id,
DeleteBitmap::TEMP_VERSION_COMMON});
+ if (!seg_map) { // no bitmap entry for this source segment
+ continue;
+ }
+ src.segment_id = seg_id;
+ for (unsigned int row_id : *seg_map) { // translate every marked source row individually
+ src.row_id = row_id;
+ auto dst_row_id = _rowid_conversion->get(src);
+ if (dst_row_id < 0) { // -1: the source row has no recorded destination row id
+ continue;
+ }
+ _converted_delete_bitmap->add(
+ {rowset_id, dst_seg_id,
DeleteBitmap::TEMP_VERSION_COMMON}, dst_row_id);
+ }
+ }
+}
+
bool SegcompactionWorker::cancel() {
// return true if the task is canncellable (actual compaction is not
started)
// return false when the task is not cancellable (it is in the middle of
segcompaction)
diff --git a/be/src/olap/rowset/segcompaction.h
b/be/src/olap/rowset/segcompaction.h
index 5aef89992d3..67dd6889aad 100644
--- a/be/src/olap/rowset/segcompaction.h
+++ b/be/src/olap/rowset/segcompaction.h
@@ -23,6 +23,7 @@
#include "common/status.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "olap/merger.h"
+#include "olap/simple_rowid_conversion.h"
#include "olap/tablet.h"
#include "segment_v2/segment.h"
@@ -51,6 +52,14 @@ public:
void compact_segments(SegCompactionCandidatesSharedPtr segments);
+ bool need_convert_delete_bitmap();
+
+ void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
uint32_t src_seg_id,
+ uint32_t dest_seg_id);
+ void convert_segment_delete_bitmap(DeleteBitmapPtr src_delete_bitmap,
uint32_t src_begin,
+ uint32_t src_end, uint32_t dest_seg_id);
+ DeleteBitmapPtr get_converted_delete_bitmap() { return
_converted_delete_bitmap; }
+
io::FileWriterPtr& get_file_writer() { return _file_writer; }
// set the cancel flag, tasks already started will not be cancelled.
@@ -78,6 +87,10 @@ private:
BetaRowsetWriter* _writer = nullptr;
io::FileWriterPtr _file_writer;
+ // for unique key mow table
+ std::unique_ptr<SimpleRowIdConversion> _rowid_conversion;
+ DeleteBitmapPtr _converted_delete_bitmap;
+
// the state is not mutable when 1)actual compaction operation started or
2) cancelled
std::atomic<bool> _is_compacting_state_mutable = true;
};
diff --git a/be/src/olap/simple_rowid_conversion.h
b/be/src/olap/simple_rowid_conversion.h
new file mode 100644
index 00000000000..1a89b01838f
--- /dev/null
+++ b/be/src/olap/simple_rowid_conversion.h
@@ -0,0 +1,84 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <map>
+#include <vector>
+
+#include "olap/olap_common.h"
+#include "olap/utils.h"
+
+namespace doris {
+
+// Simple version of rowid conversion, used by segcompaction:
+// maps row ids of several source segments to row ids in one destination segment
+class SimpleRowIdConversion {
+public:
+ SimpleRowIdConversion(const RowsetId& rowset_id) : _rowst_id(rowset_id) {};
+ ~SimpleRowIdConversion() = default;
+
+ // restart destination numbering at 0 and register one slot per row for each source segment
+ void reset_segment_map(const std::map<uint32_t, uint32_t>& num_rows) {
+ _cur_dst_segment_rowid = 0;
+ for (auto seg_rows : num_rows) { // first: source segment id, second: its row count
+ _segments_rowid_map.emplace(seg_rows.first, // emplace keeps an already-present entry as is
+ std::vector<uint32_t>(seg_rows.second,
UINT32_MAX));
+ }
+ }
+
+ // record, in order, the next destination row id for each source location in rss_row_ids
+ void add(const std::vector<RowLocation>& rss_row_ids) {
+ for (auto& item : rss_row_ids) {
+ if (item.row_id == -1) { // entries with row_id -1 are skipped and consume no destination id
+ continue;
+ }
+ DCHECK(_segments_rowid_map.find(item.segment_id) !=
_segments_rowid_map.end() &&
+ _segments_rowid_map[item.segment_id].size() > item.row_id);
+ _segments_rowid_map[item.segment_id][item.row_id] =
_cur_dst_segment_rowid++;
+ }
+ }
+
+ // get the destination row id recorded for src
+ // return -1 if the src segment is unknown, the row id is out of range, or no row was recorded
+ int get(const RowLocation& src) const {
+ auto it = _segments_rowid_map.find(src.segment_id);
+ if (it == _segments_rowid_map.end()) {
+ return -1;
+ }
+ const auto& rowid_map = it->second;
+ if (src.row_id >= rowid_map.size() || UINT32_MAX ==
rowid_map[src.row_id]) {
+ return -1;
+ }
+
+ return rowid_map[src.row_id];
+ }
+
+private:
+ // key: id of the source segment.
+ // value: index indicates row id of source segment, value indicates row id
of destination
+ // segment. UINT32_MAX indicates that the row has no destination row id.
+ std::map<uint32_t, std::vector<uint32_t>> _segments_rowid_map;
+
+ // dst rowset id
+ RowsetId _rowst_id;
+
+ // next row id to hand out in the dst segment
+ std::uint32_t _cur_dst_segment_rowid = 0;
+};
+
+} // namespace doris
diff --git a/be/src/vec/olap/vertical_block_reader.cpp
b/be/src/vec/olap/vertical_block_reader.cpp
index 58a2332d5a8..c16cb1f1bc2 100644
--- a/be/src/vec/olap/vertical_block_reader.cpp
+++ b/be/src/vec/olap/vertical_block_reader.cpp
@@ -132,6 +132,7 @@ Status VerticalBlockReader::_init_collect_iter(const
ReaderParams& read_params,
_reader_context.need_ordered_result = true; // TODO: should it be?
_reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
_reader_context.is_key_column_group = read_params.is_key_column_group;
+ _reader_context.record_rowids = read_params.record_rowids;
}
// build heap if key column iterator or build vertical merge iterator if
value column
diff --git a/be/test/common/status_test.cpp b/be/test/common/status_test.cpp
index c1197dad0b1..e5477db1127 100644
--- a/be/test/common/status_test.cpp
+++ b/be/test/common/status_test.cpp
@@ -50,8 +50,6 @@ TEST_F(StatusTest, OK) {
TEST_F(StatusTest, TStatusCodeWithStatus) {
// The definition in status.h
//extern ErrorCode::ErrorCodeState
error_states[ErrorCode::MAX_ERROR_CODE_DEFINE_NUM];
- extern ErrorCode::ErrorCodeState error_states;
- extern ErrorCode::ErrorCodeInitializer error_code_init;
// The definition in Status_types.h
extern const std::map<int, const char*> _TStatusCode_VALUES_TO_NAMES;
ErrorCode::error_code_init.check_init();
diff --git a/be/test/olap/segcompaction_mow_test.cpp
b/be/test/olap/segcompaction_mow_test.cpp
new file mode 100644
index 00000000000..65769ae9f1d
--- /dev/null
+++ b/be/test/olap/segcompaction_mow_test.cpp
@@ -0,0 +1,908 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "common/config.h"
+#include "gen_cpp/AgentService_types.h"
+#include "gen_cpp/olap_file.pb.h"
+#include "io/fs/local_file_system.h"
+#include "olap/data_dir.h"
+#include "olap/row_cursor.h"
+#include "olap/rowset/beta_rowset_reader.h"
+#include "olap/rowset/beta_rowset_writer.h"
+#include "olap/rowset/rowset_factory.h"
+#include "olap/rowset/rowset_reader_context.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/rowset_writer_context.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_meta.h"
+#include "olap/tablet_schema.h"
+#include "olap/utils.h"
+#include "runtime/exec_env.h"
+#include "runtime/memory/mem_tracker.h"
+#include "util/slice.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+static const uint32_t MAX_PATH_LEN = 1024;
+static const uint32_t TABLET_ID = 12345;
+static StorageEngine* s_engine;
+static const std::string lTestDir = "./data_test/data/segcompaction_mow_test";
+
+class SegCompactionMoWTest : public ::testing::TestWithParam<std::string> {
+public:
+ SegCompactionMoWTest() = default;
+
+ void SetUp() {
+ config::enable_segcompaction = true;
+ config::tablet_map_shard_size = 1;
+ config::txn_map_shard_size = 1;
+ config::txn_shard_size = 1;
+
+ char buffer[MAX_PATH_LEN];
+ EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
+ config::storage_root_path = std::string(buffer) + "/data_test";
+
+ auto st =
io::global_local_filesystem()->delete_directory(config::storage_root_path);
+ ASSERT_TRUE(st.ok()) << st;
+ st =
io::global_local_filesystem()->create_directory(config::storage_root_path);
+ ASSERT_TRUE(st.ok()) << st;
+
+ std::vector<StorePath> paths;
+ paths.emplace_back(config::storage_root_path, -1);
+
+ doris::EngineOptions options;
+ options.store_paths = paths;
+
+ s_engine = new StorageEngine(options);
+ ExecEnv::GetInstance()->set_storage_engine(s_engine);
+
+ Status s = s_engine->open();
+ EXPECT_TRUE(s.ok()) << s.to_string();
+
+ _data_dir = new DataDir(lTestDir, 1000000000);
+ static_cast<void>(_data_dir->init());
+ static_cast<void>(_data_dir->update_capacity());
+
+
EXPECT_TRUE(io::global_local_filesystem()->create_directory(lTestDir).ok());
+
+ s = s_engine->start_bg_threads();
+ EXPECT_TRUE(s.ok()) << s.to_string();
+ }
+
+ void TearDown() {
+ SAFE_DELETE(_data_dir);
+
EXPECT_TRUE(io::global_local_filesystem()->delete_directory(lTestDir).ok());
+ if (s_engine != nullptr) {
+ s_engine->stop();
+ delete s_engine;
+ s_engine = nullptr;
+ ExecEnv::GetInstance()->set_storage_engine(nullptr);
+ }
+ config::enable_segcompaction = false;
+ }
+
+protected:
+ OlapReaderStatistics _stats;
+
+ bool check_dir(std::vector<std::string>& vec) {
+ std::vector<std::string> result;
+ for (const auto& entry :
std::filesystem::directory_iterator(lTestDir)) {
+ result.push_back(std::filesystem::path(entry.path()).filename());
+ }
+
+ LOG(INFO) << "expected ls:" << std::endl;
+ for (auto& i : vec) {
+ LOG(INFO) << i;
+ }
+ LOG(INFO) << "acutal ls:" << std::endl;
+ for (auto& i : result) {
+ LOG(INFO) << i;
+ }
+
+ if (result.size() != vec.size()) {
+ return false;
+ } else {
+ for (auto& i : vec) {
+ if (std::find(result.begin(), result.end(), i) ==
result.end()) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ // (k1 int, k2 int, v1 int, <sequence column> int) unique keys (k1, k2)
+ void create_tablet_schema(TabletSchemaSPtr tablet_schema) {
+ TabletSchemaPB tablet_schema_pb;
+ tablet_schema_pb.set_keys_type(UNIQUE_KEYS);
+ tablet_schema_pb.set_num_short_key_columns(2);
+ tablet_schema_pb.set_num_rows_per_row_block(1024);
+ tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
+ tablet_schema_pb.set_next_column_unique_id(5);
+ // add seq column so that segcompaction will process delete bitmap
+ tablet_schema_pb.set_sequence_col_idx(3);
+
+ ColumnPB* column_1 = tablet_schema_pb.add_column();
+ column_1->set_unique_id(1);
+ column_1->set_name("k1");
+ column_1->set_type("INT");
+ column_1->set_is_key(true);
+ column_1->set_length(4);
+ column_1->set_index_length(4);
+ column_1->set_is_nullable(true);
+ column_1->set_is_bf_column(false);
+
+ ColumnPB* column_2 = tablet_schema_pb.add_column();
+ column_2->set_unique_id(2);
+ column_2->set_name("k2");
+ column_2->set_type(
+ "INT"); // TODO change to varchar(20) when dict encoding for
string is supported
+ column_2->set_length(4);
+ column_2->set_index_length(4);
+ column_2->set_is_nullable(true);
+ column_2->set_is_key(true);
+ column_2->set_is_nullable(true);
+ column_2->set_is_bf_column(false);
+
+ ColumnPB* v_column = tablet_schema_pb.add_column();
+ v_column->set_unique_id(3);
+ v_column->set_name(fmt::format("v1"));
+ v_column->set_type("INT");
+ v_column->set_length(4);
+ v_column->set_is_key(false);
+ v_column->set_is_nullable(false);
+ v_column->set_is_bf_column(false);
+ v_column->set_default_value(std::to_string(10));
+ v_column->set_aggregation("NONE");
+
+ ColumnPB* seq_column = tablet_schema_pb.add_column();
+ seq_column->set_unique_id(4);
+ seq_column->set_name(SEQUENCE_COL);
+ seq_column->set_type("INT");
+ seq_column->set_length(4);
+ seq_column->set_is_key(false);
+ seq_column->set_is_nullable(false);
+ seq_column->set_is_bf_column(false);
+ seq_column->set_default_value(std::to_string(10));
+ seq_column->set_aggregation("NONE");
+
+ tablet_schema->init_from_pb(tablet_schema_pb);
+ }
+
+ // use different id to avoid conflict
+ void create_rowset_writer_context(int64_t id, TabletSchemaSPtr
tablet_schema,
+ RowsetWriterContext*
rowset_writer_context) {
+ RowsetId rowset_id;
+ rowset_id.init(id);
+ // rowset_writer_context->data_dir = _data_dir.get();
+ rowset_writer_context->rowset_id = rowset_id;
+ rowset_writer_context->tablet_id = TABLET_ID;
+ rowset_writer_context->tablet_schema_hash = 1111;
+ rowset_writer_context->partition_id = 10;
+ rowset_writer_context->rowset_type = BETA_ROWSET;
+ rowset_writer_context->rowset_dir = lTestDir;
+ rowset_writer_context->rowset_state = VISIBLE;
+ rowset_writer_context->tablet_schema = tablet_schema;
+ rowset_writer_context->version.first = 10;
+ rowset_writer_context->version.second = 10;
+
+ TabletMetaSharedPtr tablet_meta = std::make_shared<TabletMeta>();
+ tablet_meta->_tablet_id = TABLET_ID;
+ static_cast<void>(tablet_meta->set_partition_id(10000));
+ tablet_meta->_schema = tablet_schema;
+ tablet_meta->_enable_unique_key_merge_on_write = true;
+ auto tablet = std::make_shared<Tablet>(*s_engine, tablet_meta,
_data_dir, "test_str");
+ // tablet->key
+ rowset_writer_context->tablet = tablet;
+ }
+
+ void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext&
context,
+ RowsetReaderSharedPtr* result) {
+ auto s = rowset->create_reader(result);
+ EXPECT_EQ(Status::OK(), s);
+ EXPECT_TRUE(*result != nullptr);
+
+ s = (*result)->init(&context);
+ EXPECT_EQ(Status::OK(), s);
+ }
+
+ bool check_data_read_with_delete_bitmap(TabletSchemaSPtr tablet_schema,
+ DeleteBitmapPtr delete_bitmap,
RowsetSharedPtr rowset,
+ int expect_total_rows, int
rows_mark_deleted,
+ bool skip_value_check = false) {
+ RowsetReaderContext reader_context;
+ reader_context.tablet_schema = tablet_schema;
+ // use this type to avoid cache from other ut
+ reader_context.reader_type = ReaderType::READER_QUERY;
+ reader_context.need_ordered_result = true;
+ std::vector<uint32_t> return_columns = {0, 1, 2};
+ reader_context.return_columns = &return_columns;
+ reader_context.stats = &_stats;
+ reader_context.delete_bitmap = delete_bitmap.get();
+
+ std::vector<uint32_t> segment_num_rows;
+ Status s;
+
+ // without predicates
+ {
+ RowsetReaderSharedPtr rowset_reader;
+ create_and_init_rowset_reader(rowset.get(), reader_context,
&rowset_reader);
+
+ uint32_t num_rows_read = 0;
+ bool eof = false;
+ while (!eof) {
+ std::shared_ptr<vectorized::Block> output_block =
+ std::make_shared<vectorized::Block>(
+ tablet_schema->create_block(return_columns));
+ s = rowset_reader->next_block(output_block.get());
+ if (s != Status::OK()) {
+ eof = true;
+ }
+ EXPECT_EQ(return_columns.size(), output_block->columns());
+ for (int i = 0; i < output_block->rows(); ++i) {
+ vectorized::ColumnPtr col0 =
output_block->get_by_position(0).column;
+ vectorized::ColumnPtr col1 =
output_block->get_by_position(1).column;
+ vectorized::ColumnPtr col2 =
output_block->get_by_position(2).column;
+ auto field1 = (*col0)[i];
+ auto field2 = (*col1)[i];
+ auto field3 = (*col2)[i];
+ uint32_t k1 =
*reinterpret_cast<uint32_t*>((char*)(&field1));
+ uint32_t k2 =
*reinterpret_cast<uint32_t*>((char*)(&field2));
+ uint32_t v3 =
*reinterpret_cast<uint32_t*>((char*)(&field3));
+ EXPECT_EQ(100 * v3 + k2, k1);
+ if (!skip_value_check) {
+ // every row with v3 % 3 == 0 is deleted in segments whose id (k2) is even.
+ EXPECT_TRUE(k2 % 2 != 0 || v3 % 3 != 0);
+ }
+ num_rows_read++;
+ }
+ output_block->clear();
+ }
+ EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
+ EXPECT_EQ(rowset->rowset_meta()->num_rows(), expect_total_rows);
+ EXPECT_EQ(num_rows_read, expect_total_rows - rows_mark_deleted);
+
EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
+ size_t total_num_rows = 0;
+ for (const auto& i : segment_num_rows) {
+ total_num_rows += i;
+ }
+ EXPECT_EQ(total_num_rows, expect_total_rows);
+ }
+ return true;
+ }
+
+private:
+ DataDir* _data_dir = nullptr;
+};
+
+// Writes 15 small segments into a merge-on-write rowset while marking rows as deleted,
+// lets segcompaction merge them, then checks the resulting files, the converted delete
+// bitmap and the rows that remain readable.
+TEST_P(SegCompactionMoWTest, SegCompactionThenRead) {
+ std::string delete_ratio = GetParam();
+ config::enable_segcompaction = true;
+ Status s;
+ TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+ create_tablet_schema(tablet_schema);
+
+ RowsetSharedPtr rowset;
+ const int num_segments = 15;
+ const uint32_t rows_per_segment = 4096;
+ config::segcompaction_candidate_max_rows = 6000; // set threshold above
+ // rows_per_segment
+ config::segcompaction_batch_size = 10;
+ std::vector<uint32_t> segment_num_rows;
+ DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+ uint32_t rows_mark_deleted = 0;
+ { // write `num_segments * rows_per_segment` rows to rowset
+ RowsetWriterContext writer_context;
+ int raw_rsid = rand();
+ create_rowset_writer_context(raw_rsid, tablet_schema, &writer_context);
+ RowsetIdUnorderedSet rsids;
+ std::vector<RowsetSharedPtr> rowset_ptrs;
+ writer_context.mow_context =
+ std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs,
delete_bitmap);
+ auto rowset_id = writer_context.rowset_id;
+
+ std::unique_ptr<RowsetWriter> rowset_writer;
+ auto res = RowsetFactory::create_rowset_writer(writer_context, false,
&rowset_writer);
+ // check the creation result itself (the untouched `s` was tested before, which
+ // could never fail); assert, because rowset_writer is dereferenced below
+ ASSERT_TRUE(res.ok());
+ // for segment "i", row "rid"
+ // k1 := rid*100 + i
+ // k2 := i
+ // k3 := rid
+ // seq := 0
+ for (int i = 0; i < num_segments; ++i) {
+ vectorized::Block block = tablet_schema->create_block();
+ auto columns = block.mutate_columns();
+ for (int rid = 0; rid < rows_per_segment; ++rid) {
+ uint32_t k1 = rid * 100 + i;
+ uint32_t k2 = i;
+ uint32_t k3 = rid;
+ uint32_t seq = 0;
+ columns[0]->insert_data((const char*)&k1, sizeof(k1));
+ columns[1]->insert_data((const char*)&k2, sizeof(k2));
+ columns[2]->insert_data((const char*)&k3, sizeof(k3));
+ columns[3]->insert_data((const char*)&seq, sizeof(seq));
+ if (delete_ratio == "full") { // delete all data
+ writer_context.mow_context->delete_bitmap->add(
+ {rowset_id, i, DeleteBitmap::TEMP_VERSION_COMMON},
rid);
+ rows_mark_deleted++;
+ } else {
+ // in segments whose id is even, mark every third row
+ // (rid % 3 == 0) as deleted
+ if (i % 2 == 0 && rid % 3 == 0) {
+ writer_context.mow_context->delete_bitmap->add(
+ {rowset_id, i,
DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+ rows_mark_deleted++;
+ }
+ }
+ }
+ s = rowset_writer->add_block(&block);
+ EXPECT_TRUE(s.ok());
+ s = rowset_writer->flush();
+ EXPECT_EQ(Status::OK(), s);
+ sleep(1);
+ }
+
+ // number of deleted rows recorded before the rowset is built
+ size_t total_cardinality1 = 0;
+ for (const auto& entry : delete_bitmap->delete_bitmap) {
+ total_cardinality1 += entry.second.cardinality();
+ }
+ if (delete_ratio == "full") {
+ EXPECT_EQ(num_segments, delete_bitmap->delete_bitmap.size());
+ } else {
+ EXPECT_EQ(num_segments / 2 + num_segments % 2,
delete_bitmap->delete_bitmap.size());
+ }
+ EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+ std::vector<std::string> ls;
+ ls.push_back(fmt::format("{}_0.dat", raw_rsid));
+ ls.push_back(fmt::format("{}_1.dat", raw_rsid));
+ ls.push_back(fmt::format("{}_2.dat", raw_rsid));
+ ls.push_back(fmt::format("{}_3.dat", raw_rsid));
+ ls.push_back(fmt::format("{}_4.dat", raw_rsid));
+ ls.push_back(fmt::format("{}_5.dat", raw_rsid));
+ ls.push_back(fmt::format("{}_6.dat", raw_rsid));
+ EXPECT_TRUE(check_dir(ls));
+ // 7 segments plus 1 sentinel mark; the sentinel entry is not counted below
+ size_t total_cardinality2 = 0;
+ for (const auto& entry : delete_bitmap->delete_bitmap) {
+ if (std::get<1>(entry.first) == DeleteBitmap::INVALID_SEGMENT_ID) {
+ continue;
+ }
+ total_cardinality2 += entry.second.cardinality();
+ }
+ if (delete_ratio == "full") {
+ // 7 segments + 1 sentinel mark
+ EXPECT_EQ(8, delete_bitmap->delete_bitmap.size());
+ } else {
+ EXPECT_EQ(5, delete_bitmap->delete_bitmap.size());
+ }
+ // conversion must neither lose nor invent deleted rows
+ EXPECT_EQ(total_cardinality1, total_cardinality2);
+ }
+
+ EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema,
delete_bitmap, rowset,
+ num_segments *
rows_per_segment,
+ rows_mark_deleted));
+}
+
+// Writes segments in the pattern "ooooOOoOooooooooO", where (per the row
+// counts used below) "o" is a 4096-row segment and "O" is a 6400-row segment,
+// i.e. below and above segcompaction_candidate_max_rows (6000) respectively.
+// Some rows are marked deleted in the merge-on-write delete bitmap while
+// writing. After build() the test expects exactly seven segment files and six
+// delete-bitmap entries, and verifies the readable data through
+// check_data_read_with_delete_bitmap().
+//
+// NOTE(review): config::segcompaction_batch_size is not set in this test, so
+// it runs with whatever value is current at that point - confirm intended.
+TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
+    config::enable_segcompaction = true;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    create_tablet_schema(tablet_schema);
+
+    RowsetSharedPtr rowset;
+    // Threshold sits between the small (4096) and big (6400) segment sizes.
+    config::segcompaction_candidate_max_rows = 6000;
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+    uint32_t rows_mark_deleted = 0;
+    uint32_t total_written_rows = 0;
+    { // scope the writer so it is destroyed before the data is read back
+        RowsetWriterContext writer_context;
+        create_rowset_writer_context(20048, tablet_schema, &writer_context);
+        RowsetIdUnorderedSet rsids;
+        std::vector<RowsetSharedPtr> rowset_ptrs;
+        writer_context.mow_context =
+                std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap);
+        auto rowset_id = writer_context.rowset_id;
+
+        std::unique_ptr<RowsetWriter> rowset_writer;
+        auto res = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
+        // Check the status that was actually returned, and stop right here on
+        // failure: everything below dereferences rowset_writer.
+        ASSERT_TRUE(res.ok());
+
+        // Id of the next segment to be flushed; shared by every write below.
+        uint32_t segid = 0;
+
+        // Writes `count` segments of `rows` rows each and flushes after every
+        // segment. For segment `segid`, row `rid`:
+        //   k1  := rid * 100 + segid
+        //   k2  := segid
+        //   k3  := rid
+        //   seq := 0
+        // In even-numbered segments every third row (rid % 3 == 0) is added to
+        // the delete bitmap and counted in rows_mark_deleted.
+        // `pause_after_flush` sleeps one second after each flush (presumably
+        // to give background segcompaction time to run - TODO confirm).
+        auto write_segments = [&](int count, uint32_t rows, bool pause_after_flush) {
+            for (int i = 0; i < count; ++i) {
+                vectorized::Block block = tablet_schema->create_block();
+                auto columns = block.mutate_columns();
+                for (uint32_t rid = 0; rid < rows; ++rid) {
+                    const uint32_t k1 = rid * 100 + segid;
+                    const uint32_t k2 = segid;
+                    const uint32_t k3 = rid;
+                    const uint32_t seq = 0;
+                    columns[0]->insert_data(reinterpret_cast<const char*>(&k1), sizeof(k1));
+                    columns[1]->insert_data(reinterpret_cast<const char*>(&k2), sizeof(k2));
+                    columns[2]->insert_data(reinterpret_cast<const char*>(&k3), sizeof(k3));
+                    columns[3]->insert_data(reinterpret_cast<const char*>(&seq), sizeof(seq));
+                    if (segid % 2 == 0 && rid % 3 == 0) {
+                        writer_context.mow_context->delete_bitmap->add(
+                                {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                        rows_mark_deleted++;
+                    }
+                }
+                Status s = rowset_writer->add_block(&block);
+                EXPECT_TRUE(s.ok());
+                s = rowset_writer->flush();
+                EXPECT_EQ(Status::OK(), s);
+                if (pause_after_flush) {
+                    sleep(1);
+                }
+                segid++;
+                total_written_rows += rows;
+            }
+        };
+
+        write_segments(4, 4096, false); // oooo
+        write_segments(2, 6400, false); // OO
+        write_segments(1, 4096, false); // o
+        write_segments(1, 6400, false); // O
+
+        // oooooooo: eight small segments whose keys overlap across segments.
+        // Row `rid` picks k1 from {rid * 100 + 0 .. rid * 100 + 4} at random,
+        // so the same key can show up in several segments; within a single
+        // segment every rid (and therefore every k1) is distinct.
+        const int dup_segments = 8;
+        const uint32_t dup_rows = 4096;
+        std::map<uint32_t, uint32_t> unique_keys; // k1 -> rid
+        for (int i = 0; i < dup_segments; ++i) {
+            vectorized::Block block = tablet_schema->create_block();
+            auto columns = block.mutate_columns();
+            for (uint32_t rid = 0; rid < dup_rows; ++rid) {
+                // generate some duplicate rows, segment compaction will merge them
+                const int rand_i = rand() % (dup_segments - 3);
+                const uint32_t k1 = rid * 100 + rand_i;
+                const uint32_t k2 = rand_i;
+                const uint32_t k3 = rid;
+                const uint32_t seq = 0;
+                columns[0]->insert_data(reinterpret_cast<const char*>(&k1), sizeof(k1));
+                columns[1]->insert_data(reinterpret_cast<const char*>(&k2), sizeof(k2));
+                columns[2]->insert_data(reinterpret_cast<const char*>(&k3), sizeof(k3));
+                columns[3]->insert_data(reinterpret_cast<const char*>(&seq), sizeof(seq));
+                // mark delete every 3 rows (in every one of these segments)
+                if (rid % 3 == 0) {
+                    writer_context.mow_context->delete_bitmap->add(
+                            {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                }
+                unique_keys.emplace(k1, rid);
+            }
+            Status s = rowset_writer->add_block(&block);
+            EXPECT_TRUE(s.ok());
+            s = rowset_writer->flush();
+            EXPECT_EQ(Status::OK(), s);
+            sleep(1);
+            segid++;
+        }
+        // These 8 segments should be compacted into 1 segment in the end, so
+        // the rows finally written are the unique keys, and the deleted rows
+        // are the unique keys whose rid was marked (rid % 3 == 0).
+        total_written_rows += unique_keys.size();
+        for (const auto& entry : unique_keys) {
+            if (entry.second % 3 == 0) {
+                rows_mark_deleted++;
+            }
+        }
+
+        write_segments(1, 6400, true); // O
+
+        EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+        std::vector<std::string> ls;
+        // ooooOOoOooooooooO
+        ls.push_back("20048_0.dat"); // oooo
+        ls.push_back("20048_1.dat"); // O
+        ls.push_back("20048_2.dat"); // O
+        ls.push_back("20048_3.dat"); // o
+        ls.push_back("20048_4.dat"); // O
+        ls.push_back("20048_5.dat"); // oooooooo
+        ls.push_back("20048_6.dat"); // O
+        EXPECT_TRUE(check_dir(ls));
+        EXPECT_EQ(6, delete_bitmap->delete_bitmap.size());
+    }
+    EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset,
+                                                   total_written_rows, rows_mark_deleted, true));
+}
+
+// Writes five segments in the pattern "OoOoO": 6400-row ("O") and 4096-row
+// ("o") segments alternate, i.e. above and below
+// segcompaction_candidate_max_rows (6000). Every third row of the
+// even-numbered segments is marked deleted in the merge-on-write delete
+// bitmap. After build() the test expects five segment files and verifies the
+// readable data through check_data_read_with_delete_bitmap().
+TEST_F(SegCompactionMoWTest, SegCompactionInterleaveWithBig_OoOoO) {
+    config::enable_segcompaction = true;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    create_tablet_schema(tablet_schema);
+
+    RowsetSharedPtr rowset;
+    // Threshold sits between the small (4096) and big (6400) segment sizes.
+    config::segcompaction_candidate_max_rows = 6000;
+    config::segcompaction_batch_size = 5;
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+    uint32_t rows_mark_deleted = 0;
+    uint32_t total_written_rows = 0;
+    { // scope the writer so it is destroyed before the data is read back
+        RowsetWriterContext writer_context;
+        create_rowset_writer_context(20049, tablet_schema, &writer_context);
+        RowsetIdUnorderedSet rsids;
+        std::vector<RowsetSharedPtr> rowset_ptrs;
+        writer_context.mow_context =
+                std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap);
+        auto rowset_id = writer_context.rowset_id;
+
+        std::unique_ptr<RowsetWriter> rowset_writer;
+        auto res = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
+        // Check the status that was actually returned, and stop right here on
+        // failure: everything below dereferences rowset_writer.
+        ASSERT_TRUE(res.ok());
+
+        // Row count of each segment, in write order: O o O o O.
+        const std::vector<uint32_t> rows_of_segment = {6400, 4096, 6400, 4096, 6400};
+
+        // For segment `segid`, row `rid`:
+        //   k1  := rid * 100 + segid
+        //   k2  := segid
+        //   k3  := rid
+        //   seq := 0
+        for (uint32_t segid = 0; segid < rows_of_segment.size(); ++segid) {
+            const uint32_t rows_per_segment = rows_of_segment[segid];
+            vectorized::Block block = tablet_schema->create_block();
+            auto columns = block.mutate_columns();
+            for (uint32_t rid = 0; rid < rows_per_segment; ++rid) {
+                const uint32_t k1 = rid * 100 + segid;
+                const uint32_t k2 = segid;
+                const uint32_t k3 = rid;
+                const uint32_t seq = 0;
+                columns[0]->insert_data(reinterpret_cast<const char*>(&k1), sizeof(k1));
+                columns[1]->insert_data(reinterpret_cast<const char*>(&k2), sizeof(k2));
+                columns[2]->insert_data(reinterpret_cast<const char*>(&k3), sizeof(k3));
+                columns[3]->insert_data(reinterpret_cast<const char*>(&seq), sizeof(seq));
+                // mark delete every 3 rows, for segments whose seg_id is even
+                if (segid % 2 == 0 && rid % 3 == 0) {
+                    writer_context.mow_context->delete_bitmap->add(
+                            {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                    rows_mark_deleted++;
+                }
+            }
+            Status s = rowset_writer->add_block(&block);
+            EXPECT_TRUE(s.ok());
+            s = rowset_writer->flush();
+            EXPECT_EQ(Status::OK(), s);
+            // Only the last segment is followed by a one-second pause
+            // (presumably to let background segcompaction settle before
+            // build() - TODO confirm).
+            if (segid + 1 == rows_of_segment.size()) {
+                sleep(1);
+            }
+            total_written_rows += rows_per_segment;
+        }
+
+        EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+        std::vector<std::string> ls;
+        ls.push_back("20049_0.dat"); // O
+        ls.push_back("20049_1.dat"); // o
+        ls.push_back("20049_2.dat"); // O
+        ls.push_back("20049_3.dat"); // o
+        ls.push_back("20049_4.dat"); // O
+        EXPECT_TRUE(check_dir(ls));
+    }
+
+    EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset,
+                                                   total_written_rows, rows_mark_deleted));
+}
+
+// Writes 8 segments of 4096 rows each with segcompaction_batch_size = 10 and
+// expects that segment compaction did NOT happen: all eight segment files are
+// present after build(), the writer reports is_segcompacted() == false, and
+// the number of delete-bitmap entries is the same before and after build().
+// Every third row of the even-numbered segments is marked deleted.
+TEST_F(SegCompactionMoWTest, SegCompactionNotTrigger) {
+    config::enable_segcompaction = true;
+    TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
+    create_tablet_schema(tablet_schema);
+
+    RowsetSharedPtr rowset;
+    const uint32_t num_segments = 8;
+    const uint32_t rows_per_segment = 4096;
+    config::segcompaction_candidate_max_rows = 6000; // set threshold above rows_per_segment
+    config::segcompaction_batch_size = 10;           // larger than num_segments
+    DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
+    uint32_t rows_mark_deleted = 0;
+    { // write `num_segments * rows_per_segment` rows to rowset
+        RowsetWriterContext writer_context;
+        create_rowset_writer_context(20050, tablet_schema, &writer_context);
+        RowsetIdUnorderedSet rsids;
+        std::vector<RowsetSharedPtr> rowset_ptrs;
+        writer_context.mow_context =
+                std::make_shared<MowContext>(1, 1, rsids, rowset_ptrs, delete_bitmap);
+        auto rowset_id = writer_context.rowset_id;
+
+        std::unique_ptr<RowsetWriter> rowset_writer;
+        auto res = RowsetFactory::create_rowset_writer(writer_context, false, &rowset_writer);
+        // Check the status that was actually returned, and stop right here on
+        // failure: everything below dereferences rowset_writer.
+        ASSERT_TRUE(res.ok());
+
+        // For segment `segid`, row `rid`:
+        //   k1  := rid * 100 + segid
+        //   k2  := segid
+        //   k3  := rid
+        //   seq := 0
+        for (uint32_t segid = 0; segid < num_segments; ++segid) {
+            vectorized::Block block = tablet_schema->create_block();
+            auto columns = block.mutate_columns();
+            for (uint32_t rid = 0; rid < rows_per_segment; ++rid) {
+                const uint32_t k1 = rid * 100 + segid;
+                const uint32_t k2 = segid;
+                const uint32_t k3 = rid;
+                const uint32_t seq = 0;
+                columns[0]->insert_data(reinterpret_cast<const char*>(&k1), sizeof(k1));
+                columns[1]->insert_data(reinterpret_cast<const char*>(&k2), sizeof(k2));
+                columns[2]->insert_data(reinterpret_cast<const char*>(&k3), sizeof(k3));
+                columns[3]->insert_data(reinterpret_cast<const char*>(&seq), sizeof(seq));
+                // mark delete every 3 rows, for segments whose seg_id is even
+                if (segid % 2 == 0 && rid % 3 == 0) {
+                    writer_context.mow_context->delete_bitmap->add(
+                            {rowset_id, segid, DeleteBitmap::TEMP_VERSION_COMMON}, rid);
+                    rows_mark_deleted++;
+                }
+            }
+            Status s = rowset_writer->add_block(&block);
+            EXPECT_TRUE(s.ok());
+            s = rowset_writer->flush();
+            EXPECT_EQ(Status::OK(), s);
+            // One-second pause after every flush (presumably to give any
+            // background segcompaction a chance to run - TODO confirm).
+            sleep(1);
+        }
+
+        // One bitmap entry per even-numbered segment.
+        EXPECT_EQ(num_segments / 2 + num_segments % 2, delete_bitmap->delete_bitmap.size());
+        EXPECT_EQ(Status::OK(), rowset_writer->build(rowset));
+        std::vector<std::string> ls;
+        ls.push_back("20050_0.dat");
+        ls.push_back("20050_1.dat");
+        ls.push_back("20050_2.dat");
+        ls.push_back("20050_3.dat");
+        ls.push_back("20050_4.dat");
+        ls.push_back("20050_5.dat");
+        ls.push_back("20050_6.dat");
+        ls.push_back("20050_7.dat");
+        EXPECT_TRUE(check_dir(ls));
+        // build() must not have changed the number of bitmap entries.
+        EXPECT_EQ(num_segments / 2 + num_segments % 2, delete_bitmap->delete_bitmap.size());
+
+        EXPECT_FALSE(static_cast<BetaRowsetWriter*>(rowset_writer.get())->is_segcompacted());
+    }
+
+    EXPECT_TRUE(check_data_read_with_delete_bitmap(tablet_schema, delete_bitmap, rowset,
+                                                   num_segments * rows_per_segment,
+                                                   rows_mark_deleted));
+}
+
+INSTANTIATE_TEST_SUITE_P(Params, SegCompactionMoWTest,
+ ::testing::ValuesIn(std::vector<std::string>
{"partial", "full"})); // Instantiates the suite's parameterized tests under the prefix "Params", once per string value; presumably read by those tests as delete_ratio - TODO confirm.
+
+} // namespace doris
+
+// @brief Test Stub
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]