This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch branch-3.0-tmp-load-seg-memory
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0-tmp-load-seg-memory
by this push:
new 9347d3cd62f [improvement](segment) reduce memory usage when open
segments (#46570)
9347d3cd62f is described below
commit 9347d3cd62fa5b608d4d9179ac31d7b4e48f4df6
Author: TengJianPing <[email protected]>
AuthorDate: Thu Jan 9 15:00:37 2025 +0800
[improvement](segment) reduce memory usage when open segments (#46570)
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
When a rowset contains many segments, opening all of them at once consumes
plenty of memory. This PR opens segments one by one and releases each
`Segment` object immediately if it does not need to be kept for later use,
thus reducing the memory footprint dramatically.
---
be/src/olap/parallel_scanner_builder.cpp | 18 ++---
be/src/olap/rowset/beta_rowset.cpp | 21 ++++-
be/src/olap/rowset/beta_rowset.h | 7 +-
be/src/olap/rowset/beta_rowset_reader.cpp | 90 ++++++----------------
be/src/olap/rowset/beta_rowset_reader.h | 8 +-
be/src/olap/rowset/rowset.cpp | 2 -
be/src/olap/rowset/rowset.h | 3 -
be/src/olap/rowset/rowset_reader.h | 6 +-
be/src/olap/rowset/rowset_reader_context.h | 3 +-
.../segment_v2/lazy_init_segment_iterator.cpp | 23 +++++-
.../rowset/segment_v2/lazy_init_segment_iterator.h | 12 ++-
be/src/olap/segment_loader.cpp | 58 ++++++++------
be/src/olap/segment_loader.h | 7 ++
be/test/olap/ordered_data_compaction_test.cpp | 2 -
be/test/olap/rowid_conversion_test.cpp | 3 +-
be/test/olap/segcompaction_mow_test.cpp | 9 +--
be/test/olap/segcompaction_test.cpp | 18 ++---
be/test/testutil/mock_rowset.h | 4 -
be/test/vec/olap/vertical_compaction_test.cpp | 12 ---
19 files changed, 141 insertions(+), 165 deletions(-)
diff --git a/be/src/olap/parallel_scanner_builder.cpp
b/be/src/olap/parallel_scanner_builder.cpp
index 88c69ab5c9a..769abe4a946 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -70,7 +70,7 @@ Status
ParallelScannerBuilder::_build_scanners_by_rowid(std::list<VScannerSPtr>&
continue;
}
- int segment_start = 0;
+ int64_t segment_start = 0;
auto split = RowSetSplits(reader->clone());
for (size_t i = 0; i != segments_rows.size(); ++i) {
@@ -171,22 +171,18 @@ Status ParallelScannerBuilder::_load() {
if (!_state->skip_delete_predicate()) {
read_source.fill_delete_predicates();
}
- bool enable_segment_cache =
_state->query_options().__isset.enable_segment_cache
- ?
_state->query_options().enable_segment_cache
- : true;
for (auto& rs_split : read_source.rs_splits) {
auto rowset = rs_split.rs_reader->rowset();
RETURN_IF_ERROR(rowset->load());
const auto rowset_id = rowset->rowset_id();
- SegmentCacheHandle segment_cache_handle;
- RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
- std::dynamic_pointer_cast<BetaRowset>(rowset),
&segment_cache_handle,
- enable_segment_cache, false));
-
- for (const auto& segment : segment_cache_handle.get_segments()) {
-
_all_segments_rows[rowset_id].emplace_back(segment->num_rows());
+ auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+ std::vector<uint32_t> segment_rows;
+ RETURN_IF_ERROR(beta_rowset->get_segment_num_rows(&segment_rows));
+ auto segment_count = rowset->num_segments();
+ for (int64_t i = 0; i != segment_count; i++) {
+ _all_segments_rows[rowset_id].emplace_back(segment_rows[i]);
}
_total_rows += rowset->num_rows();
}
diff --git a/be/src/olap/rowset/beta_rowset.cpp
b/be/src/olap/rowset/beta_rowset.cpp
index 9f33f363e99..40db8226f50 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -42,6 +42,7 @@
#include "olap/rowset/segment_v2/inverted_index_cache.h"
#include "olap/rowset/segment_v2/inverted_index_desc.h"
#include "olap/rowset/segment_v2/inverted_index_file_reader.h"
+#include "olap/segment_loader.h"
#include "olap/tablet_schema.h"
#include "olap/utils.h"
#include "util/crc32c.h"
@@ -68,9 +69,23 @@ Status BetaRowset::init() {
return Status::OK(); // no op
}
-Status BetaRowset::do_load(bool /*use_cache*/) {
- // do nothing.
- // the segments in this rowset will be loaded by calling load_segments()
explicitly.
+Status BetaRowset::get_segment_num_rows(std::vector<uint32_t>* segment_rows) {
+ DCHECK(_rowset_state_machine.rowset_state() == ROWSET_LOADED);
+
+ RETURN_IF_ERROR(_load_segment_rows_once.call([this] {
+ auto segment_count = num_segments();
+ _segments_rows.resize(segment_count);
+ for (int64_t i = 0; i != segment_count; ++i) {
+ SegmentCacheHandle segment_cache_handle;
+ RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
+ std::static_pointer_cast<BetaRowset>(shared_from_this()),
i,
+ &segment_cache_handle, false, false));
+ const auto& tmp_segments = segment_cache_handle.get_segments();
+ _segments_rows[i] = tmp_segments[0]->num_rows();
+ }
+ return Status::OK();
+ }));
+ segment_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
return Status::OK();
}
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 0b22d122741..4b1388a1f08 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -89,6 +89,8 @@ public:
Status show_nested_index_file(rapidjson::Value* rowset_value,
rapidjson::Document::AllocatorType&
allocator);
+ Status get_segment_num_rows(std::vector<uint32_t>* segment_rows);
+
protected:
BetaRowset(const TabletSchemaSPtr& schema, const RowsetMetaSharedPtr&
rowset_meta,
std::string tablet_path);
@@ -96,8 +98,6 @@ protected:
// init segment groups
Status init() override;
- Status do_load(bool use_cache) override;
-
void do_close() override;
Status check_current_rowset_segment() override;
@@ -107,6 +107,9 @@ protected:
private:
friend class RowsetFactory;
friend class BetaRowsetReader;
+
+ DorisCallOnce<Status> _load_segment_rows_once;
+ std::vector<uint32_t> _segments_rows;
};
} // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp
b/be/src/olap/rowset/beta_rowset_reader.cpp
index d690b9b58d5..4ff7169e055 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -215,7 +215,6 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
_read_options.io_ctx.expiration_time = 0;
}
- // load segments
bool enable_segment_cache = true;
auto* state = read_context->runtime_state;
if (state != nullptr) {
@@ -226,75 +225,40 @@ Status
BetaRowsetReader::get_segment_iterators(RowsetReaderContext* read_context
// When reader type is for query, session variable `enable_segment_cache`
should be respected.
bool should_use_cache = use_cache || (_read_context->reader_type ==
ReaderType::READER_QUERY &&
enable_segment_cache);
- SegmentCacheHandle segment_cache_handle;
- {
- SCOPED_RAW_TIMER(&_stats->rowset_reader_load_segments_timer_ns);
- RETURN_IF_ERROR(SegmentLoader::instance()->load_segments(
- _rowset, &segment_cache_handle, should_use_cache,
- /*need_load_pk_index_and_bf*/ false));
- }
-
- // create iterator for each segment
- auto& segments = segment_cache_handle.get_segments();
- _segments_rows.resize(segments.size());
- for (size_t i = 0; i < segments.size(); i++) {
- _segments_rows[i] = segments[i]->num_rows();
- }
- if (_read_context->record_rowids) {
- // init segment rowid map for rowid conversion
- std::vector<uint32_t> segment_num_rows;
- RETURN_IF_ERROR(get_segment_num_rows(&segment_num_rows));
-
_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
segment_num_rows);
- }
+ auto segment_count = _rowset->num_segments();
auto [seg_start, seg_end] = _segment_offsets;
+ // If seg_start == seg_end, it means that the segments of a rowset is not
+ // split scanned by multiple scanners, and the rowset reader is used to
read the whole rowset.
if (seg_start == seg_end) {
seg_start = 0;
- seg_end = segments.size();
+ seg_end = segment_count;
+ }
+ if (_read_context->record_rowids && _read_context->rowid_conversion) {
+ // init segment rowid map for rowid conversion
+ std::vector<uint32_t> segment_rows;
+ RETURN_IF_ERROR(_rowset->get_segment_num_rows(&segment_rows));
+
_read_context->rowid_conversion->init_segment_map(rowset()->rowset_id(),
segment_rows);
}
- const bool is_merge_iterator = _is_merge_iterator();
- const bool use_lazy_init_iterators =
- !is_merge_iterator && _read_context->reader_type ==
ReaderType::READER_QUERY;
- for (int i = seg_start; i < seg_end; i++) {
+ for (int64_t i = seg_start; i < seg_end; i++) {
SCOPED_RAW_TIMER(&_stats->rowset_reader_create_iterators_timer_ns);
- auto& seg_ptr = segments[i];
std::unique_ptr<RowwiseIterator> iter;
- if (use_lazy_init_iterators) {
- /// For non-merging iterators, we don't need to initialize them
all at once when creating them.
- /// Instead, we should initialize each iterator separately when
really using them.
- /// This optimization minimizes the lifecycle of resources like
column readers
- /// and prevents excessive memory consumption, especially for wide
tables.
- if (_segment_row_ranges.empty()) {
- _read_options.row_ranges.clear();
- iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr,
_input_schema,
-
_read_options);
- } else {
- DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
- auto local_options = _read_options;
- local_options.row_ranges = _segment_row_ranges[i - seg_start];
- iter = std::make_unique<LazyInitSegmentIterator>(seg_ptr,
_input_schema,
-
local_options);
- }
+ /// For iterators, we don't need to initialize them all at once when
creating them.
+ /// Instead, we should initialize each iterator separately when really
using them.
+ /// This optimization minimizes the lifecycle of resources like column
readers
+ /// and prevents excessive memory consumption, especially for wide
tables.
+ if (_segment_row_ranges.empty()) {
+ _read_options.row_ranges.clear();
+ iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i,
should_use_cache,
+ _input_schema,
_read_options);
} else {
- Status status;
- /// If `_segment_row_ranges` is empty, the segment is not split.
- if (_segment_row_ranges.empty()) {
- _read_options.row_ranges.clear();
- status = seg_ptr->new_iterator(_input_schema, _read_options,
&iter);
- } else {
- DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
- auto local_options = _read_options;
- local_options.row_ranges = _segment_row_ranges[i - seg_start];
- status = seg_ptr->new_iterator(_input_schema, local_options,
&iter);
- }
-
- if (!status.ok()) {
- LOG(WARNING) << "failed to create iterator[" << seg_ptr->id()
- << "]: " << status.to_string();
- return Status::Error<ROWSET_READER_INIT>(status.to_string());
- }
+ DCHECK_EQ(seg_end - seg_start, _segment_row_ranges.size());
+ auto local_options = _read_options;
+ local_options.row_ranges = _segment_row_ranges[i - seg_start];
+ iter = std::make_unique<LazyInitSegmentIterator>(_rowset, i,
should_use_cache,
+ _input_schema,
local_options);
}
if (iter->empty()) {
@@ -422,10 +386,4 @@ bool
BetaRowsetReader::_should_push_down_value_predicates() const {
_read_context->sequence_id_idx == -1) ||
_read_context->enable_unique_key_merge_on_write);
}
-
-Status BetaRowsetReader::get_segment_num_rows(std::vector<uint32_t>*
segment_num_rows) {
- segment_num_rows->assign(_segments_rows.cbegin(), _segments_rows.cend());
- return Status::OK();
-}
-
} // namespace doris
diff --git a/be/src/olap/rowset/beta_rowset_reader.h
b/be/src/olap/rowset/beta_rowset_reader.h
index 33b2fb6a58c..b191480f7c7 100644
--- a/be/src/olap/rowset/beta_rowset_reader.h
+++ b/be/src/olap/rowset/beta_rowset_reader.h
@@ -80,8 +80,6 @@ public:
return _iterator->current_block_row_locations(locations);
}
- Status get_segment_num_rows(std::vector<uint32_t>* segment_num_rows)
override;
-
bool update_profile(RuntimeProfile* profile) override;
RowsetReaderSharedPtr clone() override;
@@ -97,7 +95,7 @@ private:
_rowset->rowset_meta()->is_segments_overlapping() &&
_get_segment_num() > 1;
}
- int32_t _get_segment_num() const {
+ int64_t _get_segment_num() const {
auto [seg_start, seg_end] = _segment_offsets;
if (seg_start == seg_end) {
seg_start = 0;
@@ -108,7 +106,7 @@ private:
DorisCallOnce<Status> _init_iter_once;
- std::pair<int, int> _segment_offsets;
+ std::pair<int64_t, int64_t> _segment_offsets;
std::vector<RowRanges> _segment_row_ranges;
SchemaSPtr _input_schema;
@@ -120,8 +118,6 @@ private:
std::unique_ptr<RowwiseIterator> _iterator;
- std::vector<uint32_t> _segments_rows;
-
StorageReadOptions _read_options;
bool _empty = false;
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index ac3a2a7a1dc..0fd8e60f7ce 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -67,8 +67,6 @@ Status Rowset::load(bool use_cache) {
std::lock_guard load_lock(_lock);
// after lock, if rowset state is ROWSET_UNLOADING, it is ok to return
if (_rowset_state_machine.rowset_state() == ROWSET_UNLOADED) {
- // first do load, then change the state
- RETURN_IF_ERROR(do_load(use_cache));
RETURN_IF_ERROR(_rowset_state_machine.on_load());
}
}
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index be21f29888e..01f321728f1 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -323,9 +323,6 @@ protected:
// this is non-public because all clients should use RowsetFactory to
obtain pointer to initialized Rowset
virtual Status init() = 0;
- // The actual implementation of load(). Guaranteed by to called exactly
once.
- virtual Status do_load(bool use_cache) = 0;
-
// release resources in this api
virtual void do_close() = 0;
diff --git a/be/src/olap/rowset/rowset_reader.h
b/be/src/olap/rowset/rowset_reader.h
index 58c0f592b9c..6c637f47cc1 100644
--- a/be/src/olap/rowset/rowset_reader.h
+++ b/be/src/olap/rowset/rowset_reader.h
@@ -40,7 +40,7 @@ struct RowSetSplits {
// if segment_offsets is not empty, means we only scan
// [pair.first, pair.second) segment in rs_reader, only effective in dup
key
// and pipeline
- std::pair<int, int> segment_offsets;
+ std::pair<int64_t, int64_t> segment_offsets;
// RowRanges of each segment.
std::vector<RowRanges> segment_row_ranges;
@@ -83,10 +83,6 @@ public:
return Status::NotSupported("to be implemented");
}
- virtual Status get_segment_num_rows(std::vector<uint32_t>*
segment_num_rows) {
- return Status::NotSupported("to be implemented");
- }
-
virtual bool update_profile(RuntimeProfile* profile) = 0;
virtual RowsetReaderSharedPtr clone() = 0;
diff --git a/be/src/olap/rowset/rowset_reader_context.h
b/be/src/olap/rowset/rowset_reader_context.h
index fd3b4fed56f..2dd71328902 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -76,8 +76,7 @@ struct RowsetReaderContext {
bool enable_unique_key_merge_on_write = false;
const DeleteBitmap* delete_bitmap = nullptr;
bool record_rowids = false;
- RowIdConversion* rowid_conversion;
- bool is_vertical_compaction = false;
+ RowIdConversion* rowid_conversion = nullptr;
bool is_key_column_group = false;
const std::set<int32_t>* output_columns = nullptr;
RowsetId rowset_id;
diff --git a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
index d70df5a7bae..77e2310fc48 100644
--- a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
+++ b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.cpp
@@ -17,11 +17,18 @@
#include "olap/rowset/segment_v2/lazy_init_segment_iterator.h"
+#include "olap/segment_loader.h"
+
namespace doris::segment_v2 {
-LazyInitSegmentIterator::LazyInitSegmentIterator(std::shared_ptr<Segment>
segment,
- SchemaSPtr schema, const
StorageReadOptions& opts)
- : _schema(std::move(schema)), _segment(std::move(segment)),
_read_options(opts) {}
+LazyInitSegmentIterator::LazyInitSegmentIterator(BetaRowsetSharedPtr rowset,
int64_t segment_id,
+ bool should_use_cache,
SchemaSPtr schema,
+ const StorageReadOptions&
opts)
+ : _rowset(std::move(rowset)),
+ _segment_id(segment_id),
+ _should_use_cache(should_use_cache),
+ _schema(std::move(schema)),
+ _read_options(opts) {}
/// Here do not use the argument of `opts`,
/// see where the iterator is created in
`BetaRowsetReader::get_segment_iterators`
@@ -31,7 +38,15 @@ Status LazyInitSegmentIterator::init(const
StorageReadOptions& /*opts*/) {
return Status::OK();
}
- RETURN_IF_ERROR(_segment->new_iterator(_schema, _read_options,
&_inner_iterator));
+ std::shared_ptr<Segment> segment;
+ {
+ SegmentCacheHandle segment_cache_handle;
+ RETURN_IF_ERROR(SegmentLoader::instance()->load_segment(
+ _rowset, _segment_id, &segment_cache_handle,
_should_use_cache, false));
+ const auto& tmp_segments = segment_cache_handle.get_segments();
+ segment = tmp_segments[0];
+ }
+ RETURN_IF_ERROR(segment->new_iterator(_schema, _read_options,
&_inner_iterator));
return _inner_iterator->init(_read_options);
}
diff --git a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
index 923c540c456..c31918d092c 100644
--- a/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
+++ b/be/src/olap/rowset/segment_v2/lazy_init_segment_iterator.h
@@ -22,14 +22,18 @@
#include "olap/rowset/segment_v2/segment_iterator.h"
#include "vec/core/block.h"
+namespace doris {
+class BetaRowset;
+using BetaRowsetSharedPtr = std::shared_ptr<BetaRowset>;
+}; // namespace doris
namespace doris::segment_v2 {
using namespace vectorized;
class LazyInitSegmentIterator : public RowwiseIterator {
public:
- LazyInitSegmentIterator(std::shared_ptr<Segment> segment, SchemaSPtr
schema,
- const StorageReadOptions& opts);
+ LazyInitSegmentIterator(BetaRowsetSharedPtr rowset, int64_t segment_id,
bool should_use_cache,
+ SchemaSPtr schema, const StorageReadOptions& opts);
~LazyInitSegmentIterator() override = default;
@@ -59,8 +63,10 @@ public:
private:
bool _need_lazy_init {true};
+ BetaRowsetSharedPtr _rowset;
+ int64_t _segment_id {-1};
+ bool _should_use_cache {false};
SchemaSPtr _schema = nullptr;
- std::shared_ptr<Segment> _segment;
StorageReadOptions _read_options;
RowwiseIteratorUPtr _inner_iterator;
};
diff --git a/be/src/olap/segment_loader.cpp b/be/src/olap/segment_loader.cpp
index 26ac54c699b..0501e604df2 100644
--- a/be/src/olap/segment_loader.cpp
+++ b/be/src/olap/segment_loader.cpp
@@ -52,6 +52,38 @@ void SegmentCache::erase(const SegmentCache::CacheKey& key) {
LRUCachePolicy::erase(key.encode());
}
+Status SegmentLoader::load_segment(const BetaRowsetSharedPtr& rowset, int64_t
segment_id,
+ SegmentCacheHandle* cache_handle, bool
use_cache,
+ bool need_load_pk_index_and_bf,
+ OlapReaderStatistics* index_load_stats) {
+ SegmentCache::CacheKey cache_key(rowset->rowset_id(), segment_id);
+ if (_segment_cache->lookup(cache_key, cache_handle)) {
+ // Has to check the segment status here, because the segment in cache
may has something wrong during
+ // load index or create column reader.
+ // Not merge this if logic with previous to make the logic more clear.
+ if (cache_handle->pop_unhealthy_segment() == nullptr) {
+ return Status::OK();
+ }
+ }
+ // If the segment is not healthy, then will create a new segment and will
replace the unhealthy one in SegmentCache.
+ segment_v2::SegmentSharedPtr segment;
+ RETURN_IF_ERROR(rowset->load_segment(segment_id, &segment));
+ if (need_load_pk_index_and_bf) {
+ RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats));
+ }
+ if (use_cache && !config::disable_segment_cache) {
+ // memory of SegmentCache::CacheValue will be handled by SegmentCache
+ auto* cache_value = new SegmentCache::CacheValue();
+ _cache_mem_usage += segment->meta_mem_usage();
+ cache_value->segment = std::move(segment);
+ _segment_cache->insert(cache_key, *cache_value, cache_handle);
+ } else {
+ cache_handle->push_segment(std::move(segment));
+ }
+
+ return Status::OK();
+}
+
Status SegmentLoader::load_segments(const BetaRowsetSharedPtr& rowset,
SegmentCacheHandle* cache_handle, bool
use_cache,
bool need_load_pk_index_and_bf,
@@ -60,30 +92,8 @@ Status SegmentLoader::load_segments(const
BetaRowsetSharedPtr& rowset,
return Status::OK();
}
for (int64_t i = 0; i < rowset->num_segments(); i++) {
- SegmentCache::CacheKey cache_key(rowset->rowset_id(), i);
- if (_segment_cache->lookup(cache_key, cache_handle)) {
- // Has to check the segment status here, because the segment in
cache may has something wrong during
- // load index or create column reader.
- // Not merge this if logic with previous to make the logic more
clear.
- if (cache_handle->pop_unhealthy_segment() == nullptr) {
- continue;
- }
- }
- // If the segment is not healthy, then will create a new segment and
will replace the unhealthy one in SegmentCache.
- segment_v2::SegmentSharedPtr segment;
- RETURN_IF_ERROR(rowset->load_segment(i, &segment));
- if (need_load_pk_index_and_bf) {
- RETURN_IF_ERROR(segment->load_pk_index_and_bf(index_load_stats));
- }
- if (use_cache && !config::disable_segment_cache) {
- // memory of SegmentCache::CacheValue will be handled by
SegmentCache
- auto* cache_value = new SegmentCache::CacheValue();
- _cache_mem_usage += segment->meta_mem_usage();
- cache_value->segment = std::move(segment);
- _segment_cache->insert(cache_key, *cache_value, cache_handle);
- } else {
- cache_handle->push_segment(std::move(segment));
- }
+ RETURN_IF_ERROR(load_segment(rowset, i, cache_handle, use_cache,
need_load_pk_index_and_bf,
+ index_load_stats));
}
cache_handle->set_inited();
return Status::OK();
diff --git a/be/src/olap/segment_loader.h b/be/src/olap/segment_loader.h
index 834906da93b..57418ab7b78 100644
--- a/be/src/olap/segment_loader.h
+++ b/be/src/olap/segment_loader.h
@@ -120,6 +120,13 @@ public:
bool use_cache = false, bool
need_load_pk_index_and_bf = false,
OlapReaderStatistics* index_load_stats = nullptr);
+ // Load one segment of "rowset", return the "cache_handle" which contains
segments.
+ // If use_cache is true, it will be loaded from _cache.
+ Status load_segment(const BetaRowsetSharedPtr& rowset, int64_t segment_id,
+ SegmentCacheHandle* cache_handle, bool use_cache =
false,
+ bool need_load_pk_index_and_bf = false,
+ OlapReaderStatistics* index_load_stats = nullptr);
+
void erase_segment(const SegmentCache::CacheKey& key);
void erase_segments(const RowsetId& rowset_id, int64_t num_segments);
diff --git a/be/test/olap/ordered_data_compaction_test.cpp
b/be/test/olap/ordered_data_compaction_test.cpp
index 934dfbef3ea..dbbe19fc475 100644
--- a/be/test/olap/ordered_data_compaction_test.cpp
+++ b/be/test/olap/ordered_data_compaction_test.cpp
@@ -530,8 +530,6 @@ TEST_F(OrderedDataCompactionTest, test_01) {
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(), num_input_rowset * num_segments *
rows_per_segment);
- std::vector<uint32_t> segment_num_rows;
-
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// check vertical compaction result
for (auto id = 0; id < output_data.size(); id++) {
LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
diff --git a/be/test/olap/rowid_conversion_test.cpp
b/be/test/olap/rowid_conversion_test.cpp
index d48d4150ad3..9221c93479b 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -382,8 +382,9 @@ protected:
} while (s.ok());
EXPECT_TRUE(s.is<END_OF_FILE>()) << s;
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
+ auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(out_rowset);
std::vector<uint32_t> segment_num_rows;
-
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
+ EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok());
if (has_delete_handler) {
// All keys less than 1000 are deleted by delete handler
for (auto& item : output_data) {
diff --git a/be/test/olap/segcompaction_mow_test.cpp
b/be/test/olap/segcompaction_mow_test.cpp
index 62a3232889d..5463de03f2b 100644
--- a/be/test/olap/segcompaction_mow_test.cpp
+++ b/be/test/olap/segcompaction_mow_test.cpp
@@ -239,7 +239,6 @@ protected:
reader_context.stats = &_stats;
reader_context.delete_bitmap = delete_bitmap.get();
- std::vector<uint32_t> segment_num_rows;
Status s;
// without predicates
@@ -280,7 +279,9 @@ protected:
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(rowset->rowset_meta()->num_rows(), expect_total_rows);
EXPECT_EQ(num_rows_read, expect_total_rows - rows_mark_deleted);
-
EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
+ auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+ std::vector<uint32_t> segment_num_rows;
+
EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok());
size_t total_num_rows = 0;
for (const auto& i : segment_num_rows) {
total_num_rows += i;
@@ -307,7 +308,6 @@ TEST_P(SegCompactionMoWTest, SegCompactionThenRead) {
config::segcompaction_candidate_max_rows = 6000; // set threshold above
// rows_per_segment
config::segcompaction_batch_size = 10;
- std::vector<uint32_t> segment_num_rows;
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
uint32_t rows_mark_deleted = 0;
{ // write `num_segments * rows_per_segment` rows to rowset
@@ -413,7 +413,6 @@ TEST_F(SegCompactionMoWTest,
SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
uint32_t rows_mark_deleted = 0;
uint32_t total_written_rows = 0;
- std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
create_rowset_writer_context(20048, tablet_schema, &writer_context);
@@ -641,7 +640,6 @@ TEST_F(SegCompactionMoWTest,
SegCompactionInterleaveWithBig_OoOoO) {
RowsetSharedPtr rowset;
config::segcompaction_candidate_max_rows = 6000; // set threshold above
config::segcompaction_batch_size = 5;
- std::vector<uint32_t> segment_num_rows;
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
uint32_t rows_mark_deleted = 0;
uint32_t total_written_rows = 0;
@@ -832,7 +830,6 @@ TEST_F(SegCompactionMoWTest, SegCompactionNotTrigger) {
config::segcompaction_candidate_max_rows = 6000; // set threshold above
// rows_per_segment
config::segcompaction_batch_size = 10;
- std::vector<uint32_t> segment_num_rows;
DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(TABLET_ID);
uint32_t rows_mark_deleted = 0;
{ // write `num_segments * rows_per_segment` rows to rowset
diff --git a/be/test/olap/segcompaction_test.cpp
b/be/test/olap/segcompaction_test.cpp
index 2457ff11b83..f33cc4e944b 100644
--- a/be/test/olap/segcompaction_test.cpp
+++ b/be/test/olap/segcompaction_test.cpp
@@ -291,7 +291,6 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
config::segcompaction_candidate_max_rows = 6000; // set threshold above
// rows_per_segment
config::segcompaction_batch_size = 10;
- std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
create_rowset_writer_context(10047, tablet_schema, &writer_context);
@@ -387,7 +386,9 @@ TEST_F(SegCompactionTest, SegCompactionThenRead) {
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(rowset->rowset_meta()->num_rows(), num_rows_read);
EXPECT_EQ(num_rows_read, num_segments * rows_per_segment);
-
EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
+ auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+ std::vector<uint32_t> segment_num_rows;
+
EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok());
size_t total_num_rows = 0;
for (const auto& i : segment_num_rows) {
total_num_rows += i;
@@ -406,7 +407,6 @@ TEST_F(SegCompactionTest,
SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
RowsetSharedPtr rowset;
config::segcompaction_candidate_max_rows = 6000; // set threshold above
// rows_per_segment
- std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
create_rowset_writer_context(10048, tablet_schema, &writer_context);
@@ -561,7 +561,6 @@ TEST_F(SegCompactionTest,
SegCompactionInterleaveWithBig_OoOoO) {
RowsetSharedPtr rowset;
config::segcompaction_candidate_max_rows = 6000; // set threshold above
config::segcompaction_batch_size = 5;
- std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
create_rowset_writer_context(10049, tablet_schema, &writer_context);
@@ -693,7 +692,6 @@ TEST_F(SegCompactionTest,
SegCompactionThenReadUniqueTableSmall) {
config::segcompaction_candidate_max_rows = 6000; // set threshold above
// rows_per_segment
config::segcompaction_batch_size = 3;
- std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
create_rowset_writer_context(10051, tablet_schema, &writer_context);
@@ -894,7 +892,9 @@ TEST_F(SegCompactionTest,
SegCompactionThenReadUniqueTableSmall) {
// duplicated keys between segments are counted duplicately
// so actual read by rowset reader is less or equal to it
EXPECT_GE(rowset->rowset_meta()->num_rows(), num_rows_read);
-
EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
+ auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+ std::vector<uint32_t> segment_num_rows;
+
EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok());
size_t total_num_rows = 0;
for (const auto& i : segment_num_rows) {
total_num_rows += i;
@@ -927,7 +927,6 @@ TEST_F(SegCompactionTest, CreateSegCompactionWriter) {
config::segcompaction_candidate_max_rows = 6000; // set threshold above
// rows_per_segment
config::segcompaction_batch_size = 3;
- std::vector<uint32_t> segment_num_rows;
{
RowsetWriterContext writer_context;
create_rowset_writer_context(10052, tablet_schema, &writer_context);
@@ -959,7 +958,6 @@ TEST_F(SegCompactionTest,
SegCompactionThenReadAggTableSmall) {
config::segcompaction_candidate_max_rows = 6000; // set threshold above
// rows_per_segment
config::segcompaction_batch_size = 3;
- std::vector<uint32_t> segment_num_rows;
{ // write `num_segments * rows_per_segment` rows to rowset
RowsetWriterContext writer_context;
create_rowset_writer_context(10052, tablet_schema, &writer_context);
@@ -1162,7 +1160,9 @@ TEST_F(SegCompactionTest,
SegCompactionThenReadAggTableSmall) {
// duplicated keys between segments are counted duplicately
// so actual read by rowset reader is less or equal to it
EXPECT_GE(rowset->rowset_meta()->num_rows(), num_rows_read);
-
EXPECT_TRUE(rowset_reader->get_segment_num_rows(&segment_num_rows).ok());
+ auto beta_rowset = std::dynamic_pointer_cast<BetaRowset>(rowset);
+ std::vector<uint32_t> segment_num_rows;
+
EXPECT_TRUE(beta_rowset->get_segment_num_rows(&segment_num_rows).ok());
size_t total_num_rows = 0;
for (const auto& i : segment_num_rows) {
total_num_rows += i;
diff --git a/be/test/testutil/mock_rowset.h b/be/test/testutil/mock_rowset.h
index 1d6638863df..78f8f71bee8 100644
--- a/be/test/testutil/mock_rowset.h
+++ b/be/test/testutil/mock_rowset.h
@@ -77,10 +77,6 @@ protected:
Status init() override { return Status::NotSupported("MockRowset not
support this method."); }
- Status do_load(bool use_cache) override {
- return Status::NotSupported("MockRowset not support this method.");
- }
-
void do_close() override {
// Do nothing.
}
diff --git a/be/test/vec/olap/vertical_compaction_test.cpp
b/be/test/vec/olap/vertical_compaction_test.cpp
index 4c4409a7506..dd6f6932efc 100644
--- a/be/test/vec/olap/vertical_compaction_test.cpp
+++ b/be/test/vec/olap/vertical_compaction_test.cpp
@@ -521,8 +521,6 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) {
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(), num_input_rowset * num_segments *
rows_per_segment);
- std::vector<uint32_t> segment_num_rows;
-
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// check vertical compaction result
for (auto id = 0; id < output_data.size(); id++) {
LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
@@ -628,8 +626,6 @@ TEST_F(VerticalCompactionTest,
TestDupWithoutKeyVerticalMerge) {
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(), num_input_rowset * num_segments *
rows_per_segment);
- std::vector<uint32_t> segment_num_rows;
-
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// check vertical compaction result
for (auto id = 0; id < output_data.size(); id++) {
LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
@@ -736,8 +732,6 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) {
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(), num_segments * rows_per_segment);
- std::vector<uint32_t> segment_num_rows;
-
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// check vertical compaction result
for (auto id = 0; id < output_data.size(); id++) {
LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
@@ -848,8 +842,6 @@ TEST_F(VerticalCompactionTest,
TestDupKeyVerticalMergeWithDelete) {
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(),
num_input_rowset * num_segments * rows_per_segment -
num_input_rowset * 100);
- std::vector<uint32_t> segment_num_rows;
-
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// All keys less than 1000 are deleted by delete handler
for (auto& item : output_data) {
ASSERT_GE(std::get<0>(item), 100);
@@ -951,8 +943,6 @@ TEST_F(VerticalCompactionTest,
TestDupWithoutKeyVerticalMergeWithDelete) {
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(),
num_input_rowset * num_segments * rows_per_segment -
num_input_rowset * 100);
- std::vector<uint32_t> segment_num_rows;
-
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// All keys less than 1000 are deleted by delete handler
for (auto& item : output_data) {
ASSERT_GE(std::get<0>(item), 100);
@@ -1042,8 +1032,6 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) {
EXPECT_EQ(Status::Error<END_OF_FILE>(""), s);
EXPECT_EQ(out_rowset->rowset_meta()->num_rows(), output_data.size());
EXPECT_EQ(output_data.size(), num_segments * rows_per_segment);
- std::vector<uint32_t> segment_num_rows;
-
EXPECT_TRUE(output_rs_reader->get_segment_num_rows(&segment_num_rows).ok());
// check vertical compaction result
for (auto id = 0; id < output_data.size(); id++) {
LOG(INFO) << "output data: " << std::get<0>(output_data[id]) << " "
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]