This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
new aafce71250 [fix](merge-on-write) fix that the query result has duplicate keys (#16336)
aafce71250 is described below
commit aafce71250cdd20f008655848ad19c30c20d6703
Author: Xin Liao <[email protected]>
AuthorDate: Mon Feb 6 17:09:53 2023 +0800
[fix](merge-on-write) fix that the query result has duplicate keys (#16336)
* [fix](merge-on-write) fix that the query result has duplicate keys
* add ut
---
be/src/olap/compaction.cpp | 4 ++++
be/src/olap/merger.cpp | 4 ++++
be/src/olap/rowset/beta_rowset_writer.cpp | 5 ++--
be/src/olap/rowset/beta_rowset_writer.h | 3 ++-
be/src/olap/rowset/vertical_beta_rowset_writer.cpp | 4 ++++
be/test/olap/rowid_conversion_test.cpp | 27 +++++++++++++++-------
6 files changed, 35 insertions(+), 12 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 3856c6f85d..c7029e9722 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -250,6 +250,10 @@ bool Compaction::handle_ordered_data_compaction() {
if (!config::enable_ordered_data_compaction) {
return false;
}
+ if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
+ _tablet->enable_unique_key_merge_on_write()) {
+ return false;
+ }
// check delete version: if compaction type is base compaction and
// has a delete version, use original compaction
if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index ac75675496..58d9df0158 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -267,6 +267,10 @@ Status Merger::vertical_compact_one_group(
}
reader_params.tablet_schema = merge_tablet_schema;
+ if (tablet->enable_unique_key_merge_on_write()) {
+ reader_params.delete_bitmap = &tablet->tablet_meta()->delete_bitmap();
+ }
+
reader_params.return_columns = column_group;
reader_params.origin_return_columns = &reader_params.return_columns;
RETURN_NOT_OK(reader.init(reader_params));
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index e64d4f97ee..081df80633 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -830,13 +830,11 @@ void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
int64_t num_rows_written = 0;
int64_t total_data_size = 0;
int64_t total_index_size = 0;
- std::vector<uint32_t> segment_num_rows;
std::vector<KeyBoundsPB> segments_encoded_key_bounds;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
for (const auto& itr : _segid_statistics_map) {
num_rows_written += itr.second.row_num;
- segment_num_rows.push_back(itr.second.row_num);
total_data_size += itr.second.data_size;
total_index_size += itr.second.index_size;
segments_encoded_key_bounds.push_back(itr.second.key_bounds);
@@ -851,7 +849,6 @@ void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
}
rowset_meta->set_num_segments(num_seg);
- _segment_num_rows = segment_num_rows;
// TODO(zhangzhengyu): key_bounds.size() should equal num_seg, but
currently not always
rowset_meta->set_num_rows(num_rows_written + _num_rows_written);
rowset_meta->set_total_disk_size(total_data_size + _total_data_size);
@@ -992,6 +989,8 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
_segid_statistics_map.emplace(segid, segstat);
+ _segment_num_rows.resize(_num_segment);
+ _segment_num_rows[_num_segment - 1] = row_num;
}
VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num
<< " data_size:" << segment_size << " index_size:" << index_size;
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 754491bc82..6f32b24f08 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -143,7 +143,8 @@ protected:
std::unique_ptr<segment_v2::SegmentWriter> _segment_writer;
mutable SpinLock _lock; // protect following vectors.
- // record rows number of every segment
+ // record rows number of every segment already written, using for rowid
+ // conversion when compaction in unique key with MoW model
std::vector<uint32_t> _segment_num_rows;
std::vector<io::FileWriterPtr> _file_writers;
// for unique key table with merge-on-write
diff --git a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
index edcd3f13a3..3c292edefe 100644
--- a/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/vertical_beta_rowset_writer.cpp
@@ -64,6 +64,10 @@ Status VerticalBetaRowsetWriter::add_columns(const vectorized::Block* block,
if (_segment_writers[_cur_writer_idx]->num_rows_written() > max_rows_per_segment) {
// segment is full, need flush columns and create new segment writer
RETURN_IF_ERROR(_flush_columns(&_segment_writers[_cur_writer_idx], true));
+
+ _segment_num_rows.resize(_cur_writer_idx + 1);
+ _segment_num_rows[_cur_writer_idx] = _segment_writers[_cur_writer_idx]->row_count();
+
std::unique_ptr<segment_v2::SegmentWriter> writer;
RETURN_IF_ERROR(_create_segment_writer(col_ids, is_key, &writer));
_segment_writers.emplace_back(std::move(writer));
diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp
index a08f17f4c4..2de735dd5d 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -355,14 +355,6 @@ protected:
input_rowsets.push_back(rowset);
}
- // create input rowset reader
- vector<RowsetReaderSharedPtr> input_rs_readers;
- for (auto& rowset : input_rowsets) {
- RowsetReaderSharedPtr rs_reader;
- EXPECT_TRUE(rowset->create_reader(&rs_reader).ok());
- input_rs_readers.push_back(std::move(rs_reader));
- }
-
// create output rowset writer
RowsetWriterContext writer_context;
create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456, &writer_context);
@@ -374,6 +366,19 @@ protected:
TabletSharedPtr tablet =
create_tablet(*tablet_schema, enable_unique_key_merge_on_write,
output_rs_writer->version().first - 1,
has_delete_handler);
+ if (enable_unique_key_merge_on_write) {
+ tablet->tablet_meta()->delete_bitmap().add({input_rowsets[0]->rowset_id(), 0, 0}, 0);
+ tablet->tablet_meta()->delete_bitmap().add({input_rowsets[0]->rowset_id(), 0, 0}, 3);
+ }
+
+ // create input rowset reader
+ vector<RowsetReaderSharedPtr> input_rs_readers;
+ for (auto& rowset : input_rowsets) {
+ RowsetReaderSharedPtr rs_reader;
+ EXPECT_TRUE(rowset->create_reader(&rs_reader).ok());
+ input_rs_readers.push_back(std::move(rs_reader));
+ }
+
Merger::Statistics stats;
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
@@ -424,10 +429,16 @@ protected:
RowLocation src(input_rowsets[rs_id]->rowset_id(), s_id, row_id);
RowLocation dst;
int res = rowid_conversion.get(src, &dst);
+ // key deleted by delete bitmap
+ if (enable_unique_key_merge_on_write && rs_id == 0 && s_id == 0 &&
+ (row_id == 0 || row_id == 3)) {
+ EXPECT_LT(res, 0);
+ }
if (res < 0) {
continue;
}
size_t rowid_in_output_data = dst.row_id;
+ EXPECT_GT(segment_num_rows[dst.segment_id], dst.row_id);
for (auto n = 1; n <= dst.segment_id; n++) {
rowid_in_output_data += segment_num_rows[n - 1];
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]