This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4f4e2074a1d [feature](merge-cloud) Add CloudTablet (#30045)
4f4e2074a1d is described below
commit 4f4e2074a1d986246e686b23a68cdd092b3d1fac
Author: plat1ko <[email protected]>
AuthorDate: Thu Jan 18 10:44:04 2024 +0800
[feature](merge-cloud) Add CloudTablet (#30045)
---
be/src/cloud/cloud_meta_mgr.cpp | 26 +-
be/src/cloud/cloud_meta_mgr.h | 11 +-
be/src/cloud/cloud_storage_engine.h | 39 +++
be/src/cloud/cloud_tablet.cpp | 441 ++++++++++++++++++++++++++++
be/src/cloud/cloud_tablet.h | 124 ++++++++
be/src/common/status.h | 3 +-
be/src/olap/base_tablet.cpp | 40 +++
be/src/olap/base_tablet.h | 19 +-
be/src/olap/rowset/rowset.cpp | 9 +
be/src/olap/rowset/rowset.h | 2 +
be/src/olap/schema_change.cpp | 7 +-
be/src/olap/tablet.cpp | 104 ++-----
be/src/olap/tablet.h | 25 +-
be/src/olap/tablet_meta.cpp | 7 +
be/src/olap/tablet_meta.h | 6 +
be/src/vec/exec/scan/new_olap_scan_node.cpp | 16 +-
be/src/vec/exec/scan/new_olap_scanner.cpp | 15 +-
17 files changed, 752 insertions(+), 142 deletions(-)
diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index cbe6ab8ae24..d6eb54e5c41 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -29,6 +29,7 @@
#include <type_traits>
#include <vector>
+#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "cloud/pb_convert.h"
#include "common/logging.h"
@@ -38,7 +39,6 @@
#include "gen_cpp/olap_file.pb.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset_factory.h"
-#include "olap/tablet.h"
#include "olap/tablet_meta.h"
#include "runtime/stream_load/stream_load_context.h"
#include "util/network_util.h"
@@ -270,12 +270,12 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id,
TabletMetaSharedPtr* tab
return Status::OK();
}
-Status CloudMetaMgr::sync_tablet_rowsets(Tablet* tablet, bool
warmup_delta_data) {
+Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool
warmup_delta_data) {
return Status::NotSupported("CloudMetaMgr::sync_tablet_rowsets is not
implemented");
}
Status CloudMetaMgr::sync_tablet_delete_bitmap(
- Tablet* tablet, int64_t old_max_version,
+ CloudTablet* tablet, int64_t old_max_version,
const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap*
delete_bitmap) {
return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is
not implemented");
@@ -425,15 +425,15 @@ Status CloudMetaMgr::update_tablet_schema(int64_t
tablet_id, const TabletSchema&
return Status::OK();
}
-Status CloudMetaMgr::update_delete_bitmap(const Tablet* tablet, int64_t
lock_id, int64_t initiator,
- DeleteBitmap* delete_bitmap) {
- VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet->tablet_id();
+Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t
lock_id,
+ int64_t initiator, DeleteBitmap*
delete_bitmap) {
+ VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id();
UpdateDeleteBitmapRequest req;
UpdateDeleteBitmapResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
- req.set_table_id(tablet->table_id());
- req.set_partition_id(tablet->partition_id());
- req.set_tablet_id(tablet->tablet_id());
+ req.set_table_id(tablet.table_id());
+ req.set_partition_id(tablet.partition_id());
+ req.set_tablet_id(tablet.tablet_id());
req.set_lock_id(lock_id);
req.set_initiator(initiator);
for (auto iter = delete_bitmap->delete_bitmap.begin();
@@ -451,18 +451,18 @@ Status CloudMetaMgr::update_delete_bitmap(const Tablet*
tablet, int64_t lock_id,
if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) {
return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
"lock expired when update delete bitmap, tablet_id: {},
lock_id: {}",
- tablet->tablet_id(), lock_id);
+ tablet.tablet_id(), lock_id);
}
return st;
}
-Status CloudMetaMgr::get_delete_bitmap_update_lock(const Tablet* tablet,
int64_t lock_id,
+Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet,
int64_t lock_id,
int64_t initiator) {
- VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " <<
tablet->tablet_id();
+ VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " <<
tablet.tablet_id();
GetDeleteBitmapUpdateLockRequest req;
GetDeleteBitmapUpdateLockResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
- req.set_table_id(tablet->table_id());
+ req.set_table_id(tablet.table_id());
req.set_lock_id(lock_id);
req.set_initiator(initiator);
req.set_expiration(10); // 10s expiration time for compaction and
schema_change
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 6557a6eab8a..af5b048b2f0 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -29,7 +29,7 @@ namespace doris {
class DeleteBitmap;
class StreamLoadContext;
-class Tablet;
+class CloudTablet;
class TabletMeta;
class TabletSchema;
class RowsetMeta;
@@ -51,7 +51,7 @@ public:
Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>*
tablet_meta);
- Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data = false);
+ Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data =
false);
Status prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp,
std::shared_ptr<RowsetMeta>* existed_rs_meta =
nullptr);
@@ -79,14 +79,15 @@ public:
Status update_tablet_schema(int64_t tablet_id, const TabletSchema&
tablet_schema);
- Status update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t
initiator,
+ Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id,
int64_t initiator,
DeleteBitmap* delete_bitmap);
- Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t
lock_id, int64_t initiator);
+ Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t
lock_id,
+ int64_t initiator);
private:
Status sync_tablet_delete_bitmap(
- Tablet* tablet, int64_t old_max_version,
+ CloudTablet* tablet, int64_t old_max_version,
const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
const TabletStatsPB& stas, const TabletIndexPB& idx, DeleteBitmap*
delete_bitmap);
};
diff --git a/be/src/cloud/cloud_storage_engine.h
b/be/src/cloud/cloud_storage_engine.h
new file mode 100644
index 00000000000..87e3ed52d39
--- /dev/null
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+namespace doris {
+namespace cloud {
+class CloudMetaMgr;
+}
+
+class CloudStorageEngine {
+public:
+ CloudStorageEngine();
+
+ ~CloudStorageEngine();
+
+ cloud::CloudMetaMgr& meta_mgr() { return *_meta_mgr; }
+
+private:
+ std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr;
+};
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
new file mode 100644
index 00000000000..03670df78d1
--- /dev/null
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -0,0 +1,441 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_tablet.h"
+
+#include <rapidjson/document.h>
+#include <rapidjson/encodings.h>
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/stringbuffer.h>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "io/cache/block/block_file_cache_factory.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_desc.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr
tablet_meta)
+ : BaseTablet(std::move(tablet_meta)), _engine(engine) {}
+
+CloudTablet::~CloudTablet() = default;
+
+bool CloudTablet::exceed_version_limit(int32_t limit) {
+ return _approximate_num_rowsets.load(std::memory_order_relaxed) > limit;
+}
+
+Status CloudTablet::capture_rs_readers(const Version& spec_version,
+ std::vector<RowSetSplits>* rs_splits,
+ bool skip_missing_version) {
+ Versions version_path;
+ std::shared_lock rlock(_meta_lock);
+ auto st =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
&version_path);
+ if (!st.ok()) {
+ rlock.unlock(); // avoid logging in lock range
+ // If no version is missing, the requested version has already been merged
+ auto missed_versions = calc_missed_versions(spec_version.second);
+ if (missed_versions.empty()) {
+ st.set_code(VERSION_ALREADY_MERGED); // Reset error code
+ }
+ st.append(" tablet_id=" + std::to_string(tablet_id()));
+ // clang-format off
+ LOG(WARNING) << st << '\n' << [this]() { std::string json;
get_compaction_status(&json); return json; }();
+ // clang-format on
+ return st;
+ }
+ VLOG_DEBUG << "capture consitent versions: " << version_path;
+ return capture_rs_readers_unlocked(version_path, rs_splits);
+}
+
+// for example:
+// [0-4][5-5][8-8][9-9][13-13]
+// if spec_version = 12, it will return [6-7],[10-12]
+Versions CloudTablet::calc_missed_versions(int64_t spec_version) {
+ DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+ Versions missed_versions;
+ Versions existing_versions;
+ {
+ std::shared_lock rdlock(_meta_lock);
+ for (const auto& rs : _tablet_meta->all_rs_metas()) {
+ existing_versions.emplace_back(rs->version());
+ }
+ }
+
+ // sort the existing versions in ascending order
+ std::sort(existing_versions.begin(), existing_versions.end(),
+ [](const Version& a, const Version& b) {
+ // simple because 2 versions are certainly not overlapping
+ return a.first < b.first;
+ });
+
+ auto min_version = existing_versions.front().first;
+ if (min_version > 0) {
+ missed_versions.emplace_back(0, std::min(spec_version, min_version -
1));
+ }
+ for (auto it = existing_versions.begin(); it != existing_versions.end() -
1; ++it) {
+ auto prev_v = it->second;
+ if (prev_v >= spec_version) {
+ return missed_versions;
+ }
+ auto next_v = (it + 1)->first;
+ if (next_v > prev_v + 1) {
+ // there is a hole between versions
+ missed_versions.emplace_back(prev_v + 1, std::min(spec_version,
next_v - 1));
+ }
+ }
+ auto max_version = existing_versions.back().second;
+ if (max_version < spec_version) {
+ missed_versions.emplace_back(max_version + 1, spec_version);
+ }
+ return missed_versions;
+}
+
+Status CloudTablet::sync_meta() {
+ // TODO(lightman): FileCache
+ return Status::NotSupported("CloudTablet::sync_meta is not implemented");
+}
+
+// There are only two tablet_states RUNNING and NOT_READY in cloud mode
+// This function will erase the tablet from `CloudTabletMgr` when it can't
find this tablet in MS.
+Status CloudTablet::sync_rowsets(int64_t query_version, bool
warmup_delta_data) {
+ RETURN_IF_ERROR(sync_if_not_running());
+
+ if (query_version > 0) {
+ std::shared_lock rlock(_meta_lock);
+ if (_max_version >= query_version) {
+ return Status::OK();
+ }
+ }
+
+ // serially execute sync to reduce unnecessary network overhead
+ std::lock_guard lock(_sync_meta_lock);
+ if (query_version > 0) {
+ std::shared_lock rlock(_meta_lock);
+ if (_max_version >= query_version) {
+ return Status::OK();
+ }
+ }
+
+ auto st = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data);
+ if (st.is<ErrorCode::NOT_FOUND>()) {
+ recycle_cached_data();
+ }
+ return st;
+}
+
+// Sync tablet meta and all rowset meta if not running.
+// This could happen when BE didn't finish schema change job and another BE
committed this schema change job.
+// It should be a quite rare situation.
+Status CloudTablet::sync_if_not_running() {
+ if (tablet_state() == TABLET_RUNNING) {
+ return Status::OK();
+ }
+
+ // Serially execute sync to reduce unnecessary network overhead
+ std::lock_guard lock(_sync_meta_lock);
+
+ {
+ std::shared_lock rlock(_meta_lock);
+ if (tablet_state() == TABLET_RUNNING) {
+ return Status::OK();
+ }
+ }
+
+ TabletMetaSharedPtr tablet_meta;
+ auto st = _engine.meta_mgr().get_tablet_meta(tablet_id(), &tablet_meta);
+ if (!st.ok()) {
+ if (st.is<ErrorCode::NOT_FOUND>()) {
+ recycle_cached_data();
+ }
+ return st;
+ }
+
+ if (tablet_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
+ // MoW may go to here when load while schema change
+ return Status::Error<INVALID_TABLET_STATE>("invalid tablet state {}.
tablet_id={}",
+
tablet_meta->tablet_state(), tablet_id());
+ }
+
+ TimestampedVersionTracker empty_tracker;
+ {
+ std::lock_guard wlock(_meta_lock);
+ RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
+ _rs_version_map.clear();
+ _stale_rs_version_map.clear();
+ std::swap(_timestamped_version_tracker, empty_tracker);
+ _tablet_meta->clear_rowsets();
+ _tablet_meta->clear_stale_rowset();
+ _max_version = -1;
+ }
+
+ st = _engine.meta_mgr().sync_tablet_rowsets(this);
+ if (st.is<ErrorCode::NOT_FOUND>()) {
+ recycle_cached_data();
+ }
+ return st;
+}
+
+void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool
version_overlap,
+ std::unique_lock<std::shared_mutex>& meta_lock,
+ bool warmup_delta_data) {
+ if (to_add.empty()) {
+ return;
+ }
+
+ auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>&
rowsets) {
+ for (auto& rs : rowsets) {
+ _rs_version_map.emplace(rs->version(), rs);
+ _timestamped_version_tracker.add_version(rs->version());
+ _max_version = std::max(rs->end_version(), _max_version);
+ update_base_size(*rs);
+ }
+ _tablet_meta->add_rowsets_unchecked(rowsets);
+ // TODO(plat1ko): Warmup delta rowset data in background
+ };
+
+ if (!version_overlap) {
+ add_rowsets_directly(to_add);
+ return;
+ }
+
+ // Filter out existed rowsets
+ auto remove_it =
+ std::remove_if(to_add.begin(), to_add.end(), [this](const
RowsetSharedPtr& rs) {
+ if (auto find_it = _rs_version_map.find(rs->version());
+ find_it == _rs_version_map.end()) {
+ return false;
+ } else if (find_it->second->rowset_id() == rs->rowset_id()) {
+ return true; // Same rowset
+ }
+
+ // If version of rowset in `to_add` is equal to rowset in
tablet but rowset_id is not equal,
+ // replace existed rowset with `to_add` rowset. This may occur
when:
+ // 1. schema change converts rowsets which have been double
written to new tablet
+ // 2. cumu compaction picks single overlapping input rowset
to perform compaction
+ _tablet_meta->delete_rs_meta_by_version(rs->version(),
nullptr);
+ _rs_version_map[rs->version()] = rs;
+ _tablet_meta->add_rowsets_unchecked({rs});
+ update_base_size(*rs);
+ return true;
+ });
+
+ to_add.erase(remove_it, to_add.end());
+
+ // delete rowsets with overlapped version
+ std::vector<RowsetSharedPtr> to_add_directly;
+ for (auto& to_add_rs : to_add) {
+ // delete rowsets with overlapped version
+ std::vector<RowsetSharedPtr> to_delete;
+ Version to_add_v = to_add_rs->version();
+ // if start_version > max_version, we can skip checking overlap here.
+ if (to_add_v.first > _max_version) {
+ // if start_version > max_version, we can skip checking overlap
here.
+ to_add_directly.push_back(to_add_rs);
+ } else {
+ to_add_directly.push_back(to_add_rs);
+ for (auto& [v, rs] : _rs_version_map) {
+ if (to_add_v.contains(v)) {
+ to_delete.push_back(rs);
+ }
+ }
+ delete_rowsets(to_delete, meta_lock);
+ }
+ }
+
+ add_rowsets_directly(to_add_directly);
+}
+
+void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
+ std::unique_lock<std::shared_mutex>&) {
+ if (to_delete.empty()) {
+ return;
+ }
+ std::vector<RowsetMetaSharedPtr> rs_metas;
+ rs_metas.reserve(to_delete.size());
+ for (auto&& rs : to_delete) {
+ rs_metas.push_back(rs->rowset_meta());
+ _stale_rs_version_map[rs->version()] = rs;
+ }
+ _timestamped_version_tracker.add_stale_path_version(rs_metas);
+ for (auto&& rs : to_delete) {
+ _rs_version_map.erase(rs->version());
+ }
+
+ _tablet_meta->modify_rs_metas({}, rs_metas, false);
+}
+
+int CloudTablet::delete_expired_stale_rowsets() {
+ std::vector<RowsetSharedPtr> expired_rowsets;
+ int64_t expired_stale_sweep_endtime =
+ ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
+ {
+ std::unique_lock wlock(_meta_lock);
+
+ std::vector<int64_t> path_ids;
+ // capture the path version to delete
+
_timestamped_version_tracker.capture_expired_paths(expired_stale_sweep_endtime,
&path_ids);
+
+ if (path_ids.empty()) {
+ return 0;
+ }
+
+ for (int64_t path_id : path_ids) {
+ // delete stale versions in version graph
+ auto version_path =
_timestamped_version_tracker.fetch_and_delete_path_by_id(path_id);
+ for (auto& v_ts : version_path->timestamped_versions()) {
+ auto rs_it = _stale_rs_version_map.find(v_ts->version());
+ if (rs_it != _stale_rs_version_map.end()) {
+ expired_rowsets.push_back(rs_it->second);
+ _stale_rs_version_map.erase(rs_it);
+ } else {
+ LOG(WARNING) << "cannot find stale rowset " <<
v_ts->version() << " in tablet "
+ << tablet_id();
+ // clang-format off
+ DCHECK(false) << [this, &wlock]() { wlock.unlock();
std::string json; get_compaction_status(&json); return json; }();
+ // clang-format on
+ }
+ _tablet_meta->delete_stale_rs_meta_by_version(v_ts->version());
+ VLOG_DEBUG << "delete stale rowset " << v_ts->version();
+ }
+ }
+ _reconstruct_version_tracker_if_necessary();
+ }
+ recycle_cached_data(expired_rowsets);
+ return expired_rowsets.size();
+}
+
+void CloudTablet::update_base_size(const Rowset& rs) {
+ // Define base rowset as the rowset of version [2-x]
+ if (rs.start_version() == 2) {
+ _base_size = rs.data_disk_size();
+ }
+}
+
+void CloudTablet::recycle_cached_data() {
+ // TODO(plat1ko)
+}
+
+void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>&
rowsets) {
+ // TODO(plat1ko)
+}
+
+void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t
num_segments,
+ int64_t num_rows, int64_t data_size)
{
+ _approximate_num_rowsets.store(num_rowsets, std::memory_order_relaxed);
+ _approximate_num_segments.store(num_segments, std::memory_order_relaxed);
+ _approximate_num_rows.store(num_rows, std::memory_order_relaxed);
+ _approximate_data_size.store(data_size, std::memory_order_relaxed);
+ int64_t cumu_num_deltas = 0;
+ int64_t cumu_num_rowsets = 0;
+ auto cp = _cumulative_point.load(std::memory_order_relaxed);
+ for (auto& [v, r] : _rs_version_map) {
+ if (v.second < cp) {
+ continue;
+ }
+
+ cumu_num_deltas += r->is_segments_overlapping() ? r->num_segments() :
1;
+ ++cumu_num_rowsets;
+ }
+ _approximate_cumu_num_rowsets.store(cumu_num_rowsets,
std::memory_order_relaxed);
+ _approximate_cumu_num_deltas.store(cumu_num_deltas,
std::memory_order_relaxed);
+}
+
+Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
+ RowsetWriterContext& context, bool vertical) {
+ return ResultError(
+ Status::NotSupported("CloudTablet::create_rowset_writer is not
implemented"));
+}
+
+// return a json string to show the compaction status of this tablet
+void CloudTablet::get_compaction_status(std::string* json_result) {
+ rapidjson::Document root;
+ root.SetObject();
+
+ rapidjson::Document path_arr;
+ path_arr.SetArray();
+
+ std::vector<RowsetSharedPtr> rowsets;
+ std::vector<RowsetSharedPtr> stale_rowsets;
+ {
+ std::shared_lock rdlock(_meta_lock);
+ rowsets.reserve(_rs_version_map.size());
+ for (auto& it : _rs_version_map) {
+ rowsets.push_back(it.second);
+ }
+ stale_rowsets.reserve(_stale_rs_version_map.size());
+ for (auto& it : _stale_rs_version_map) {
+ stale_rowsets.push_back(it.second);
+ }
+ }
+ std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
+ std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator);
+
+ // get snapshot version path json_doc
+ _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
+ root.AddMember("cumulative point", _cumulative_point.load(),
root.GetAllocator());
+
+ // print all rowsets' version as an array
+ rapidjson::Document versions_arr;
+ rapidjson::Document missing_versions_arr;
+ versions_arr.SetArray();
+ missing_versions_arr.SetArray();
+ int64_t last_version = -1;
+ for (auto& rowset : rowsets) {
+ const Version& ver = rowset->version();
+ if (ver.first != last_version + 1) {
+ rapidjson::Value miss_value;
+ miss_value.SetString(fmt::format("[{}-{}]", last_version + 1,
ver.first - 1).c_str(),
+ missing_versions_arr.GetAllocator());
+ missing_versions_arr.PushBack(miss_value,
missing_versions_arr.GetAllocator());
+ }
+ rapidjson::Value value;
+ std::string version_str = rowset->get_rowset_info_str();
+ value.SetString(version_str.c_str(), version_str.length(),
versions_arr.GetAllocator());
+ versions_arr.PushBack(value, versions_arr.GetAllocator());
+ last_version = ver.second;
+ }
+ root.AddMember("rowsets", versions_arr, root.GetAllocator());
+ root.AddMember("missing_rowsets", missing_versions_arr,
root.GetAllocator());
+
+ // print all stale rowsets' version as an array
+ rapidjson::Document stale_versions_arr;
+ stale_versions_arr.SetArray();
+ for (auto& rowset : stale_rowsets) {
+ rapidjson::Value value;
+ std::string version_str = rowset->get_rowset_info_str();
+ value.SetString(version_str.c_str(), version_str.length(),
+ stale_versions_arr.GetAllocator());
+ stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
+ }
+ root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator());
+
+ // add stale version rowsets
+ root.AddMember("stale version path", path_arr, root.GetAllocator());
+
+ // to json string
+ rapidjson::StringBuffer strbuf;
+ rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
+ root.Accept(writer);
+ *json_result = std::string(strbuf.GetString());
+}
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
new file mode 100644
index 00000000000..537c8fe134d
--- /dev/null
+++ b/be/src/cloud/cloud_tablet.h
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+
+#include "olap/base_tablet.h"
+#include "olap/version_graph.h"
+
+namespace doris {
+
+class CloudStorageEngine;
+
+class CloudTablet final : public BaseTablet {
+public:
+ CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta);
+
+ ~CloudTablet() override;
+
+ bool exceed_version_limit(int32_t limit) override;
+
+ Result<std::unique_ptr<RowsetWriter>>
create_rowset_writer(RowsetWriterContext& context,
+ bool vertical)
override;
+
+ Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
+ bool skip_missing_version) override;
+
+ size_t tablet_footprint() override {
+ return _approximate_data_size.load(std::memory_order_relaxed);
+ }
+
+ // clang-format off
+ int64_t fetch_add_approximate_num_rowsets (int64_t x) { return
_approximate_num_rowsets .fetch_add(x, std::memory_order_relaxed); }
+ int64_t fetch_add_approximate_num_segments(int64_t x) { return
_approximate_num_segments.fetch_add(x, std::memory_order_relaxed); }
+ int64_t fetch_add_approximate_num_rows (int64_t x) { return
_approximate_num_rows .fetch_add(x, std::memory_order_relaxed); }
+ int64_t fetch_add_approximate_data_size (int64_t x) { return
_approximate_data_size .fetch_add(x, std::memory_order_relaxed); }
+ int64_t fetch_add_approximate_cumu_num_rowsets (int64_t x) { return
_approximate_cumu_num_rowsets.fetch_add(x, std::memory_order_relaxed); }
+ int64_t fetch_add_approximate_cumu_num_deltas (int64_t x) { return
_approximate_cumu_num_deltas.fetch_add(x, std::memory_order_relaxed); }
+ // clang-format on
+
+ // meta lock must be held when calling this function
+ void reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
int64_t num_rows,
+ int64_t data_size);
+
+ // return a json string to show the compaction status of this tablet
+ void get_compaction_status(std::string* json_result);
+
+ // Synchronize the rowsets from meta service.
+ // If tablet state is not `TABLET_RUNNING`, sync tablet meta and all
visible rowsets.
+ // If `query_version` > 0 and local max_version of the tablet >=
`query_version`, do nothing.
+ // If `warmup_delta_data` is true, the data of the new version rowsets is
+ // downloaded asynchronously.
+ Status sync_rowsets(int64_t query_version = -1, bool warmup_delta_data =
false);
+
+ // Synchronize the tablet meta from meta service.
+ Status sync_meta();
+
+ // If `version_overlap` is true, function will delete rowsets with
overlapped version in this tablet.
+ // If 'warmup_delta_data' is true, download the new version rowset data in
background.
+ // MUST hold EXCLUSIVE `_meta_lock`.
+ // When `warmup_delta_data` is true, the data of the new version rowsets is
+ // downloaded asynchronously.
+ void add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
+ std::unique_lock<std::shared_mutex>& meta_lock,
+ bool warmup_delta_data = false);
+
+ // MUST hold EXCLUSIVE `_meta_lock`.
+ void delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
+ std::unique_lock<std::shared_mutex>& meta_lock);
+
+ // When the tablet is dropped, we need to recycle cached data:
+ // 1. The data in file cache
+ // 2. The memory in tablet cache
+ void recycle_cached_data();
+
+ void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);
+
+ // Return number of deleted stale rowsets
+ int delete_expired_stale_rowsets();
+
+private:
+ Versions calc_missed_versions(int64_t spec_version);
+
+ // FIXME(plat1ko): No need to record base size if rowsets are ordered by
version
+ void update_base_size(const Rowset& rs);
+
+ Status sync_if_not_running();
+
+ CloudStorageEngine& _engine;
+
+ // this mutex MUST ONLY be used when sync meta
+ bthread::Mutex _sync_meta_lock;
+
+ std::atomic<int64_t> _cumulative_point {-1};
+ std::atomic<int64_t> _approximate_num_rowsets {-1};
+ std::atomic<int64_t> _approximate_num_segments {-1};
+ std::atomic<int64_t> _approximate_num_rows {-1};
+ std::atomic<int64_t> _approximate_data_size {-1};
+ std::atomic<int64_t> _approximate_cumu_num_rowsets {-1};
+ // Number of sorted arrays (e.g. for rowset with N segments, if rowset is
overlapping, delta is N, otherwise 1) after cumu point
+ std::atomic<int64_t> _approximate_cumu_num_deltas {-1};
+
+ [[maybe_unused]] int64_t _base_compaction_cnt = 0;
+ [[maybe_unused]] int64_t _cumulative_compaction_cnt = 0;
+ int64_t _max_version = -1;
+ int64_t _base_size = 0;
+};
+
+} // namespace doris
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 5199e86d120..2bec1c397e8 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -274,7 +274,8 @@ namespace ErrorCode {
E(INVERTED_INDEX_COMPACTION_ERROR, -6010, false); \
E(KEY_NOT_FOUND, -7000, false); \
E(KEY_ALREADY_EXISTS, -7001, false); \
- E(ENTRY_NOT_FOUND, -7002, false);
+ E(ENTRY_NOT_FOUND, -7002, false); \
+ E(INVALID_TABLET_STATE, -7211, false);
// Define constexpr int error_code_name = error_code_value
#define M(NAME, ERRORCODE, ENABLESTACKTRACE) constexpr int NAME = ERRORCODE;
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 43f514906cf..18445cb17a6 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -19,6 +19,8 @@
#include <fmt/format.h>
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_reader.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_schema_cache.h"
#include "util/doris_metrics.h"
@@ -79,4 +81,42 @@ Status BaseTablet::update_by_least_common_schema(const
TabletSchemaSPtr& update_
return Status::OK();
}
+Status BaseTablet::capture_rs_readers_unlocked(const std::vector<Version>&
version_path,
+ std::vector<RowSetSplits>*
rs_splits) const {
+ DCHECK(rs_splits != nullptr && rs_splits->empty());
+ for (auto version : version_path) {
+ auto it = _rs_version_map.find(version);
+ if (it == _rs_version_map.end()) {
+ VLOG_NOTICE << "fail to find Rowset in rs_version for version.
tablet=" << tablet_id()
+ << ", version='" << version.first << "-" <<
version.second;
+
+ it = _stale_rs_version_map.find(version);
+ if (it == _stale_rs_version_map.end()) {
+ return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+ "fail to find Rowset in stale_rs_version for version.
tablet={}, "
+ "version={}-{}",
+ tablet_id(), version.first, version.second);
+ }
+ }
+ RowsetReaderSharedPtr rs_reader;
+ auto res = it->second->create_reader(&rs_reader);
+ if (!res.ok()) {
+ return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+ "failed to create reader for rowset:{}",
it->second->rowset_id().to_string());
+ }
+ rs_splits->push_back(RowSetSplits(std::move(rs_reader)));
+ }
+ return Status::OK();
+}
+
+bool BaseTablet::_reconstruct_version_tracker_if_necessary() {
+ double orphan_vertex_ratio =
_timestamped_version_tracker.get_orphan_vertex_ratio();
+ if (orphan_vertex_ratio >=
config::tablet_version_graph_orphan_vertex_ratio) {
+ _timestamped_version_tracker.construct_versioned_tracker(
+ _tablet_meta->all_rs_metas(),
_tablet_meta->all_stale_rs_metas());
+ return true;
+ }
+ return false;
+}
+
} /* namespace doris */
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 2fa494b420a..bb327b39532 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -24,6 +24,7 @@
#include "common/status.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_meta.h"
+#include "olap/version_graph.h"
#include "util/metrics.h"
namespace doris {
@@ -72,19 +73,33 @@ public:
return _max_version_schema;
}
- virtual bool exceed_version_limit(int32_t limit) const = 0;
+ virtual bool exceed_version_limit(int32_t limit) = 0;
virtual Result<std::unique_ptr<RowsetWriter>>
create_rowset_writer(RowsetWriterContext& context,
bool
vertical) = 0;
virtual Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
- bool skip_missing_version) const = 0;
+ bool skip_missing_version) = 0;
virtual size_t tablet_footprint() = 0;
+ // MUST hold shared meta lock
+ Status capture_rs_readers_unlocked(const std::vector<Version>&
version_path,
+ std::vector<RowSetSplits>* rs_splits)
const;
+
protected:
+ bool _reconstruct_version_tracker_if_necessary();
+
mutable std::shared_mutex _meta_lock;
+ TimestampedVersionTracker _timestamped_version_tracker;
+ // After version 0.13, all newly created rowsets are saved in
_rs_version_map.
+ // And if a rowset is being compacted, the old rowsets will be saved in
_stale_rs_version_map.
+ std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>
_rs_version_map;
+ // This variable _stale_rs_version_map is used to record the rowsets
which have been compacted.
+ // These stale rowsets are removed when the rowsets' pathVersion has
expired;
+ // this policy is judged and computed by TimestampedVersionTracker.
+ std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>
_stale_rs_version_map;
const TabletMetaSharedPtr _tablet_meta;
TabletSchemaSPtr _max_version_schema;
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index b3e6b1ca9e0..d09fce730e5 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -92,4 +92,13 @@ void Rowset::merge_rowset_meta(const RowsetMetaSharedPtr&
other) {
}
}
+std::string Rowset::get_rowset_info_str() {
+ std::string disk_size = PrettyPrinter::print(
+ static_cast<uint64_t>(_rowset_meta->total_disk_size()),
TUnit::BYTES);
+ return fmt::format("[{}-{}] {} {} {} {} {}", start_version(),
end_version(), num_segments(),
+ _rowset_meta->has_delete_predicate() ? "DELETE" :
"DATA",
+
SegmentsOverlapPB_Name(_rowset_meta->segments_overlap()),
+ rowset_id().to_string(), disk_size);
+}
+
} // namespace doris
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index a2a275f2eac..a1355a81198 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -302,6 +302,8 @@ public:
// set skip index compaction next time
void set_skip_index_compaction(int32_t column_id) {
skip_index_compaction.insert(column_id); }
+ std::string get_rowset_info_str();
+
protected:
friend class RowsetFactory;
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 66e6ffcf2b2..327afd6a8c4 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -833,7 +833,8 @@ Status
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
}
// acquire data sources correspond to history versions
-
RETURN_IF_ERROR(base_tablet->capture_rs_readers(versions_to_be_changed,
&rs_splits));
+ RETURN_IF_ERROR(
+
base_tablet->capture_rs_readers_unlocked(versions_to_be_changed, &rs_splits));
if (rs_splits.empty()) {
res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>(
"fail to acquire all data sources. version_num={},
data_source_num={}",
@@ -985,8 +986,8 @@ Status SchemaChangeHandler::_get_versions_to_be_changed(
}
*max_rowset = rowset;
- RETURN_IF_ERROR(base_tablet->capture_consistent_versions(Version(0,
rowset->version().second),
-
versions_to_be_changed, false, false));
+ RETURN_IF_ERROR(base_tablet->capture_consistent_versions_unlocked(
+ Version(0, rowset->version().second), versions_to_be_changed,
false, false));
return Status::OK();
}
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 720cd5b2350..50ec2304096 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -757,7 +757,8 @@ void Tablet::delete_expired_stale_rowset() {
Version test_version = Version(0, lastest_delta->end_version());
stale_version_path_map[*path_id_iter] = version_path;
- Status status = capture_consistent_versions(test_version, nullptr,
false, false);
+ Status status =
+ capture_consistent_versions_unlocked(test_version,
nullptr, false, false);
// 1. When there is no consistent versions, we must reconstruct
the tracker.
if (!status.ok()) {
// 2. fetch missing version after delete
@@ -867,19 +868,9 @@ void Tablet::delete_expired_stale_rowset() {
#endif
}
-bool Tablet::_reconstruct_version_tracker_if_necessary() {
- double orphan_vertex_ratio =
_timestamped_version_tracker.get_orphan_vertex_ratio();
- if (orphan_vertex_ratio >=
config::tablet_version_graph_orphan_vertex_ratio) {
- _timestamped_version_tracker.construct_versioned_tracker(
- _tablet_meta->all_rs_metas(),
_tablet_meta->all_stale_rs_metas());
- return true;
- }
- return false;
-}
-
-Status Tablet::capture_consistent_versions(const Version& spec_version,
- std::vector<Version>* version_path,
- bool skip_missing_version, bool
quiet) const {
+Status Tablet::capture_consistent_versions_unlocked(const Version&
spec_version,
+ std::vector<Version>*
version_path,
+ bool skip_missing_version,
bool quiet) const {
Status status =
_timestamped_version_tracker.capture_consistent_versions(spec_version,
version_path);
if (!status.ok() && !quiet) {
@@ -914,10 +905,10 @@ Status Tablet::capture_consistent_versions(const Version&
spec_version,
Status Tablet::check_version_integrity(const Version& version, bool quiet) {
std::shared_lock rdlock(_meta_lock);
- return capture_consistent_versions(version, nullptr, false, quiet);
+ return capture_consistent_versions_unlocked(version, nullptr, false,
quiet);
}
-bool Tablet::exceed_version_limit(int32_t limit) const {
+bool Tablet::exceed_version_limit(int32_t limit) {
if (_tablet_meta->version_count() > limit) {
exceed_version_limit_counter << 1;
return true;
@@ -947,7 +938,8 @@ void Tablet::acquire_version_and_rowsets(
Status Tablet::capture_consistent_rowsets(const Version& spec_version,
std::vector<RowsetSharedPtr>*
rowsets) const {
std::vector<Version> version_path;
- RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path,
false, false));
+ RETURN_IF_ERROR(
+ capture_consistent_versions_unlocked(spec_version, &version_path,
false, false));
RETURN_IF_ERROR(_capture_consistent_rowsets_unlocked(version_path,
rowsets));
return Status::OK();
}
@@ -984,39 +976,12 @@ Status Tablet::_capture_consistent_rowsets_unlocked(const
std::vector<Version>&
}
Status Tablet::capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
- bool skip_missing_version) const {
+ bool skip_missing_version) {
+ std::shared_lock rlock(_meta_lock);
std::vector<Version> version_path;
- RETURN_IF_ERROR(
- capture_consistent_versions(spec_version, &version_path,
skip_missing_version, false));
- RETURN_IF_ERROR(capture_rs_readers(version_path, rs_splits));
- return Status::OK();
-}
-
-Status Tablet::capture_rs_readers(const std::vector<Version>& version_path,
- std::vector<RowSetSplits>* rs_splits) const {
- DCHECK(rs_splits != nullptr && rs_splits->empty());
- for (auto version : version_path) {
- auto it = _rs_version_map.find(version);
- if (it == _rs_version_map.end()) {
- VLOG_NOTICE << "fail to find Rowset in rs_version for version.
tablet=" << tablet_id()
- << ", version='" << version.first << "-" <<
version.second;
-
- it = _stale_rs_version_map.find(version);
- if (it == _stale_rs_version_map.end()) {
- return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
- "fail to find Rowset in stale_rs_version for version.
tablet={}, "
- "version={}-{}",
- tablet_id(), version.first, version.second);
- }
- }
- RowsetReaderSharedPtr rs_reader;
- auto res = it->second->create_reader(&rs_reader);
- if (!res.ok()) {
- return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
- "failed to create reader for rowset:{}",
it->second->rowset_id().to_string());
- }
- rs_splits->push_back(RowSetSplits(std::move(rs_reader)));
- }
+ RETURN_IF_ERROR(capture_consistent_versions_unlocked(spec_version,
&version_path,
+ skip_missing_version,
false));
+ RETURN_IF_ERROR(capture_rs_readers_unlocked(version_path, rs_splits));
return Status::OK();
}
@@ -1419,16 +1384,6 @@ std::vector<RowsetSharedPtr>
Tablet::pick_candidate_rowsets_to_build_inverted_in
return candidate_rowsets;
}
-std::string Tablet::_get_rowset_info_str(RowsetSharedPtr rowset, bool
delete_flag) {
- const Version& ver = rowset->version();
- std::string disk_size = PrettyPrinter::print(
- static_cast<uint64_t>(rowset->rowset_meta()->total_disk_size()),
TUnit::BYTES);
- return strings::Substitute("[$0-$1] $2 $3 $4 $5 $6", ver.first, ver.second,
- rowset->num_segments(), (delete_flag ? "DELETE"
: "DATA"),
-
SegmentsOverlapPB_Name(rowset->rowset_meta()->segments_overlap()),
- rowset->rowset_id().to_string(), disk_size);
-}
-
// For http compaction action
void Tablet::get_compaction_status(std::string* json_result) {
rapidjson::Document root;
@@ -1523,17 +1478,16 @@ void Tablet::get_compaction_status(std::string*
json_result) {
versions_arr.SetArray();
missing_versions_arr.SetArray();
int64_t last_version = -1;
- for (int i = 0; i < rowsets.size(); ++i) {
- const Version& ver = rowsets[i]->version();
+ for (auto& rowset : rowsets) {
+ const Version& ver = rowset->version();
if (ver.first != last_version + 1) {
rapidjson::Value miss_value;
- miss_value.SetString(
- strings::Substitute("[$0-$1]", last_version + 1, ver.first
- 1).c_str(),
- missing_versions_arr.GetAllocator());
+ miss_value.SetString(fmt::format("[{}-{}]", last_version + 1,
ver.first - 1).c_str(),
+ missing_versions_arr.GetAllocator());
missing_versions_arr.PushBack(miss_value,
missing_versions_arr.GetAllocator());
}
rapidjson::Value value;
- std::string version_str = _get_rowset_info_str(rowsets[i],
delete_flags[i]);
+ std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), version_str.length(),
versions_arr.GetAllocator());
versions_arr.PushBack(value, versions_arr.GetAllocator());
last_version = ver.second;
@@ -1544,15 +1498,9 @@ void Tablet::get_compaction_status(std::string*
json_result) {
// print all stale rowsets' version as an array
rapidjson::Document stale_versions_arr;
stale_versions_arr.SetArray();
- for (int i = 0; i < stale_rowsets.size(); ++i) {
- const Version& ver = stale_rowsets[i]->version();
+ for (auto& rowset : stale_rowsets) {
rapidjson::Value value;
- std::string disk_size = PrettyPrinter::print(
-
static_cast<uint64_t>(stale_rowsets[i]->rowset_meta()->total_disk_size()),
- TUnit::BYTES);
- std::string version_str = strings::Substitute(
- "[$0-$1] $2 $3 $4", ver.first, ver.second,
stale_rowsets[i]->num_segments(),
- stale_rowsets[i]->rowset_id().to_string(), disk_size);
+ std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), version_str.length(),
stale_versions_arr.GetAllocator());
stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
@@ -3449,8 +3397,8 @@ Status Tablet::check_rowid_conversion(
Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet*
rowset_ids) const {
// Ensure that the obtained versions of rowsets are continuous
std::vector<Version> version_path;
- RETURN_IF_ERROR(
- capture_consistent_versions(Version(0, max_version),
&version_path, false, false));
+ RETURN_IF_ERROR(capture_consistent_versions_unlocked(Version(0,
max_version), &version_path,
+ false, false));
for (auto& ver : version_path) {
if (ver.second == 1) {
// [0-1] rowset is empty for each tablet, skip it
@@ -3719,8 +3667,7 @@ Status
Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in
if (rowsets != nullptr) {
for (const auto& rowset : *rowsets) {
rapidjson::Value value;
- std::string version_str =
- _get_rowset_info_str(rowset,
rowset->rowset_meta()->has_delete_predicate());
+ std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), version_str.length(),
required_rowsets_arr.GetAllocator());
required_rowsets_arr.PushBack(value,
required_rowsets_arr.GetAllocator());
@@ -3733,8 +3680,7 @@ Status
Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in
}
for (const auto& rowset : rowsets) {
rapidjson::Value value;
- std::string version_str =
- _get_rowset_info_str(rowset,
rowset->rowset_meta()->has_delete_predicate());
+ std::string version_str = rowset->get_rowset_info_str();
value.SetString(version_str.c_str(), version_str.length(),
required_rowsets_arr.GetAllocator());
required_rowsets_arr.PushBack(value,
required_rowsets_arr.GetAllocator());
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index d6ad0285233..d953d8fce4f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -123,7 +123,7 @@ public:
size_t num_rows();
int version_count() const;
- bool exceed_version_limit(int32_t limit) const override;
+ bool exceed_version_limit(int32_t limit) override;
uint64_t segment_count() const;
Version max_version() const;
Version max_version_unlocked() const;
@@ -170,9 +170,9 @@ public:
// Given spec_version, find a continuous version path and store it in
version_path.
// If quiet is true, then only "does this path exist" is returned.
// If skip_missing_version is true, return ok even there are missing
versions.
- Status capture_consistent_versions(const Version& spec_version,
- std::vector<Version>* version_path,
- bool skip_missing_version, bool quiet)
const;
+ Status capture_consistent_versions_unlocked(const Version& spec_version,
+ std::vector<Version>*
version_path,
+ bool skip_missing_version,
bool quiet) const;
// if quiet is true, no error log will be printed if there are missing
versions
Status check_version_integrity(const Version& version, bool quiet = false);
bool check_version_exist(const Version& version) const;
@@ -183,10 +183,7 @@ public:
std::vector<RowsetSharedPtr>* rowsets)
const;
// If skip_missing_version is true, skip versions if they are missing.
Status capture_rs_readers(const Version& spec_version,
std::vector<RowSetSplits>* rs_splits,
- bool skip_missing_version) const override;
-
- Status capture_rs_readers(const std::vector<Version>& version_path,
- std::vector<RowSetSplits>* rs_splits) const;
+ bool skip_missing_version) override;
// meta lock
std::shared_mutex& get_header_lock() { return _meta_lock; }
@@ -583,9 +580,6 @@ private:
std::shared_ptr<CumulativeCompactionPolicy>
cumulative_compaction_policy);
uint32_t _calc_base_compaction_score() const;
- // When the proportion of empty edges in the adjacency matrix used to
represent the version graph
- // in the version tracker is greater than the threshold, rebuild the
version tracker
- bool _reconstruct_version_tracker_if_necessary();
void _init_context_common_fields(RowsetWriterContext& context);
void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const
RowsetIdUnorderedSet& pre,
@@ -607,7 +601,6 @@ private:
////////////////////////////////////////////////////////////////////////////
void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr
delete_bitmap);
- std::string _get_rowset_info_str(RowsetSharedPtr rowset, bool delete_flag);
public:
static const int64_t K_INVALID_CUMULATIVE_POINT = -1;
@@ -615,7 +608,6 @@ public:
private:
StorageEngine& _engine;
DataDir* _data_dir = nullptr;
- TimestampedVersionTracker _timestamped_version_tracker;
DorisCallOnce<Status> _init_once;
// meta store lock is used for prevent 2 threads do checkpoint concurrently
@@ -634,13 +626,6 @@ private:
// during publish_txn, which might take hundreds of milliseconds
mutable std::mutex _rowset_update_lock;
- // After version 0.13, all newly created rowsets are saved in
_rs_version_map.
- // And if rowset being compacted, the old rowsetis will be saved in
_stale_rs_version_map;
- std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>
_rs_version_map;
- // This variable _stale_rs_version_map is used to record these rowsets
which are be compacted.
- // These _stale rowsets are been removed when rowsets' pathVersion is
expired,
- // this policy is judged and computed by TimestampedVersionTracker.
- std::unordered_map<Version, RowsetSharedPtr, HashOfVersion>
_stale_rs_version_map;
// if this tablet is broken, set to true. default is false
std::atomic<bool> _is_bad;
// timestamp of last cumu compaction failure
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index c8a0387082c..f14622081d9 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -36,6 +36,7 @@
#include "olap/file_header.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
+#include "olap/rowset/rowset.h"
#include "olap/tablet_meta_manager.h"
#include "olap/utils.h"
#include "util/debug_points.h"
@@ -727,6 +728,12 @@ Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr&
rs_meta) {
return Status::OK();
}
+void TabletMeta::add_rowsets_unchecked(const std::vector<RowsetSharedPtr>&
to_add) {
+ for (const auto& rs : to_add) {
+ _rs_metas.push_back(rs->rowset_meta());
+ }
+}
+
void TabletMeta::delete_rs_meta_by_version(const Version& version,
std::vector<RowsetMetaSharedPtr>*
deleted_rs_metas) {
auto it = _rs_metas.begin();
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index db7912a452c..094bb21507d 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -202,6 +202,12 @@ public:
// used for after tablet cloned to clear stale rowset
void clear_stale_rowset() { _stale_rs_metas.clear(); }
+ void clear_rowsets() { _rs_metas.clear(); }
+
+ // MUST hold the EXCLUSIVE `_meta_lock` of the Tablet this meta belongs to.
+ // `to_add` MUST NOT contain versions that overlap with `_rs_metas` in the tablet
meta.
+ void add_rowsets_unchecked(const std::vector<RowsetSharedPtr>& to_add);
+
bool all_beta() const;
int64_t storage_policy_id() const { return _storage_policy_id; }
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index e1f39f2948b..f7fe7f813f2 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -528,16 +528,12 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
if (_shared_scan_opt) {
auto& read_source = tablets_read_source.emplace_back();
- {
- std::shared_lock rdlock(tablet->get_header_lock());
- auto st =
- tablet->capture_rs_readers({0, version},
&read_source.rs_splits, false);
- if (!st.ok()) {
- LOG(WARNING) << "fail to init reader.res=" << st;
- return Status::InternalError(
- "failed to initialize storage reader.
tablet_id={} : {}",
- tablet->tablet_id(), st.to_string());
- }
+ auto st = tablet->capture_rs_readers({0, version},
&read_source.rs_splits, false);
+ if (!st.ok()) {
+ LOG(WARNING) << "fail to init reader.res=" << st;
+ return Status::InternalError(
+ "failed to initialize storage reader. tablet_id={}
: {}",
+ tablet->tablet_id(), st.to_string());
}
if (!_state->skip_delete_predicate()) {
read_source.fill_delete_predicates();
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index bc15cf7207f..20938ecb8ba 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -182,15 +182,12 @@ Status NewOlapScanner::init() {
// to prevent this case: when there are lots of olap scanners to
run for example 10000
// the rowsets maybe compacted when the last olap scanner starts
ReadSource read_source;
- {
- std::shared_lock rdlock(tablet->get_header_lock());
- auto st =
tablet->capture_rs_readers(_tablet_reader_params.version,
- &read_source.rs_splits,
-
_state->skip_missing_version());
- if (!st.ok()) {
- LOG(WARNING) << "fail to init reader.res=" << st;
- return st;
- }
+ auto st = tablet->capture_rs_readers(_tablet_reader_params.version,
+ &read_source.rs_splits,
+
_state->skip_missing_version());
+ if (!st.ok()) {
+ LOG(WARNING) << "fail to init reader.res=" << st;
+ return st;
}
if (!_state->skip_delete_predicate()) {
read_source.fill_delete_predicates();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]