This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new d1449e6b980 Pick "[Bug](ScanNode) Fix potential incorrect query result
caused by concurrent NewOlapScanNode initialization and Compaction (#24638)"
(#25127)
d1449e6b980 is described below
commit d1449e6b9800f51dd5b3f849f81893efb5a005bf
Author: plat1ko <[email protected]>
AuthorDate: Tue Oct 17 09:37:06 2023 +0800
Pick "[Bug](ScanNode) Fix potential incorrect query result caused by
concurrent NewOlapScanNode initialization and Compaction (#24638)" (#25127)
---
be/src/olap/merger.cpp | 46 +++--
be/src/olap/reader.cpp | 24 ++-
be/src/olap/reader.h | 13 +-
be/src/olap/schema_change.cpp | 13 +-
be/src/olap/tablet.cpp | 4 -
be/src/olap/tablet.h | 5 -
be/src/olap/tablet_meta.cpp | 24 ---
be/src/olap/tablet_meta.h | 2 -
be/src/olap/tablet_schema.cpp | 6 +-
be/src/olap/tablet_schema.h | 2 +-
be/src/vec/exec/scan/new_olap_scan_node.cpp | 73 +++----
be/src/vec/exec/scan/new_olap_scanner.cpp | 70 +++----
be/src/vec/exec/scan/new_olap_scanner.h | 3 +-
be/test/olap/delete_handler_test.cpp | 22 ++-
be/test/olap/rowid_conversion_test.cpp | 167 +++++++---------
be/test/vec/olap/vertical_compaction_test.cpp | 262 ++++++++++++--------------
16 files changed, 339 insertions(+), 397 deletions(-)
diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp
index 587d5326a2e..734b81794fb 100644
--- a/be/src/olap/merger.cpp
+++ b/be/src/olap/merger.cpp
@@ -60,24 +60,23 @@ Status Merger::vmerge_rowsets(TabletSharedPtr tablet,
ReaderType reader_type,
TabletReader::ReaderParams reader_params;
reader_params.tablet = tablet;
reader_params.reader_type = reader_type;
- reader_params.rs_splits.reserve(src_rowset_readers.size());
+
+ TabletReader::ReadSource read_source;
+ read_source.rs_splits.reserve(src_rowset_readers.size());
for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
- reader_params.rs_splits.emplace_back(RowSetSplits(rs_reader));
+ read_source.rs_splits.emplace_back(RowSetSplits(rs_reader));
}
+ read_source.fill_delete_predicates();
+ reader_params.set_read_source(std::move(read_source));
+
reader_params.version = dst_rowset_writer->version();
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
merge_tablet_schema->copy_from(*cur_tablet_schema);
- {
- std::shared_lock rdlock(tablet->get_header_lock());
- auto delete_preds = tablet->delete_predicates();
- std::copy(delete_preds.cbegin(), delete_preds.cend(),
- std::inserter(reader_params.delete_predicates,
- reader_params.delete_predicates.begin()));
- // Merge the columns in delete predicate that not in latest schema in
to current tablet schema
- for (auto& del_pred_rs : reader_params.delete_predicates) {
-
merge_tablet_schema->merge_dropped_columns(del_pred_rs->tablet_schema());
- }
+
+ // Merge the columns referenced by delete predicates that are not in the latest schema into
the current tablet schema
+ for (auto& del_pred_rs : reader_params.delete_predicates) {
+
merge_tablet_schema->merge_dropped_columns(*del_pred_rs->tablet_schema());
}
reader_params.tablet_schema = merge_tablet_schema;
@@ -196,25 +195,24 @@ Status Merger::vertical_compact_one_group(
reader_params.is_key_column_group = is_key;
reader_params.tablet = tablet;
reader_params.reader_type = reader_type;
- reader_params.rs_splits.reserve(src_rowset_readers.size());
+
+ TabletReader::ReadSource read_source;
+ read_source.rs_splits.reserve(src_rowset_readers.size());
for (const RowsetReaderSharedPtr& rs_reader : src_rowset_readers) {
- reader_params.rs_splits.emplace_back(RowSetSplits(rs_reader));
+ read_source.rs_splits.emplace_back(RowSetSplits(rs_reader));
}
+ read_source.fill_delete_predicates();
+ reader_params.set_read_source(std::move(read_source));
+
reader_params.version = dst_rowset_writer->version();
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
merge_tablet_schema->copy_from(*tablet_schema);
- {
- std::shared_lock rdlock(tablet->get_header_lock());
- auto delete_preds = tablet->delete_predicates();
- std::copy(delete_preds.cbegin(), delete_preds.cend(),
- std::inserter(reader_params.delete_predicates,
- reader_params.delete_predicates.begin()));
-
- for (auto& del_pred_rs : reader_params.delete_predicates) {
-
merge_tablet_schema->merge_dropped_columns(del_pred_rs->tablet_schema());
- }
+
+ for (auto& del_pred_rs : reader_params.delete_predicates) {
+
merge_tablet_schema->merge_dropped_columns(*del_pred_rs->tablet_schema());
}
+
reader_params.tablet_schema = merge_tablet_schema;
if (is_key && stats_output && stats_output->rowid_conversion) {
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 573570bbcbc..43cdf4e637d 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -98,6 +98,16 @@ std::string TabletReader::KeysParam::to_string() const {
return ss.str();
}
+void TabletReader::ReadSource::fill_delete_predicates() {
+ DCHECK_EQ(delete_predicates.size(), 0);
+ for (auto&& split : rs_splits) {
+ auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
+ if (rs_meta->has_delete_predicate()) {
+ delete_predicates.push_back(rs_meta);
+ }
+ }
+}
+
TabletReader::~TabletReader() {
VLOG_NOTICE << "merged rows:" << _merged_rows;
_delete_handler.finalize();
@@ -630,11 +640,14 @@ Status TabletReader::init_reader_params_and_create_block(
reader_params->version =
Version(input_rowsets.front()->start_version(),
input_rowsets.back()->end_version());
+ ReadSource read_source;
for (auto& rowset : input_rowsets) {
RowsetReaderSharedPtr rs_reader;
RETURN_IF_ERROR(rowset->create_reader(&rs_reader));
- reader_params->rs_splits.push_back(RowSetSplits(std::move(rs_reader)));
+ read_source.rs_splits.push_back(RowSetSplits(std::move(rs_reader)));
}
+ read_source.fill_delete_predicates();
+ reader_params->set_read_source(std::move(read_source));
std::vector<RowsetMetaSharedPtr> rowset_metas(input_rowsets.size());
std::transform(input_rowsets.begin(), input_rowsets.end(),
rowset_metas.begin(),
@@ -644,14 +657,9 @@ Status TabletReader::init_reader_params_and_create_block(
TabletSchemaSPtr merge_tablet_schema = std::make_shared<TabletSchema>();
merge_tablet_schema->copy_from(*read_tablet_schema);
- auto& delete_preds = tablet->delete_predicates();
- std::copy(delete_preds.cbegin(), delete_preds.cend(),
- std::inserter(reader_params->delete_predicates,
- reader_params->delete_predicates.begin()));
-
// Merge the columns in delete predicate that not in latest schema in to
current tablet schema
- for (auto& del_pred_pb : reader_params->delete_predicates) {
-
merge_tablet_schema->merge_dropped_columns(tablet->tablet_schema(del_pred_pb->version()));
+ for (auto& del_pred : reader_params->delete_predicates) {
+ merge_tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
}
reader_params->tablet_schema = merge_tablet_schema;
if (tablet->enable_unique_key_merge_on_write()) {
diff --git a/be/src/olap/reader.h b/be/src/olap/reader.h
index 249c1997811..b053f710f05 100644
--- a/be/src/olap/reader.h
+++ b/be/src/olap/reader.h
@@ -90,6 +90,12 @@ class TabletReader {
};
public:
+ struct ReadSource {
+ std::vector<RowSetSplits> rs_splits;
+ std::vector<RowsetMetaSharedPtr> delete_predicates;
+ // Fill delete predicates with `rs_splits`
+ void fill_delete_predicates();
+ };
// Params for Reader,
// mainly include tablet, data version and fetch range.
struct ReaderParams {
@@ -103,6 +109,11 @@ public:
!rs_splits[1].rs_reader->rowset()->rowset_meta()->is_segments_overlapping());
}
+ void set_read_source(ReadSource read_source) {
+ rs_splits = std::move(read_source.rs_splits);
+ delete_predicates = std::move(read_source.delete_predicates);
+ }
+
TabletSharedPtr tablet;
TabletSchemaSPtr tablet_schema;
ReaderType reader_type = ReaderType::READER_QUERY;
@@ -126,9 +137,9 @@ public:
std::vector<FunctionFilter> function_filters;
std::vector<RowsetMetaSharedPtr> delete_predicates;
+ std::vector<RowSetSplits> rs_splits;
// For unique key table with merge-on-write
DeleteBitmap* delete_bitmap {nullptr};
- std::vector<RowSetSplits> rs_splits;
// return_columns is init from query schema
std::vector<uint32_t> return_columns;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 9650e1a400e..33a9f6fee87 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -822,15 +822,16 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
versions_to_be_changed.size(), rs_splits.size());
break;
}
- auto& all_del_preds = base_tablet->delete_predicates();
- for (auto& delete_pred : all_del_preds) {
- if (delete_pred->version().first > end_version) {
+ std::vector<RowsetMetaSharedPtr> del_preds;
+ for (auto&& split : rs_splits) {
+ auto& rs_meta = split.rs_reader->rowset()->rowset_meta();
+ if (!rs_meta->has_delete_predicate() ||
rs_meta->start_version() > end_version) {
continue;
}
- base_tablet_schema->merge_dropped_columns(
- base_tablet->tablet_schema(delete_pred->version()));
+
base_tablet_schema->merge_dropped_columns(*rs_meta->tablet_schema());
+ del_preds.push_back(rs_meta);
}
- res = delete_handler.init(base_tablet_schema, all_del_preds,
end_version);
+ res = delete_handler.init(base_tablet_schema, del_preds,
end_version);
if (!res) {
LOG(WARNING) << "init delete handler failed. base_tablet="
<< base_tablet->full_name() << ", end_version="
<< end_version;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 1f46cf2b99f..2ed3571575b 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -979,10 +979,6 @@ Status Tablet::capture_rs_readers(const
std::vector<Version>& version_path,
return Status::OK();
}
-bool Tablet::version_for_delete_predicate(const Version& version) {
- return _tablet_meta->version_for_delete_predicate(version);
-}
-
bool Tablet::can_do_compaction(size_t path_hash, CompactionType
compaction_type) {
if (compaction_type == CompactionType::BASE_COMPACTION && tablet_state()
!= TABLET_RUNNING) {
// base compaction can only be done for tablet in TABLET_RUNNING state.
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index bbdeb411f44..d57b043857d 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -191,11 +191,6 @@ public:
Status capture_rs_readers(const std::vector<Version>& version_path,
std::vector<RowSetSplits>* rs_splits) const;
- const std::vector<RowsetMetaSharedPtr> delete_predicates() {
- return _tablet_meta->delete_predicates();
- }
- bool version_for_delete_predicate(const Version& version);
-
// meta lock
std::shared_mutex& get_header_lock() { return _meta_lock; }
std::mutex& get_rowset_update_lock() { return _rowset_update_lock; }
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 58e2876d81a..b407e35a449 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -844,30 +844,6 @@ RowsetMetaSharedPtr
TabletMeta::acquire_stale_rs_meta_by_version(const Version&
return nullptr;
}
-const std::vector<RowsetMetaSharedPtr> TabletMeta::delete_predicates() const {
- std::vector<RowsetMetaSharedPtr> res;
- for (auto& del_pred : _rs_metas) {
- if (del_pred->has_delete_predicate()) {
- res.push_back(del_pred);
- }
- }
- return res;
-}
-
-bool TabletMeta::version_for_delete_predicate(const Version& version) {
- if (version.first != version.second) {
- return false;
- }
-
- for (auto& del_pred : _rs_metas) {
- if (del_pred->version().first == version.first &&
del_pred->has_delete_predicate()) {
- return true;
- }
- }
-
- return false;
-}
-
std::string TabletMeta::full_name() const {
std::stringstream ss;
ss << _tablet_id << "." << _schema_hash << "." << _tablet_uid.to_string();
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 42fec6489b2..11dc3532514 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -191,8 +191,6 @@ public:
RowsetMetaSharedPtr acquire_rs_meta_by_version(const Version& version)
const;
void delete_stale_rs_meta_by_version(const Version& version);
RowsetMetaSharedPtr acquire_stale_rs_meta_by_version(const Version&
version) const;
- const std::vector<RowsetMetaSharedPtr> delete_predicates() const;
- bool version_for_delete_predicate(const Version& version);
std::string full_name() const;
diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp
index 9fc1c2eb0f3..b93f2fb849f 100644
--- a/be/src/olap/tablet_schema.cpp
+++ b/be/src/olap/tablet_schema.cpp
@@ -815,12 +815,12 @@ void TabletSchema::build_current_tablet_schema(int64_t
index_id, int32_t version
}
}
-void TabletSchema::merge_dropped_columns(TabletSchemaSPtr src_schema) {
+void TabletSchema::merge_dropped_columns(const TabletSchema& src_schema) {
// If they are the same tablet schema object, then just return
- if (this == src_schema.get()) {
+ if (this == &src_schema) {
return;
}
- for (const auto& src_col : src_schema->columns()) {
+ for (const auto& src_col : src_schema.columns()) {
if (_field_id_to_index.find(src_col.unique_id()) ==
_field_id_to_index.end()) {
CHECK(!src_col.is_key()) << src_col.name() << " is key column,
should not be dropped.";
ColumnPB src_col_pb;
diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h
index 715af14191e..d04b009cee6 100644
--- a/be/src/olap/tablet_schema.h
+++ b/be/src/olap/tablet_schema.h
@@ -291,7 +291,7 @@ public:
// 7. insert value 4, 5
// Then the read schema should be ColA, ColB, ColB' because the delete
predicate need ColB to remove related data.
// Because they have same name, so that the dropped column should not be
added to the map, only with unique id.
- void merge_dropped_columns(std::shared_ptr<TabletSchema> src_schema);
+ void merge_dropped_columns(const TabletSchema& src_schema);
bool is_dropped_column(const TabletColumn& col) const;
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 13b8e92119a..5aa179d3006 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -38,6 +38,7 @@
#include "common/object_pool.h"
#include "common/status.h"
#include "exec/exec_node.h"
+#include "olap/reader.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/storage_engine.h"
@@ -446,15 +447,16 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
bool is_dup_mow_key = false;
size_t segment_count = 0;
- std::vector<std::vector<RowSetSplits>>
rowset_splits_vector(_scan_ranges.size());
- std::vector<std::vector<size_t>> tablet_rs_seg_count(_scan_ranges.size());
+ std::vector<TabletReader::ReadSource> tablets_read_source;
+ tablets_read_source.reserve(_scan_ranges.size());
+ std::vector<std::vector<size_t>> tablet_rs_seg_count;
+ tablet_rs_seg_count.reserve(_scan_ranges.size());
// Split tablet segment by scanner, only use in pipeline in duplicate key
// 1. if tablet count lower than scanner thread num, count segment num of
all tablet ready for scan
// TODO: some tablet may do not have segment, may need split segment all
case
if (_shared_scan_opt && _scan_ranges.size() <
config::doris_scanner_thread_pool_thread_num) {
- for (int i = 0; i < _scan_ranges.size(); ++i) {
- auto& scan_range = _scan_ranges[i];
+ for (auto&& scan_range : _scan_ranges) {
auto tablet_id = scan_range->tablet_id;
auto [tablet, status] =
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id,
@@ -472,24 +474,25 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
std::from_chars(scan_range->version.c_str(),
scan_range->version.c_str() +
scan_range->version.size(), version);
- std::shared_lock rdlock(tablet->get_header_lock());
- // acquire tablet rowset readers at the beginning of the scan node
- // to prevent this case: when there are lots of olap scanners to
run for example 10000
- // the rowsets maybe compacted when the last olap scanner starts
- Status acquire_reader_st =
- tablet->capture_rs_readers({0, version},
&rowset_splits_vector[i]);
- if (!acquire_reader_st.ok()) {
- LOG(WARNING) << "fail to init reader.res=" <<
acquire_reader_st;
- std::stringstream ss;
- ss << "failed to initialize storage reader. tablet=" <<
tablet->full_name()
- << ", res=" << acquire_reader_st
- << ", backend=" << BackendOptions::get_localhost();
- return Status::InternalError(ss.str());
+ auto& read_source = tablets_read_source.emplace_back();
+ {
+ std::shared_lock rdlock(tablet->get_header_lock());
+ auto st = tablet->capture_rs_readers({0, version},
&read_source.rs_splits);
+ if (!st.ok()) {
+ LOG(WARNING) << "fail to init reader.res=" << st;
+ return Status::InternalError(
+ "failed to initialize storage reader. tablet_id={}
: {}",
+ tablet->tablet_id(), st.to_string());
+ }
+ }
+ if (!_state->skip_delete_predicate()) {
+ read_source.fill_delete_predicates();
}
- for (const auto& rowset_splits : rowset_splits_vector[i]) {
+ auto& rs_seg_count = tablet_rs_seg_count.emplace_back();
+ for (const auto& rowset_splits : read_source.rs_splits) {
auto num_segments =
rowset_splits.rs_reader->rowset()->num_segments();
- tablet_rs_seg_count[i].emplace_back(num_segments);
+ rs_seg_count.emplace_back(num_segments);
segment_count += num_segments;
}
}
@@ -498,14 +501,14 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
if (is_dup_mow_key) {
auto build_new_scanner = [&](const TPaloScanRange& scan_range,
const std::vector<OlapScanRange*>&
key_ranges,
- const std::vector<RowSetSplits>&
rs_splits) {
+ TabletReader::ReadSource read_source) {
std::shared_ptr<NewOlapScanner> scanner =
NewOlapScanner::create_shared(
_state, this, _limit_per_scanner,
_olap_scan_node.is_preaggregation, scan_range,
- key_ranges, rs_splits, _scanner_profile.get());
+ key_ranges, std::move(read_source),
_scanner_profile.get());
RETURN_IF_ERROR(scanner->prepare(_state, _conjuncts));
scanner->set_compound_filters(_compound_filters);
- scanners->push_back(scanner);
+ scanners->push_back(std::move(scanner));
return Status::OK();
};
// 2. Split segment evenly to each scanner (e.g. each scanner need to
scan `avg_segment_count_per_scanner` segments)
@@ -527,6 +530,7 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
size_t segment_idx_to_scan = 0;
size_t num_segments_assigned = 0;
+ auto& read_source = tablets_read_source[i];
std::vector<RowSetSplits> rs_splits;
while (rowset_idx < rs_seg_count.size()) {
@@ -538,35 +542,35 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
}
const auto max_add_seg_nums = rs_seg_count[rowset_idx] -
segment_idx_to_scan;
- rs_splits.emplace_back(RowSetSplits());
- rs_splits.back().rs_reader =
rowset_splits_vector[i][rowset_idx].rs_reader->clone();
+ auto& split = rs_splits.emplace_back();
+ split.rs_reader =
read_source.rs_splits[rowset_idx].rs_reader->clone();
// if segments assigned to current scanner are already more
than the average count,
// this scanner will just scan segments equal to the average
count
if (num_segments_assigned + max_add_seg_nums >
avg_segment_count_by_scanner) {
auto need_add_seg_nums = avg_segment_count_by_scanner -
num_segments_assigned;
- rs_splits.back().segment_offsets = {
+ split.segment_offsets = {
segment_idx_to_scan,
segment_idx_to_scan + need_add_seg_nums}; // only
scan need_add_seg_nums
- RETURN_IF_ERROR(build_new_scanner(*scan_range,
scanner_ranges, rs_splits));
+ RETURN_IF_ERROR(build_new_scanner(
+ *scan_range, scanner_ranges,
+ {std::move(rs_splits),
read_source.delete_predicates}));
segment_idx_to_scan += need_add_seg_nums;
num_segments_assigned = 0;
- rs_splits.clear();
} else if (num_segments_assigned + max_add_seg_nums ==
avg_segment_count_by_scanner) {
- rs_splits.back().segment_offsets = {segment_idx_to_scan,
-
rs_seg_count[rowset_idx]};
- RETURN_IF_ERROR(build_new_scanner(*scan_range,
scanner_ranges, rs_splits));
+ split.segment_offsets = {segment_idx_to_scan,
rs_seg_count[rowset_idx]};
+ RETURN_IF_ERROR(build_new_scanner(
+ *scan_range, scanner_ranges,
+ {std::move(rs_splits),
read_source.delete_predicates}));
segment_idx_to_scan = 0;
num_segments_assigned = 0;
- rs_splits.clear();
rowset_idx++;
} else {
- rs_splits.back().segment_offsets = {segment_idx_to_scan,
-
rs_seg_count[rowset_idx]};
+ split.segment_offsets = {segment_idx_to_scan,
rs_seg_count[rowset_idx]};
segment_idx_to_scan = 0;
num_segments_assigned += max_add_seg_nums;
@@ -583,7 +587,8 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
// dispose some segment tail
if (!rs_splits.empty()) {
- build_new_scanner(*scan_range, scanner_ranges, rs_splits);
+ build_new_scanner(*scan_range, scanner_ranges,
+ {std::move(rs_splits),
read_source.delete_predicates});
}
}
} else {
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 232493b66f4..ea4187e556a 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -60,6 +60,8 @@
namespace doris::vectorized {
+using ReadSource = TabletReader::ReadSource;
+
NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent,
int64_t limit,
bool aggregation, const TPaloScanRange&
scan_range,
const std::vector<OlapScanRange*>& key_ranges,
@@ -76,13 +78,13 @@ NewOlapScanner::NewOlapScanner(RuntimeState* state,
NewOlapScanNode* parent, int
NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent,
int64_t limit,
bool aggregation, const TPaloScanRange&
scan_range,
const std::vector<OlapScanRange*>& key_ranges,
- const std::vector<RowSetSplits>& rs_splits,
RuntimeProfile* profile)
+ ReadSource read_source, RuntimeProfile* profile)
: VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
_aggregation(aggregation),
_version(-1),
_scan_range(scan_range),
_key_ranges(key_ranges) {
- _tablet_reader_params.rs_splits = rs_splits;
+ _tablet_reader_params.set_read_source(std::move(read_source));
_tablet_schema = std::make_shared<TabletSchema>();
_is_init = false;
}
@@ -175,38 +177,33 @@ Status NewOlapScanner::init() {
}
}
- {
- std::shared_lock rdlock(_tablet->get_header_lock());
- if (_tablet_reader_params.rs_splits.empty()) {
- const RowsetSharedPtr rowset =
_tablet->rowset_with_max_version();
- if (rowset == nullptr) {
- std::stringstream ss;
- ss << "fail to get latest version of tablet: " <<
tablet_id;
- LOG(WARNING) << ss.str();
- return Status::InternalError(ss.str());
- }
-
- // acquire tablet rowset readers at the beginning of the scan
node
- // to prevent this case: when there are lots of olap scanners
to run for example 10000
- // the rowsets maybe compacted when the last olap scanner
starts
- Version rd_version(0, _version);
- Status acquire_reader_st =
- _tablet->capture_rs_readers(rd_version,
&_tablet_reader_params.rs_splits);
- if (!acquire_reader_st.ok()) {
- LOG(WARNING) << "fail to init reader.res=" <<
acquire_reader_st;
- std::stringstream ss;
- ss << "failed to initialize storage reader. tablet=" <<
_tablet->full_name()
- << ", res=" << acquire_reader_st
- << ", backend=" << BackendOptions::get_localhost();
- return Status::InternalError(ss.str());
+ if (_tablet_reader_params.rs_splits.empty()) {
+ // Non-pipeline mode, Tablet : Scanner = 1 : 1
+ // acquire tablet rowset readers at the beginning of the scan node
+ // to prevent this case: when there are lots of olap scanners to
run for example 10000
+ // the rowsets may be compacted when the last olap scanner starts
+ Version rd_version(0, _version);
+ ReadSource read_source;
+ {
+ std::shared_lock rdlock(_tablet->get_header_lock());
+ auto st = _tablet->capture_rs_readers(rd_version,
&read_source.rs_splits);
+ if (!st.ok()) {
+ LOG(WARNING) << "fail to init reader.res=" << st;
+ return Status::InternalError(
+ "failed to initialize storage reader. tablet_id={}
: {}",
+ _tablet->tablet_id(), st.to_string());
}
}
-
- // Initialize tablet_reader_params
- RETURN_IF_ERROR(_init_tablet_reader_params(_key_ranges,
parent->_olap_filters,
-
parent->_filter_predicates,
-
parent->_push_down_functions));
+ if (!_state->skip_delete_predicate()) {
+ read_source.fill_delete_predicates();
+ }
+ _tablet_reader_params.set_read_source(std::move(read_source));
}
+
+ // Initialize tablet_reader_params
+ RETURN_IF_ERROR(_init_tablet_reader_params(_key_ranges,
parent->_olap_filters,
+ parent->_filter_predicates,
+
parent->_push_down_functions));
}
// add read columns in profile
@@ -306,16 +303,9 @@ Status NewOlapScanner::_init_tablet_reader_params(
std::inserter(_tablet_reader_params.function_filters,
_tablet_reader_params.function_filters.begin()));
- if (!_state->skip_delete_predicate()) {
- auto& delete_preds = _tablet->delete_predicates();
- std::copy(delete_preds.cbegin(), delete_preds.cend(),
- std::inserter(_tablet_reader_params.delete_predicates,
-
_tablet_reader_params.delete_predicates.begin()));
- }
-
// Merge the columns in delete predicate that not in latest schema in to
current tablet schema
- for (auto& del_pred_pb : _tablet_reader_params.delete_predicates) {
-
_tablet_schema->merge_dropped_columns(_tablet->tablet_schema(del_pred_pb->version()));
+ for (auto& del_pred : _tablet_reader_params.delete_predicates) {
+ _tablet_schema->merge_dropped_columns(*del_pred->tablet_schema());
}
// Range
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h
b/be/src/vec/exec/scan/new_olap_scanner.h
index 41dd4e416c0..6ecb6a1dc5d 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -30,6 +30,7 @@
#include "common/status.h"
#include "olap/data_dir.h"
#include "olap/reader.h"
+#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_reader.h"
#include "olap/tablet.h"
#include "olap/tablet_schema.h"
@@ -59,7 +60,7 @@ public:
NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t
limit, bool aggregation,
const TPaloScanRange& scan_range, const
std::vector<OlapScanRange*>& key_ranges,
- const std::vector<RowSetSplits>& rs_splits, RuntimeProfile*
profile);
+ TabletReader::ReadSource read_source, RuntimeProfile*
profile);
Status init() override;
diff --git a/be/test/olap/delete_handler_test.cpp
b/be/test/olap/delete_handler_test.cpp
index c2aa5f1efeb..792fc18f78c 100644
--- a/be/test/olap/delete_handler_test.cpp
+++ b/be/test/olap/delete_handler_test.cpp
@@ -909,6 +909,16 @@ protected:
tablet->add_rowset(rowset);
}
+ std::vector<RowsetMetaSharedPtr> get_delete_predicates() {
+ std::vector<RowsetMetaSharedPtr> delete_preds;
+ for (auto&& rs_meta : tablet->tablet_meta()->_rs_metas) {
+ if (rs_meta->has_delete_predicate()) {
+ delete_preds.push_back(rs_meta);
+ }
+ }
+ return delete_preds;
+ }
+
std::string _tablet_path;
RowCursor _data_row_cursor;
TabletSharedPtr tablet;
@@ -929,7 +939,7 @@ TEST_F(TestDeleteHandler, ValueWithQuote) {
add_delete_predicate(del_predicate, 2);
- auto res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 5);
+ auto res = _delete_handler.init(tablet->tablet_schema(),
get_delete_predicates(), 5);
EXPECT_EQ(Status::OK(), res);
_delete_handler.finalize();
}
@@ -942,7 +952,7 @@ TEST_F(TestDeleteHandler, ValueWithoutQuote) {
add_delete_predicate(del_predicate, 2);
- auto res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 5);
+ auto res = _delete_handler.init(tablet->tablet_schema(),
get_delete_predicates(), 5);
EXPECT_EQ(Status::OK(), res);
_delete_handler.finalize();
}
@@ -1016,7 +1026,7 @@ TEST_F(TestDeleteHandler, InitSuccess) {
add_delete_predicate(del_pred_4, 5);
// Get delete conditions which version <= 5
- res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 5);
+ res = _delete_handler.init(tablet->tablet_schema(),
get_delete_predicates(), 5);
EXPECT_EQ(Status::OK(), res);
_delete_handler.finalize();
}
@@ -1048,7 +1058,7 @@ TEST_F(TestDeleteHandler, FilterDataSubconditions) {
add_delete_predicate(del_pred, 2);
// 指定版本号为10以载入Header中的所有过滤条件(在这个case中,只有过滤条件1)
- res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 4);
+ res = _delete_handler.init(tablet->tablet_schema(),
get_delete_predicates(), 4);
EXPECT_EQ(Status::OK(), res);
// 构造一行测试数据
@@ -1133,7 +1143,7 @@ TEST_F(TestDeleteHandler, FilterDataConditions) {
add_delete_predicate(del_pred_3, 4);
// 指定版本号为4以载入meta中的所有过滤条件(在这个case中,只有过滤条件1)
- res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 4);
+ res = _delete_handler.init(tablet->tablet_schema(),
get_delete_predicates(), 4);
EXPECT_EQ(Status::OK(), res);
std::vector<string> data_str;
@@ -1196,7 +1206,7 @@ TEST_F(TestDeleteHandler, FilterDataVersion) {
add_delete_predicate(del_pred_2, 4);
// 指定版本号为4以载入meta中的所有过滤条件(过滤条件1,过滤条件2)
- res = _delete_handler.init(tablet->tablet_schema(),
tablet->delete_predicates(), 4);
+ res = _delete_handler.init(tablet->tablet_schema(),
get_delete_predicates(), 4);
EXPECT_EQ(Status::OK(), res);
// 构造一行测试数据
diff --git a/be/test/olap/rowid_conversion_test.cpp
b/be/test/olap/rowid_conversion_test.cpp
index 95b34cf0f88..e6bb0efeb46 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -65,6 +65,18 @@ using namespace ErrorCode;
static const uint32_t MAX_PATH_LEN = 1024;
static StorageEngine* k_engine = nullptr;
+static constexpr int64_t tablet_id = 10005;
+
+static RowsetId next_rowset_id() {
+ // FIXME(plat1ko): If `inc_id` is set to 1000 and `VerticalCompactionTest` is run
before `TestRowIdConversion`,
+ // `TestRowIdConversion` will fail. There may be some strange global
state here.
+ static int64_t inc_id = 0;
+ RowsetId rowset_id;
+ inc_id++;
+ rowset_id.init(inc_id);
+ return rowset_id;
+}
+
class TestRowIdConversion : public testing::TestWithParam<std::tuple<KeysType,
bool, bool, bool>> {
protected:
void SetUp() override {
@@ -137,22 +149,21 @@ protected:
return tablet_schema;
}
- void create_rowset_writer_context(TabletSchemaSPtr tablet_schema,
- const SegmentsOverlapPB& overlap,
- uint32_t max_rows_per_segment,
- RowsetWriterContext*
rowset_writer_context) {
- static int64_t inc_id = 0;
- RowsetId rowset_id;
- rowset_id.init(inc_id);
- rowset_writer_context->rowset_id = rowset_id;
- rowset_writer_context->rowset_type = BETA_ROWSET;
- rowset_writer_context->rowset_state = VISIBLE;
- rowset_writer_context->tablet_schema = tablet_schema;
- rowset_writer_context->rowset_dir = absolute_dir + "/tablet_path";
- rowset_writer_context->version = Version(inc_id, inc_id);
- rowset_writer_context->segments_overlap = overlap;
- rowset_writer_context->max_rows_per_segment = max_rows_per_segment;
- inc_id++;
+ RowsetWriterContext create_rowset_writer_context(TabletSchemaSPtr
tablet_schema,
+ const SegmentsOverlapPB&
overlap,
+ uint32_t
max_rows_per_segment,
+ Version version) {
+ RowsetWriterContext rowset_writer_context;
+ rowset_writer_context.tablet_id = tablet_id;
+ rowset_writer_context.rowset_id = next_rowset_id();
+ rowset_writer_context.rowset_type = BETA_ROWSET;
+ rowset_writer_context.rowset_state = VISIBLE;
+ rowset_writer_context.tablet_schema = tablet_schema;
+ rowset_writer_context.rowset_dir = absolute_dir + "/tablet_path";
+ rowset_writer_context.version = version;
+ rowset_writer_context.segments_overlap = overlap;
+ rowset_writer_context.max_rows_per_segment = max_rows_per_segment;
+ return rowset_writer_context;
}
void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext&
context,
@@ -167,8 +178,7 @@ protected:
RowsetSharedPtr create_rowset(
TabletSchemaSPtr tablet_schema, const SegmentsOverlapPB& overlap,
- std::vector<std::vector<std::tuple<int64_t, int64_t>>>
rowset_data) {
- RowsetWriterContext writer_context;
+ std::vector<std::vector<std::tuple<int64_t, int64_t>>>
rowset_data, int64_t version) {
if (overlap == NONOVERLAPPING) {
for (auto i = 1; i < rowset_data.size(); i++) {
auto& last_seg_data = rowset_data[i - 1];
@@ -178,8 +188,8 @@ protected:
EXPECT_LT(last_seg_max, cur_seg_min);
}
}
- create_rowset_writer_context(tablet_schema, overlap, UINT32_MAX,
&writer_context);
-
+ auto writer_context = create_rowset_writer_context(tablet_schema,
overlap, UINT32_MAX,
+ {version, version});
std::unique_ptr<RowsetWriter> rowset_writer;
Status s = RowsetFactory::create_rowset_writer(writer_context, false,
&rowset_writer);
EXPECT_TRUE(s.ok());
@@ -214,51 +224,21 @@ protected:
return rowset;
}
- void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) {
- std::string json_rowset_meta = R"({
- "rowset_id": 540081,
- "tablet_id": 15673,
- "txn_id": 4042,
- "tablet_schema_hash": 567997577,
- "rowset_type": "BETA_ROWSET",
- "rowset_state": "VISIBLE",
- "start_version": 2,
- "end_version": 2,
- "num_rows": 3929,
- "total_disk_size": 84699,
- "data_disk_size": 84464,
- "index_disk_size": 235,
- "empty": false,
- "load_id": {
- "hi": -5350970832824939812,
- "lo": -6717994719194512122
- },
- "creation_time": 1553765670
- })";
- RowsetMetaPB rowset_meta_pb;
- json2pb::JsonToProtoMessage(json_rowset_meta, &rowset_meta_pb);
- rowset_meta_pb.set_start_version(start);
- rowset_meta_pb.set_end_version(end);
- rowset_meta_pb.set_creation_time(10000);
- pb1->init_from_pb(rowset_meta_pb);
- }
-
- void add_delete_predicate(TabletSharedPtr tablet, DeletePredicatePB&
del_pred,
- int64_t version) {
- RowsetMetaSharedPtr rsm(new RowsetMeta());
- init_rs_meta(rsm, version, version);
- RowsetId id;
- id.init(version * 1000);
- rsm->set_rowset_id(id);
- rsm->set_delete_predicate(del_pred);
- rsm->set_tablet_schema(tablet->tablet_schema());
- RowsetSharedPtr rowset =
std::make_shared<BetaRowset>(tablet->tablet_schema(), "", rsm);
- tablet->add_rowset(rowset);
+ RowsetSharedPtr create_delete_predicate(const TabletSchemaSPtr& schema,
+ DeletePredicatePB del_pred,
int64_t version) {
+ auto rs_meta = std::make_shared<RowsetMeta>();
+ rs_meta->set_tablet_id(tablet_id);
+ rs_meta->set_rowset_id(next_rowset_id());
+ rs_meta->set_rowset_type(BETA_ROWSET);
+ rs_meta->set_rowset_state(VISIBLE);
+ rs_meta->set_delete_predicate(std::move(del_pred));
+ rs_meta->set_tablet_schema(schema);
+ rs_meta->set_version({version, version});
+ return std::make_shared<BetaRowset>(schema, "", std::move(rs_meta));
}
TabletSharedPtr create_tablet(const TabletSchema& tablet_schema,
- bool enable_unique_key_merge_on_write,
int64_t version,
- bool has_delete_handler) {
+ bool enable_unique_key_merge_on_write) {
std::vector<TColumn> cols;
std::unordered_map<uint32_t, uint32_t> col_ordinal_to_unique_id;
for (auto i = 0; i < tablet_schema.num_columns(); i++) {
@@ -281,29 +261,13 @@ protected:
}
t_tablet_schema.__set_storage_type(TStorageType::COLUMN);
t_tablet_schema.__set_columns(cols);
- TabletMetaSharedPtr tablet_meta(
- new TabletMeta(1, 1, 1, 1, 1, 1, t_tablet_schema, 1,
col_ordinal_to_unique_id,
- UniqueId(1, 2), TTabletType::TABLET_TYPE_DISK,
- TCompressionType::LZ4F, 0,
enable_unique_key_merge_on_write));
+ TabletMetaSharedPtr tablet_meta(new TabletMeta(
+ 1, 1, tablet_id, 1, 1, 1, t_tablet_schema, 1,
col_ordinal_to_unique_id,
+ UniqueId(1, 2), TTabletType::TABLET_TYPE_DISK,
TCompressionType::LZ4F, 0,
+ enable_unique_key_merge_on_write));
TabletSharedPtr tablet(new Tablet(tablet_meta, nullptr));
tablet->init();
- if (has_delete_handler) {
- // delete data with key < 1000
- std::vector<TCondition> conditions;
- TCondition condition;
- condition.column_name = tablet_schema.column(0).name();
- condition.condition_op = "<";
- condition.condition_values.clear();
- condition.condition_values.push_back("1000");
- conditions.push_back(condition);
-
- DeletePredicatePB del_pred;
- Status st =
- DeleteHandler::generate_delete_predicate(tablet_schema,
conditions, &del_pred);
- EXPECT_EQ(Status::OK(), st);
- add_delete_predicate(tablet, del_pred, version);
- }
return tablet;
}
@@ -327,13 +291,30 @@ protected:
new_overlap = OVERLAPPING;
}
}
- RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i]);
+ RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i], i);
input_rowsets.push_back(rowset);
}
+ if (has_delete_handler) {
+ // delete data with key < 1000
+ std::vector<TCondition> conditions;
+ TCondition condition;
+ condition.column_name = tablet_schema->column(0).name();
+ condition.condition_op = "<";
+ condition.condition_values.clear();
+ condition.condition_values.push_back("1000");
+ conditions.push_back(condition);
+
+ DeletePredicatePB del_pred;
+ Status st =
+ DeleteHandler::generate_delete_predicate(*tablet_schema,
conditions, &del_pred);
+ ASSERT_TRUE(st.ok()) << st;
+ input_rowsets.push_back(
+ create_delete_predicate(tablet_schema, del_pred,
num_input_rowset));
+ }
// create output rowset writer
- RowsetWriterContext writer_context;
- create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456,
&writer_context);
+ auto writer_context = create_rowset_writer_context(
+ tablet_schema, NONOVERLAPPING, 3456, {0,
input_rowsets.back()->end_version()});
std::unique_ptr<RowsetWriter> output_rs_writer;
Status s;
if (is_vertical_merger) {
@@ -341,12 +322,10 @@ protected:
} else {
s = RowsetFactory::create_rowset_writer(writer_context, false,
&output_rs_writer);
}
- EXPECT_TRUE(s.ok());
+ ASSERT_TRUE(s.ok()) << s;
// merge input rowset
- TabletSharedPtr tablet =
- create_tablet(*tablet_schema, enable_unique_key_merge_on_write,
- output_rs_writer->version().first - 1,
has_delete_handler);
+ TabletSharedPtr tablet = create_tablet(*tablet_schema,
enable_unique_key_merge_on_write);
// create input rowset reader
vector<RowsetReaderSharedPtr> input_rs_readers;
@@ -390,15 +369,15 @@ protected:
output_data.emplace_back(columns[0].column->get_int(i),
columns[1].column->get_int(i));
}
- } while (s == Status::OK());
- EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
+ } while (s.ok());
+ EXPECT_TRUE(s.is<END_OF_FILE>()) << s;
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
std::vector<uint32_t> segment_num_rows;
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
if (has_delete_handler) {
// All keys less than 1000 are deleted by delete handler
for (auto& item : output_data) {
- EXPECT_GE(std::get<0>(item), 1000);
+ ASSERT_GE(std::get<0>(item), 1000);
}
}
@@ -579,7 +558,6 @@ TEST_P(TestRowIdConversion, Conversion) {
{
uint32_t num_segments = 1;
SegmentsOverlapPB overlap = NONOVERLAPPING;
- std::vector<std::vector<std::vector<std::tuple<int64_t,
int64_t>>>> input_data;
check_rowid_conversion(keys_type,
enable_unique_key_merge_on_write, num_input_rowset,
num_segments, rows_per_segment, overlap,
has_delete_handler,
is_vertical_merger);
@@ -588,7 +566,6 @@ TEST_P(TestRowIdConversion, Conversion) {
{
uint32_t num_segments = 2;
SegmentsOverlapPB overlap = OVERLAPPING;
- std::vector<std::vector<std::vector<std::tuple<int64_t,
int64_t>>>> input_data;
check_rowid_conversion(keys_type,
enable_unique_key_merge_on_write, num_input_rowset,
num_segments, rows_per_segment, overlap,
has_delete_handler,
is_vertical_merger);
@@ -597,7 +574,6 @@ TEST_P(TestRowIdConversion, Conversion) {
{
uint32_t num_segments = 2;
SegmentsOverlapPB overlap = NONOVERLAPPING;
- std::vector<std::vector<std::vector<std::tuple<int64_t,
int64_t>>>> input_data;
check_rowid_conversion(keys_type,
enable_unique_key_merge_on_write, num_input_rowset,
num_segments, rows_per_segment, overlap,
has_delete_handler,
is_vertical_merger);
@@ -606,7 +582,6 @@ TEST_P(TestRowIdConversion, Conversion) {
{
uint32_t num_segments = 2;
SegmentsOverlapPB overlap = OVERLAP_UNKNOWN;
- std::vector<std::vector<std::vector<std::tuple<int64_t,
int64_t>>>> input_data;
check_rowid_conversion(keys_type,
enable_unique_key_merge_on_write, num_input_rowset,
num_segments, rows_per_segment, overlap,
has_delete_handler,
is_vertical_merger);
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp
b/be/test/vec/olap/vertical_compaction_test.cpp
index 14a202845ae..963b5241800 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -188,22 +188,24 @@ protected:
return tablet_schema;
}
- void create_rowset_writer_context(TabletSchemaSPtr tablet_schema,
- const SegmentsOverlapPB& overlap,
- uint32_t max_rows_per_segment,
- RowsetWriterContext*
rowset_writer_context) {
+ RowsetWriterContext create_rowset_writer_context(TabletSchemaSPtr
tablet_schema,
+ const SegmentsOverlapPB&
overlap,
+ uint32_t
max_rows_per_segment,
+ Version version) {
static int64_t inc_id = 1000;
+ RowsetWriterContext rowset_writer_context;
RowsetId rowset_id;
rowset_id.init(inc_id);
- rowset_writer_context->rowset_id = rowset_id;
- rowset_writer_context->rowset_type = BETA_ROWSET;
- rowset_writer_context->rowset_state = VISIBLE;
- rowset_writer_context->tablet_schema = tablet_schema;
- rowset_writer_context->rowset_dir = absolute_dir + "/tablet_path";
- rowset_writer_context->version = Version(inc_id, inc_id);
- rowset_writer_context->segments_overlap = overlap;
- rowset_writer_context->max_rows_per_segment = max_rows_per_segment;
+ rowset_writer_context.rowset_id = rowset_id;
+ rowset_writer_context.rowset_type = BETA_ROWSET;
+ rowset_writer_context.rowset_state = VISIBLE;
+ rowset_writer_context.tablet_schema = tablet_schema;
+ rowset_writer_context.rowset_dir = absolute_dir + "/tablet_path";
+ rowset_writer_context.version = version;
+ rowset_writer_context.segments_overlap = overlap;
+ rowset_writer_context.max_rows_per_segment = max_rows_per_segment;
inc_id++;
+ return rowset_writer_context;
}
void create_and_init_rowset_reader(Rowset* rowset, RowsetReaderContext&
context,
@@ -218,8 +220,7 @@ protected:
RowsetSharedPtr create_rowset(
TabletSchemaSPtr tablet_schema, const SegmentsOverlapPB& overlap,
- std::vector<std::vector<std::tuple<int64_t, int64_t>>>
rowset_data) {
- RowsetWriterContext writer_context;
+ std::vector<std::vector<std::tuple<int64_t, int64_t>>>
rowset_data, int64_t version) {
if (overlap == NONOVERLAPPING) {
for (auto i = 1; i < rowset_data.size(); i++) {
auto& last_seg_data = rowset_data[i - 1];
@@ -229,7 +230,8 @@ protected:
EXPECT_LT(last_seg_max, cur_seg_min);
}
}
- create_rowset_writer_context(tablet_schema, overlap, UINT32_MAX,
&writer_context);
+ auto writer_context = create_rowset_writer_context(tablet_schema,
overlap, UINT32_MAX,
+ {version, version});
std::unique_ptr<RowsetWriter> rowset_writer;
Status s = RowsetFactory::create_rowset_writer(writer_context, false,
&rowset_writer);
@@ -265,51 +267,37 @@ protected:
return rowset;
}
- void init_rs_meta(RowsetMetaSharedPtr& pb1, int64_t start, int64_t end) {
+ void init_rs_meta(RowsetMetaSharedPtr& rs_meta, int64_t start, int64_t
end) {
std::string json_rowset_meta = R"({
- "rowset_id": 540085,
- "tablet_id": 15674,
- "txn_id": 4045,
- "tablet_schema_hash": 567997588,
+ "rowset_id": 540081,
+ "tablet_id": 15673,
+ "tablet_schema_hash": 567997577,
"rowset_type": "BETA_ROWSET",
"rowset_state": "VISIBLE",
- "start_version": 2,
- "end_version": 2,
- "num_rows": 3929,
- "total_disk_size": 84699,
- "data_disk_size": 84464,
- "index_disk_size": 235,
- "empty": false,
- "load_id": {
- "hi": -5350970832824939812,
- "lo": -6717994719194512122
- },
- "creation_time": 1553765670
+ "empty": false
})";
RowsetMetaPB rowset_meta_pb;
json2pb::JsonToProtoMessage(json_rowset_meta, &rowset_meta_pb);
rowset_meta_pb.set_start_version(start);
rowset_meta_pb.set_end_version(end);
rowset_meta_pb.set_creation_time(10000);
- pb1->init_from_pb(rowset_meta_pb);
+ rs_meta->init_from_pb(rowset_meta_pb);
}
- void add_delete_predicate(TabletSharedPtr tablet, DeletePredicatePB&
del_pred,
- int64_t version) {
+ RowsetSharedPtr create_delete_predicate(const TabletSchemaSPtr& schema,
+ DeletePredicatePB del_pred,
int64_t version) {
RowsetMetaSharedPtr rsm(new RowsetMeta());
init_rs_meta(rsm, version, version);
RowsetId id;
id.init(version * 1000);
rsm->set_rowset_id(id);
- rsm->set_delete_predicate(del_pred);
- rsm->set_tablet_schema(tablet->tablet_schema());
- RowsetSharedPtr rowset =
std::make_shared<BetaRowset>(tablet->tablet_schema(), "", rsm);
- tablet->add_rowset(rowset);
+ rsm->set_delete_predicate(std::move(del_pred));
+ rsm->set_tablet_schema(schema);
+ return std::make_shared<BetaRowset>(schema, "", rsm);
}
TabletSharedPtr create_tablet(const TabletSchema& tablet_schema,
- bool enable_unique_key_merge_on_write,
int64_t version,
- bool has_delete_handler) {
+ bool enable_unique_key_merge_on_write) {
std::vector<TColumn> cols;
std::unordered_map<uint32_t, uint32_t> col_ordinal_to_unique_id;
for (auto i = 0; i < tablet_schema.num_columns(); i++) {
@@ -346,23 +334,6 @@ protected:
EXPECT_TRUE(res.ok() && !exists);
res =
io::global_local_filesystem()->create_directory(tablet->tablet_path());
EXPECT_TRUE(res.ok());
-
- if (has_delete_handler) {
- // delete data with key < 1000
- std::vector<TCondition> conditions;
- TCondition condition;
- condition.column_name = tablet_schema.column(0).name();
- condition.condition_op = "<";
- condition.condition_values.clear();
- condition.condition_values.push_back("100");
- conditions.push_back(condition);
-
- DeletePredicatePB del_pred;
- Status st =
- DeleteHandler::generate_delete_predicate(tablet_schema,
conditions, &del_pred);
- EXPECT_EQ(Status::OK(), st);
- add_delete_predicate(tablet, del_pred, version);
- }
return tablet;
}
@@ -491,35 +462,34 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
new_overlap = OVERLAPPING;
}
}
- RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i]);
+ RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i], i);
input_rowsets.push_back(rowset);
}
// create input rowset reader
vector<RowsetReaderSharedPtr> input_rs_readers;
for (auto& rowset : input_rowsets) {
RowsetReaderSharedPtr rs_reader;
- EXPECT_TRUE(rowset->create_reader(&rs_reader).ok());
+ ASSERT_TRUE(rowset->create_reader(&rs_reader).ok());
input_rs_readers.push_back(std::move(rs_reader));
}
// create output rowset writer
- RowsetWriterContext writer_context;
- create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456,
&writer_context);
+ auto writer_context = create_rowset_writer_context(tablet_schema,
NONOVERLAPPING, 3456,
+ {0,
input_rowsets.back()->end_version()});
std::unique_ptr<RowsetWriter> output_rs_writer;
Status s = RowsetFactory::create_rowset_writer(writer_context, true,
&output_rs_writer);
- EXPECT_TRUE(s.ok());
+ ASSERT_TRUE(s.ok()) << s;
// merge input rowset
- bool has_delete_handler = false;
- TabletSharedPtr tablet = create_tablet(
- *tablet_schema, false, output_rs_writer->version().first - 1,
has_delete_handler);
+ TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
Merger::Statistics stats;
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
s = Merger::vertical_merge_rowsets(tablet,
ReaderType::READER_BASE_COMPACTION, tablet_schema,
input_rs_readers,
output_rs_writer.get(), 100, &stats);
- EXPECT_TRUE(s.ok());
+ ASSERT_TRUE(s.ok()) << s;
RowsetSharedPtr out_rowset = output_rs_writer->build();
+ ASSERT_TRUE(out_rowset);
// create output rowset reader
RowsetReaderContext reader_context;
@@ -598,7 +568,7 @@ TEST_F(VerticalCompactionTest,
TestDupWithoutKeyVerticalMerge) {
new_overlap = OVERLAPPING;
}
}
- RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i]);
+ RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i], i);
input_rowsets.push_back(rowset);
}
// create input rowset reader
@@ -610,22 +580,20 @@ TEST_F(VerticalCompactionTest,
TestDupWithoutKeyVerticalMerge) {
}
// create output rowset writer
- RowsetWriterContext writer_context;
- create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456,
&writer_context);
+ auto writer_context = create_rowset_writer_context(tablet_schema,
NONOVERLAPPING, 3456,
+ {0,
input_rowsets.back()->end_version()});
std::unique_ptr<RowsetWriter> output_rs_writer;
Status s = RowsetFactory::create_rowset_writer(writer_context, true,
&output_rs_writer);
EXPECT_TRUE(s.ok());
// merge input rowset
- bool has_delete_handler = false;
- TabletSharedPtr tablet = create_tablet(
- *tablet_schema, false, output_rs_writer->version().first - 1,
has_delete_handler);
+ TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
Merger::Statistics stats;
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
s = Merger::vertical_merge_rowsets(tablet,
ReaderType::READER_BASE_COMPACTION, tablet_schema,
input_rs_readers,
output_rs_writer.get(), 100, &stats);
- EXPECT_TRUE(s.ok());
+ ASSERT_TRUE(s.ok()) << s;
RowsetSharedPtr out_rowset = output_rs_writer->build();
// create output rowset reader
@@ -706,7 +674,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
new_overlap = OVERLAPPING;
}
}
- RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i]);
+ RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i], i);
input_rowsets.push_back(rowset);
}
// create input rowset reader
@@ -718,16 +686,14 @@ TEST_F(VerticalCompactionTest,
TestUniqueKeyVerticalMerge) {
}
// create output rowset writer
- RowsetWriterContext writer_context;
- create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456,
&writer_context);
+ auto writer_context = create_rowset_writer_context(tablet_schema,
NONOVERLAPPING, 3456,
+ {0,
input_rowsets.back()->end_version()});
std::unique_ptr<RowsetWriter> output_rs_writer;
Status s = RowsetFactory::create_rowset_writer(writer_context, true,
&output_rs_writer);
EXPECT_TRUE(s.ok());
// merge input rowset
- bool has_delete_handler = false;
- TabletSharedPtr tablet = create_tablet(
- *tablet_schema, false, output_rs_writer->version().first - 1,
has_delete_handler);
+ TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
Merger::Statistics stats;
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
@@ -795,46 +761,55 @@ TEST_F(VerticalCompactionTest,
TestDupKeyVerticalMergeWithDelete) {
}
TabletSchemaSPtr tablet_schema = create_schema(DUP_KEYS);
+ TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
// create input rowset
- vector<RowsetSharedPtr> input_rowsets;
SegmentsOverlapPB new_overlap = overlap;
+ std::vector<RowsetSharedPtr> input_rowsets;
for (auto i = 0; i < num_input_rowset; i++) {
if (overlap == OVERLAP_UNKNOWN) {
- if (i == 0) {
- new_overlap = NONOVERLAPPING;
- } else {
- new_overlap = OVERLAPPING;
- }
+ new_overlap = (i == 0) ? NONOVERLAPPING : OVERLAPPING;
}
- RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i]);
- input_rowsets.push_back(rowset);
+ input_rowsets.push_back(create_rowset(tablet_schema, new_overlap,
input_data[i], i));
}
+
+ // delete data with key < 100
+ std::vector<TCondition> conditions;
+ TCondition condition;
+ condition.column_name = tablet->tablet_schema()->column(0).name();
+ condition.condition_op = "<";
+ condition.condition_values.clear();
+ condition.condition_values.push_back("100");
+ conditions.push_back(condition);
+ DeletePredicatePB del_pred;
+ auto st =
DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), conditions,
+ &del_pred);
+ ASSERT_TRUE(st.ok()) << st;
+ input_rowsets.push_back(create_delete_predicate(tablet->tablet_schema(),
std::move(del_pred),
+ num_input_rowset));
+
// create input rowset reader
vector<RowsetReaderSharedPtr> input_rs_readers;
for (auto& rowset : input_rowsets) {
RowsetReaderSharedPtr rs_reader;
- EXPECT_TRUE(rowset->create_reader(&rs_reader).ok());
+ ASSERT_TRUE(rowset->create_reader(&rs_reader).ok());
input_rs_readers.push_back(std::move(rs_reader));
}
// create output rowset writer
- RowsetWriterContext writer_context;
- create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456,
&writer_context);
+ auto writer_context = create_rowset_writer_context(tablet_schema,
NONOVERLAPPING, 3456,
+ {0,
input_rowsets.back()->end_version()});
std::unique_ptr<RowsetWriter> output_rs_writer;
- Status s = RowsetFactory::create_rowset_writer(writer_context, true,
&output_rs_writer);
- EXPECT_TRUE(s.ok());
-
+ st = RowsetFactory::create_rowset_writer(writer_context, true,
&output_rs_writer);
+ ASSERT_TRUE(st.ok()) << st;
// merge input rowset
- bool has_delete_handler = true;
- TabletSharedPtr tablet = create_tablet(*tablet_schema, false,
output_rs_writer->version().first,
- has_delete_handler);
Merger::Statistics stats;
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
- s = Merger::vertical_merge_rowsets(tablet,
ReaderType::READER_BASE_COMPACTION, tablet_schema,
- input_rs_readers,
output_rs_writer.get(), 100, &stats);
- EXPECT_TRUE(s.ok());
+ st = Merger::vertical_merge_rowsets(tablet,
ReaderType::READER_BASE_COMPACTION, tablet_schema,
+ input_rs_readers,
output_rs_writer.get(), 100, &stats);
+ ASSERT_TRUE(st.ok()) << st;
RowsetSharedPtr out_rowset = output_rs_writer->build();
+ ASSERT_TRUE(out_rowset);
// create output rowset reader
RowsetReaderContext reader_context;
@@ -851,24 +826,22 @@ TEST_F(VerticalCompactionTest,
TestDupKeyVerticalMergeWithDelete) {
std::vector<std::tuple<int64_t, int64_t>> output_data;
do {
block_create(tablet_schema, &output_block);
- s = output_rs_reader->next_block(&output_block);
+ st = output_rs_reader->next_block(&output_block);
auto columns = output_block.get_columns_with_type_and_name();
EXPECT_EQ(columns.size(), 2);
for (auto i = 0; i < output_block.rows(); i++) {
output_data.emplace_back(columns[0].column->get_int(i),
columns[1].column->get_int(i));
}
- } while (s == Status::OK());
- EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
+ } while (st.ok());
+ EXPECT_TRUE(st.is<END_OF_FILE>()) << st;
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(),
num_input_rowset * num_segments * rows_per_segment -
num_input_rowset * 100);
std::vector<uint32_t> segment_num_rows;
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
- if (has_delete_handler) {
- // All keys less than 1000 are deleted by delete handler
- for (auto& item : output_data) {
- EXPECT_GE(std::get<0>(item), 100);
- }
+ // All keys less than 1000 are deleted by delete handler
+ for (auto& item : output_data) {
+ ASSERT_GE(std::get<0>(item), 100);
}
}
@@ -889,46 +862,55 @@ TEST_F(VerticalCompactionTest,
TestDupWithoutKeyVerticalMergeWithDelete) {
}
TabletSchemaSPtr tablet_schema = create_schema(DUP_KEYS, true);
+ TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
// create input rowset
- vector<RowsetSharedPtr> input_rowsets;
SegmentsOverlapPB new_overlap = overlap;
+ std::vector<RowsetSharedPtr> input_rowsets;
for (auto i = 0; i < num_input_rowset; i++) {
if (overlap == OVERLAP_UNKNOWN) {
- if (i == 0) {
- new_overlap = NONOVERLAPPING;
- } else {
- new_overlap = OVERLAPPING;
- }
+ new_overlap = (i == 0) ? NONOVERLAPPING : OVERLAPPING;
}
- RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i]);
- input_rowsets.push_back(rowset);
+ input_rowsets.push_back(create_rowset(tablet_schema, new_overlap,
input_data[i], i));
}
+
+ // delete data with key < 100
+ std::vector<TCondition> conditions;
+ TCondition condition;
+ condition.column_name = tablet->tablet_schema()->column(0).name();
+ condition.condition_op = "<";
+ condition.condition_values.clear();
+ condition.condition_values.push_back("100");
+ conditions.push_back(condition);
+ DeletePredicatePB del_pred;
+ auto st =
DeleteHandler::generate_delete_predicate(*tablet->tablet_schema(), conditions,
+ &del_pred);
+ ASSERT_TRUE(st.ok()) << st;
+ input_rowsets.push_back(create_delete_predicate(tablet->tablet_schema(),
std::move(del_pred),
+ num_input_rowset));
+
// create input rowset reader
vector<RowsetReaderSharedPtr> input_rs_readers;
for (auto& rowset : input_rowsets) {
RowsetReaderSharedPtr rs_reader;
- EXPECT_TRUE(rowset->create_reader(&rs_reader).ok());
+ ASSERT_TRUE(rowset->create_reader(&rs_reader).ok());
input_rs_readers.push_back(std::move(rs_reader));
}
// create output rowset writer
- RowsetWriterContext writer_context;
- create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456,
&writer_context);
+ auto writer_context = create_rowset_writer_context(tablet_schema,
NONOVERLAPPING, 3456,
+ {0,
input_rowsets.back()->end_version()});
std::unique_ptr<RowsetWriter> output_rs_writer;
- Status s = RowsetFactory::create_rowset_writer(writer_context, true,
&output_rs_writer);
- EXPECT_TRUE(s.ok());
-
+ st = RowsetFactory::create_rowset_writer(writer_context, true,
&output_rs_writer);
+ ASSERT_TRUE(st.ok()) << st;
// merge input rowset
- bool has_delete_handler = true;
- TabletSharedPtr tablet = create_tablet(*tablet_schema, false,
output_rs_writer->version().first,
- has_delete_handler);
Merger::Statistics stats;
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
- s = Merger::vertical_merge_rowsets(tablet,
ReaderType::READER_BASE_COMPACTION, tablet_schema,
- input_rs_readers,
output_rs_writer.get(), 100, &stats);
- EXPECT_TRUE(s.ok());
+ st = Merger::vertical_merge_rowsets(tablet,
ReaderType::READER_BASE_COMPACTION, tablet_schema,
+ input_rs_readers,
output_rs_writer.get(), 100, &stats);
+ ASSERT_TRUE(st.ok()) << st;
RowsetSharedPtr out_rowset = output_rs_writer->build();
+ ASSERT_TRUE(out_rowset);
// create output rowset reader
RowsetReaderContext reader_context;
@@ -945,24 +927,22 @@ TEST_F(VerticalCompactionTest,
TestDupWithoutKeyVerticalMergeWithDelete) {
std::vector<std::tuple<int64_t, int64_t>> output_data;
do {
block_create(tablet_schema, &output_block);
- s = output_rs_reader->next_block(&output_block);
+ st = output_rs_reader->next_block(&output_block);
auto columns = output_block.get_columns_with_type_and_name();
EXPECT_EQ(columns.size(), 2);
for (auto i = 0; i < output_block.rows(); i++) {
output_data.emplace_back(columns[0].column->get_int(i),
columns[1].column->get_int(i));
}
- } while (s == Status::OK());
- EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
+ } while (st.ok());
+ EXPECT_TRUE(st.is<END_OF_FILE>()) << st;
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(),
num_input_rowset * num_segments * rows_per_segment -
num_input_rowset * 100);
std::vector<uint32_t> segment_num_rows;
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
- if (has_delete_handler) {
- // All keys less than 1000 are deleted by delete handler
- for (auto& item : output_data) {
- EXPECT_GE(std::get<0>(item), 100);
- }
+ // All keys less than 1000 are deleted by delete handler
+ for (auto& item : output_data) {
+ ASSERT_GE(std::get<0>(item), 100);
}
}
@@ -994,7 +974,7 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
new_overlap = OVERLAPPING;
}
}
- RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i]);
+ RowsetSharedPtr rowset = create_rowset(tablet_schema, new_overlap,
input_data[i], i);
input_rowsets.push_back(rowset);
}
// create input rowset reader
@@ -1006,16 +986,14 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
}
// create output rowset writer
- RowsetWriterContext writer_context;
- create_rowset_writer_context(tablet_schema, NONOVERLAPPING, 3456,
&writer_context);
+ auto writer_context = create_rowset_writer_context(tablet_schema,
NONOVERLAPPING, 3456,
+ {0,
input_rowsets.back()->end_version()});
std::unique_ptr<RowsetWriter> output_rs_writer;
Status s = RowsetFactory::create_rowset_writer(writer_context, true,
&output_rs_writer);
EXPECT_TRUE(s.ok());
// merge input rowset
- bool has_delete_handler = false;
- TabletSharedPtr tablet = create_tablet(
- *tablet_schema, false, output_rs_writer->version().first - 1,
has_delete_handler);
+ TabletSharedPtr tablet = create_tablet(*tablet_schema, false);
Merger::Statistics stats;
RowIdConversion rowid_conversion;
stats.rowid_conversion = &rowid_conversion;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]