This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 90da65c7b87 [fix](block-reader) Make rowsets union iterating work
(#40877) (#43175)
90da65c7b87 is described below
commit 90da65c7b87e7fc0f070e8e354a792ce9a633f26
Author: Siyang Tang <[email protected]>
AuthorDate: Fri Nov 8 10:05:10 2024 +0800
[fix](block-reader) Make rowsets union iterating work (#40877) (#43175)
pick: #40877
---
be/src/olap/compaction.cpp | 4 +--
be/src/olap/merger.cpp | 26 +++---------------
be/src/olap/rowset/beta_rowset_reader.cpp | 6 +++++
be/src/olap/rowset/rowset.h | 8 ++++--
be/src/olap/rowset/rowset_reader_context.h | 2 ++
be/src/olap/tablet_reader.cpp | 1 +
be/src/olap/tablet_reader.h | 1 +
be/src/vec/olap/block_reader.cpp | 43 +++++++-----------------------
be/src/vec/olap/block_reader.h | 3 ++-
be/src/vec/olap/vcollect_iterator.cpp | 1 +
be/test/olap/rowid_conversion_test.cpp | 7 ++++-
11 files changed, 41 insertions(+), 61 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index 782063331df..50edafec1c2 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -195,14 +195,14 @@ bool Compaction::is_rowset_tidy(std::string& pre_max_key,
const RowsetSharedPtr&
}
}
std::string min_key;
- auto ret = rhs->min_key(&min_key);
+ auto ret = rhs->first_key(&min_key);
if (!ret) {
return false;
}
if (min_key <= pre_max_key) {
return false;
}
- CHECK(rhs->max_key(&pre_max_key));
+ CHECK(rhs->last_key(&pre_max_key));
return true;
}
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 08d328b7c42..8ed2f0e676f 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -89,6 +89,8 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet,
ReaderType reader_type,
if (stats_output && stats_output->rowid_conversion) {
reader_params.record_rowids = true;
+ reader_params.rowid_conversion = stats_output->rowid_conversion;
+
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
}
reader_params.return_columns.resize(cur_tablet_schema->num_columns());
@@ -96,17 +98,6 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet,
ReaderType reader_type,
reader_params.origin_return_columns = &reader_params.return_columns;
RETURN_IF_ERROR(reader.init(reader_params));
- if (reader_params.record_rowids) {
-
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
- // init segment rowid map for rowid conversion
- std::vector<uint32_t> segment_num_rows;
- for (auto& rs_split : reader_params.rs_splits) {
-
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows));
- stats_output->rowid_conversion->init_segment_map(
- rs_split.rs_reader->rowset()->rowset_id(),
segment_num_rows);
- }
- }
-
vectorized::Block block =
cur_tablet_schema->create_block(reader_params.return_columns);
size_t output_rows = 0;
bool eof = false;
@@ -251,6 +242,8 @@ Status Merger::vertical_compact_one_group(
if (is_key && stats_output && stats_output->rowid_conversion) {
reader_params.record_rowids = true;
+ reader_params.rowid_conversion = stats_output->rowid_conversion;
+
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
}
reader_params.return_columns = column_group;
@@ -258,17 +251,6 @@ Status Merger::vertical_compact_one_group(
reader_params.batch_size = batch_size;
RETURN_IF_ERROR(reader.init(reader_params, sample_info));
- if (reader_params.record_rowids) {
-
stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id());
- // init segment rowid map for rowid conversion
- std::vector<uint32_t> segment_num_rows;
- for (auto& rs_split : reader_params.rs_splits) {
-
RETURN_IF_ERROR(rs_split.rs_reader->get_segment_num_rows(&segment_num_rows));
- stats_output->rowid_conversion->init_segment_map(
- rs_split.rs_reader->rowset()->rowset_id(),
segment_num_rows);
- }
- }
-
vectorized::Block block =
tablet_schema->create_block(reader_params.return_columns);
size_t output_rows = 0;
bool eof = false;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index 5d9b391ef33..e8923522f2e 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -216,6 +216,12 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
for (size_t i = 0; i < segments.size(); i++) {
_segments_rows[i] = segments[i]->num_rows();
}
+ if (_read_context->record_rowids) {
+ // init segment rowid map for rowid conversion
+ std::vector<uint32_t> segment_num_rows;
+ RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows));
+
_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
segment_num_rows);
+ }
auto [seg_start, seg_end] = _segment_offsets;
if (seg_start == seg_end) {
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 7677015f2e0..fca55a4ce22 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -275,7 +275,9 @@ public:
_rowset_meta->get_segments_key_bounds(segments_key_bounds);
return Status::OK();
}
- bool min_key(std::string* min_key) {
+
+ // min key of the first segment
+ bool first_key(std::string* min_key) {
KeyBoundsPB key_bounds;
bool ret = _rowset_meta->get_first_segment_key_bound(&key_bounds);
if (!ret) {
@@ -284,7 +286,9 @@ public:
*min_key = key_bounds.min_key();
return true;
}
- bool max_key(std::string* max_key) {
+
+ // max key of the last segment
+ bool last_key(std::string* max_key) {
KeyBoundsPB key_bounds;
bool ret = _rowset_meta->get_last_segment_key_bound(&key_bounds);
if (!ret) {
diff --git a/be/src/olap/rowset/rowset_reader_context.h
b/be/src/olap/rowset/rowset_reader_context.h
index 8ac4cf55271..d681f9323c8 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -21,6 +21,7 @@
#include "io/io_common.h"
#include "olap/column_predicate.h"
#include "olap/olap_common.h"
+#include "olap/rowid_conversion.h"
#include "runtime/runtime_state.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
@@ -76,6 +77,7 @@ struct RowsetReaderContext {
bool enable_unique_key_merge_on_write = false;
const DeleteBitmap* delete_bitmap = nullptr;
bool record_rowids = false;
+ RowIdConversion* rowid_conversion;
bool is_vertical_compaction = false;
bool is_key_column_group = false;
const std::set<int32_t>* output_columns = nullptr;
diff --git a/be/src/olap/tablet_reader.cpp b/be/src/olap/tablet_reader.cpp
index 469da277967..17e23fb5165 100644
--- a/be/src/olap/tablet_reader.cpp
+++ b/be/src/olap/tablet_reader.cpp
@@ -254,6 +254,7 @@ Status TabletReader::_capture_rs_readers(const
ReaderParams& read_params) {
_reader_context.delete_bitmap = read_params.delete_bitmap;
_reader_context.enable_unique_key_merge_on_write =
tablet()->enable_unique_key_merge_on_write();
_reader_context.record_rowids = read_params.record_rowids;
+ _reader_context.rowid_conversion = read_params.rowid_conversion;
_reader_context.is_key_column_group = read_params.is_key_column_group;
_reader_context.remaining_conjunct_roots =
read_params.remaining_conjunct_roots;
_reader_context.common_expr_ctxs_push_down =
read_params.common_expr_ctxs_push_down;
diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h
index d4151f144f1..d0329ee3614 100644
--- a/be/src/olap/tablet_reader.h
+++ b/be/src/olap/tablet_reader.h
@@ -158,6 +158,7 @@ public:
// used for compaction to record row ids
bool record_rowids = false;
+ RowIdConversion* rowid_conversion;
// flag for enable topn opt
bool use_topn_opt = false;
std::vector<int> topn_filter_source_node_ids;
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index d726e84a520..0df19961ee3 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -71,54 +71,31 @@ Status BlockReader::next_block_with_aggregation(Block*
block, bool* eof) {
return res;
}
-bool BlockReader::_rowsets_overlapping(const ReaderParams& read_params) {
- std::string cur_max_key;
+bool BlockReader::_rowsets_mono_asc_disjoint(const ReaderParams& read_params) {
+ std::string cur_rs_last_key;
const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits;
for (const auto& rs_split : rs_splits) {
- // version 0-1 of every tablet is empty, just skip this rowset
- if (rs_split.rs_reader->rowset()->version().second == 1) {
- continue;
- }
if (rs_split.rs_reader->rowset()->num_rows() == 0) {
continue;
}
if (rs_split.rs_reader->rowset()->is_segments_overlapping()) {
return true;
}
- std::string min_key;
- bool has_min_key = rs_split.rs_reader->rowset()->min_key(&min_key);
- if (!has_min_key) {
+ std::string rs_first_key;
+ bool has_first_key =
rs_split.rs_reader->rowset()->first_key(&rs_first_key);
+ if (!has_first_key) {
return true;
}
- if (min_key <= cur_max_key) {
+ if (rs_first_key <= cur_rs_last_key) {
return true;
}
- CHECK(rs_split.rs_reader->rowset()->max_key(&cur_max_key));
+ bool has_last_key =
rs_split.rs_reader->rowset()->last_key(&cur_rs_last_key);
+ CHECK(has_last_key);
}
- for (const auto& rs_reader : rs_splits) {
- // version 0-1 of every tablet is empty, just skip this rowset
- if (rs_reader.rs_reader->rowset()->version().second == 1) {
- continue;
- }
- if (rs_reader.rs_reader->rowset()->num_rows() == 0) {
- continue;
- }
- if (rs_reader.rs_reader->rowset()->is_segments_overlapping()) {
- return true;
- }
- std::string min_key;
- bool has_min_key = rs_reader.rs_reader->rowset()->min_key(&min_key);
- if (!has_min_key) {
- return true;
- }
- if (min_key <= cur_max_key) {
- return true;
- }
- CHECK(rs_reader.rs_reader->rowset()->max_key(&cur_max_key));
- }
return false;
}
+
Status BlockReader::_init_collect_iter(const ReaderParams& read_params) {
auto res = _capture_rs_readers(read_params);
if (!res.ok()) {
@@ -130,7 +107,7 @@ Status BlockReader::_init_collect_iter(const ReaderParams&
read_params) {
return res;
}
// check if rowsets are noneoverlapping
- _is_rowsets_overlapping = _rowsets_overlapping(read_params);
+ _is_rowsets_overlapping = _rowsets_mono_asc_disjoint(read_params);
_vcollect_iter.init(this, _is_rowsets_overlapping,
read_params.read_orderby_key,
read_params.read_orderby_key_reverse);
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index 6f9792929db..f33fe743109 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -86,7 +86,8 @@ private:
bool _get_next_row_same();
- bool _rowsets_overlapping(const ReaderParams& read_params);
+ // return false if keys of rowsets are mono ascending and disjoint, true if they may overlap
+ bool _rowsets_mono_asc_disjoint(const ReaderParams& read_params);
VCollectIterator _vcollect_iter;
IteratorRowRef _next_row {{}, -1, false};
diff --git a/be/src/vec/olap/vcollect_iterator.cpp
b/be/src/vec/olap/vcollect_iterator.cpp
index e356c3b1896..a020119924d 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -499,6 +499,7 @@ Status
VCollectIterator::Level0Iterator::refresh_current_row() {
if (_block == nullptr && !_get_data_by_ref) {
_block = std::make_shared<Block>(_schema.create_block(
_reader->_return_columns,
_reader->_tablet_columns_convert_to_null_set));
+ _ref.block = _block;
}
if (!_is_empty() && _current_valid()) {
diff --git a/be/test/olap/rowid_conversion_test.cpp
b/be/test/olap/rowid_conversion_test.cpp
index 658b104493f..a9c966acf36 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -452,7 +452,12 @@ protected:
int64_t c1 = j * rows_per_segment + n;
// There are 500 rows of data overlap between rowsets
if (i > 0) {
- c1 += i * num_segments * rows_per_segment - 500;
+ if (is_overlap) {
+ // There are 500 rows of data overlap between
rowsets
+ c1 -= 500;
+ } else {
+ ++c1;
+ }
}
if (is_overlap && j > 0) {
// There are 10 rows of data overlap between segments
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]