This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f4e2074a1d [feature](merge-cloud) Add CloudTablet (#30045)
4f4e2074a1d is described below

commit 4f4e2074a1d986246e686b23a68cdd092b3d1fac
Author: plat1ko <[email protected]>
AuthorDate: Thu Jan 18 10:44:04 2024 +0800

    [feature](merge-cloud) Add CloudTablet (#30045)
---
 be/src/cloud/cloud_meta_mgr.cpp             |  26 +-
 be/src/cloud/cloud_meta_mgr.h               |  11 +-
 be/src/cloud/cloud_storage_engine.h         |  39 +++
 be/src/cloud/cloud_tablet.cpp               | 441 ++++++++++++++++++++++++++++
 be/src/cloud/cloud_tablet.h                 | 124 ++++++++
 be/src/common/status.h                      |   3 +-
 be/src/olap/base_tablet.cpp                 |  40 +++
 be/src/olap/base_tablet.h                   |  19 +-
 be/src/olap/rowset/rowset.cpp               |   9 +
 be/src/olap/rowset/rowset.h                 |   2 +
 be/src/olap/schema_change.cpp               |   7 +-
 be/src/olap/tablet.cpp                      | 104 ++-----
 be/src/olap/tablet.h                        |  25 +-
 be/src/olap/tablet_meta.cpp                 |   7 +
 be/src/olap/tablet_meta.h                   |   6 +
 be/src/vec/exec/scan/new_olap_scan_node.cpp |  16 +-
 be/src/vec/exec/scan/new_olap_scanner.cpp   |  15 +-
 17 files changed, 752 insertions(+), 142 deletions(-)

diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp
index cbe6ab8ae24..d6eb54e5c41 100644
--- a/be/src/cloud/cloud_meta_mgr.cpp
+++ b/be/src/cloud/cloud_meta_mgr.cpp
@@ -29,6 +29,7 @@
 #include <type_traits>
 #include <vector>
 
+#include "cloud/cloud_tablet.h"
 #include "cloud/config.h"
 #include "cloud/pb_convert.h"
 #include "common/logging.h"
@@ -38,7 +39,6 @@
 #include "gen_cpp/olap_file.pb.h"
 #include "olap/olap_common.h"
 #include "olap/rowset/rowset_factory.h"
-#include "olap/tablet.h"
 #include "olap/tablet_meta.h"
 #include "runtime/stream_load/stream_load_context.h"
 #include "util/network_util.h"
@@ -270,12 +270,12 @@ Status CloudMetaMgr::get_tablet_meta(int64_t tablet_id, 
TabletMetaSharedPtr* tab
     return Status::OK();
 }
 
-Status CloudMetaMgr::sync_tablet_rowsets(Tablet* tablet, bool 
warmup_delta_data) {
+Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool 
warmup_delta_data) {
     return Status::NotSupported("CloudMetaMgr::sync_tablet_rowsets is not 
implemented");
 }
 
 Status CloudMetaMgr::sync_tablet_delete_bitmap(
-        Tablet* tablet, int64_t old_max_version,
+        CloudTablet* tablet, int64_t old_max_version,
         const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
         const TabletStatsPB& stats, const TabletIndexPB& idx, DeleteBitmap* 
delete_bitmap) {
     return Status::NotSupported("CloudMetaMgr::sync_tablet_delete_bitmap is 
not implemented");
@@ -425,15 +425,15 @@ Status CloudMetaMgr::update_tablet_schema(int64_t 
tablet_id, const TabletSchema&
     return Status::OK();
 }
 
-Status CloudMetaMgr::update_delete_bitmap(const Tablet* tablet, int64_t 
lock_id, int64_t initiator,
-                                          DeleteBitmap* delete_bitmap) {
-    VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet->tablet_id();
+Status CloudMetaMgr::update_delete_bitmap(const CloudTablet& tablet, int64_t 
lock_id,
+                                          int64_t initiator, DeleteBitmap* 
delete_bitmap) {
+    VLOG_DEBUG << "update_delete_bitmap , tablet_id: " << tablet.tablet_id();
     UpdateDeleteBitmapRequest req;
     UpdateDeleteBitmapResponse res;
     req.set_cloud_unique_id(config::cloud_unique_id);
-    req.set_table_id(tablet->table_id());
-    req.set_partition_id(tablet->partition_id());
-    req.set_tablet_id(tablet->tablet_id());
+    req.set_table_id(tablet.table_id());
+    req.set_partition_id(tablet.partition_id());
+    req.set_tablet_id(tablet.tablet_id());
     req.set_lock_id(lock_id);
     req.set_initiator(initiator);
     for (auto iter = delete_bitmap->delete_bitmap.begin();
@@ -451,18 +451,18 @@ Status CloudMetaMgr::update_delete_bitmap(const Tablet* 
tablet, int64_t lock_id,
     if (res.status().code() == MetaServiceCode::LOCK_EXPIRED) {
         return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>(
                 "lock expired when update delete bitmap, tablet_id: {}, 
lock_id: {}",
-                tablet->tablet_id(), lock_id);
+                tablet.tablet_id(), lock_id);
     }
     return st;
 }
 
-Status CloudMetaMgr::get_delete_bitmap_update_lock(const Tablet* tablet, 
int64_t lock_id,
+Status CloudMetaMgr::get_delete_bitmap_update_lock(const CloudTablet& tablet, 
int64_t lock_id,
                                                    int64_t initiator) {
-    VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << 
tablet->tablet_id();
+    VLOG_DEBUG << "get_delete_bitmap_update_lock , tablet_id: " << 
tablet.tablet_id();
     GetDeleteBitmapUpdateLockRequest req;
     GetDeleteBitmapUpdateLockResponse res;
     req.set_cloud_unique_id(config::cloud_unique_id);
-    req.set_table_id(tablet->table_id());
+    req.set_table_id(tablet.table_id());
     req.set_lock_id(lock_id);
     req.set_initiator(initiator);
     req.set_expiration(10); // 10s expiration time for compaction and 
schema_change
diff --git a/be/src/cloud/cloud_meta_mgr.h b/be/src/cloud/cloud_meta_mgr.h
index 6557a6eab8a..af5b048b2f0 100644
--- a/be/src/cloud/cloud_meta_mgr.h
+++ b/be/src/cloud/cloud_meta_mgr.h
@@ -29,7 +29,7 @@ namespace doris {
 
 class DeleteBitmap;
 class StreamLoadContext;
-class Tablet;
+class CloudTablet;
 class TabletMeta;
 class TabletSchema;
 class RowsetMeta;
@@ -51,7 +51,7 @@ public:
 
     Status get_tablet_meta(int64_t tablet_id, std::shared_ptr<TabletMeta>* 
tablet_meta);
 
-    Status sync_tablet_rowsets(Tablet* tablet, bool warmup_delta_data = false);
+    Status sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_data = 
false);
 
     Status prepare_rowset(const RowsetMeta& rs_meta, bool is_tmp,
                           std::shared_ptr<RowsetMeta>* existed_rs_meta = 
nullptr);
@@ -79,14 +79,15 @@ public:
 
     Status update_tablet_schema(int64_t tablet_id, const TabletSchema& 
tablet_schema);
 
-    Status update_delete_bitmap(const Tablet* tablet, int64_t lock_id, int64_t 
initiator,
+    Status update_delete_bitmap(const CloudTablet& tablet, int64_t lock_id, 
int64_t initiator,
                                 DeleteBitmap* delete_bitmap);
 
-    Status get_delete_bitmap_update_lock(const Tablet* tablet, int64_t 
lock_id, int64_t initiator);
+    Status get_delete_bitmap_update_lock(const CloudTablet& tablet, int64_t 
lock_id,
+                                         int64_t initiator);
 
 private:
     Status sync_tablet_delete_bitmap(
-            Tablet* tablet, int64_t old_max_version,
+            CloudTablet* tablet, int64_t old_max_version,
             const google::protobuf::RepeatedPtrField<RowsetMetaPB>& rs_metas,
             const TabletStatsPB& stas, const TabletIndexPB& idx, DeleteBitmap* 
delete_bitmap);
 };
diff --git a/be/src/cloud/cloud_storage_engine.h 
b/be/src/cloud/cloud_storage_engine.h
new file mode 100644
index 00000000000..87e3ed52d39
--- /dev/null
+++ b/be/src/cloud/cloud_storage_engine.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+namespace doris {
+namespace cloud {
+class CloudMetaMgr;
+}
+
+// Storage engine object for cloud mode. In this revision it only holds a
+// `cloud::CloudMetaMgr` and exposes it through `meta_mgr()`.
+class CloudStorageEngine {
+public:
+    CloudStorageEngine();
+
+    // Only declared here: `cloud::CloudMetaMgr` is merely forward-declared in this
+    // header, so the definition has to live where the type is complete.
+    ~CloudStorageEngine();
+
+    // Returns the meta manager by reference. `_meta_mgr` is dereferenced without a
+    // null check, so it must already be set when this is called
+    // (NOTE(review): presumably by the constructor -- its definition is not shown here).
+    cloud::CloudMetaMgr& meta_mgr() { return *_meta_mgr; }
+
+private:
+    std::unique_ptr<cloud::CloudMetaMgr> _meta_mgr;
+};
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.cpp b/be/src/cloud/cloud_tablet.cpp
new file mode 100644
index 00000000000..03670df78d1
--- /dev/null
+++ b/be/src/cloud/cloud_tablet.cpp
@@ -0,0 +1,441 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "cloud/cloud_tablet.h"
+
+#include <rapidjson/document.h>
+#include <rapidjson/encodings.h>
+#include <rapidjson/prettywriter.h>
+#include <rapidjson/rapidjson.h>
+#include <rapidjson/stringbuffer.h>
+
+#include "cloud/cloud_meta_mgr.h"
+#include "cloud/cloud_storage_engine.h"
+#include "io/cache/block/block_file_cache_factory.h"
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_writer.h"
+#include "olap/rowset/segment_v2/inverted_index_desc.h"
+
+namespace doris {
+using namespace ErrorCode;
+
+// Builds a cloud tablet: `tablet_meta` is moved into the `BaseTablet` base class and
+// `engine` is stored by reference in `_engine`.
+CloudTablet::CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr 
tablet_meta)
+        : BaseTablet(std::move(tablet_meta)), _engine(engine) {}
+
+// Nothing to release explicitly.
+CloudTablet::~CloudTablet() = default;
+
+// Returns true when the approximate rowset counter is strictly greater than `limit`.
+// The counter is read with relaxed ordering, so the answer is a best-effort snapshot.
+bool CloudTablet::exceed_version_limit(int32_t limit) {
+    const int64_t rowset_count = _approximate_num_rowsets.load(std::memory_order_relaxed);
+    return rowset_count > limit;
+}
+
+// Captures readers for the rowsets on a consistent version path covering `spec_version`
+// and stores them in `rs_splits`.
+// If no consistent path exists, the error is tagged with the tablet id, logged together
+// with the compaction status JSON and returned; its code is reset to
+// VERSION_ALREADY_MERGED when no version up to `spec_version.second` is missing.
+// NOTE: `skip_missing_version` is not consulted by this implementation.
+Status CloudTablet::capture_rs_readers(const Version& spec_version,
+                                       std::vector<RowSetSplits>* rs_splits,
+                                       bool skip_missing_version) {
+    Versions path;
+    std::shared_lock meta_rlock(_meta_lock);
+    auto status = _timestamped_version_tracker.capture_consistent_versions(spec_version, &path);
+    if (status.ok()) {
+        VLOG_DEBUG << "capture consitent versions: " << path;
+        // The shared lock is still held while the readers are captured.
+        return capture_rs_readers_unlocked(path, rs_splits);
+    }
+
+    // Drop the lock before diagnosing: logging should not happen inside the lock range,
+    // and both helpers below acquire `_meta_lock` themselves.
+    meta_rlock.unlock();
+    if (calc_missed_versions(spec_version.second).empty()) {
+        // Nothing is missing, so the requested version must have been merged already.
+        status.set_code(VERSION_ALREADY_MERGED);
+    }
+    status.append(" tablet_id=" + std::to_string(tablet_id()));
+
+    std::string compaction_status;
+    get_compaction_status(&compaction_status);
+    LOG(WARNING) << status << '\n' << compaction_status;
+    return status;
+}
+
+// Returns the version ranges within [0, spec_version] that are not covered by any rowset
+// recorded in the tablet meta. Takes `_meta_lock` in shared mode while reading the meta.
+// for example:
+//     [0-4][5-5][8-8][9-9][13-13]
+// if spec_version = 12, it will return [6-7],[10-12]
+// If the tablet has no rowset at all, the whole range [0-spec_version] is returned.
+Versions CloudTablet::calc_missed_versions(int64_t spec_version) {
+    DCHECK(spec_version > 0) << "invalid spec_version: " << spec_version;
+
+    Versions missed_versions;
+    Versions existing_versions;
+    {
+        std::shared_lock rdlock(_meta_lock);
+        for (const auto& rs : _tablet_meta->all_rs_metas()) {
+            existing_versions.emplace_back(rs->version());
+        }
+    }
+
+    // The tablet may momentarily have no rowsets (e.g. after its rowsets were cleared and
+    // before they are synced again). front()/back() and `end() - 1` below are undefined on
+    // an empty vector, so report everything up to `spec_version` as missing and stop here.
+    if (existing_versions.empty()) {
+        missed_versions.emplace_back(0, spec_version);
+        return missed_versions;
+    }
+
+    // sort the existing versions in ascending order
+    std::sort(existing_versions.begin(), existing_versions.end(),
+              [](const Version& a, const Version& b) {
+                  // simple because 2 versions are certainly not overlapping
+                  return a.first < b.first;
+              });
+
+    // Gap before the first existing version.
+    auto min_version = existing_versions.front().first;
+    if (min_version > 0) {
+        missed_versions.emplace_back(0, std::min(spec_version, min_version - 1));
+    }
+    // Gaps between neighbouring versions; stop as soon as `spec_version` is covered.
+    for (auto it = existing_versions.begin(); it != existing_versions.end() - 1; ++it) {
+        auto prev_v = it->second;
+        if (prev_v >= spec_version) {
+            return missed_versions;
+        }
+        auto next_v = (it + 1)->first;
+        if (next_v > prev_v + 1) {
+            // there is a hole between versions
+            missed_versions.emplace_back(prev_v + 1, std::min(spec_version, next_v - 1));
+        }
+    }
+    // Gap after the last existing version.
+    auto max_version = existing_versions.back().second;
+    if (max_version < spec_version) {
+        missed_versions.emplace_back(max_version + 1, spec_version);
+    }
+    return missed_versions;
+}
+
+// Placeholder: not implemented in this revision, always returns a NotSupported status.
+Status CloudTablet::sync_meta() {
+    // TODO(lightman): FileCache
+    return Status::NotSupported("CloudTablet::sync_meta is not implemented");
+}
+
+// Synchronizes this tablet's rowsets through the meta manager.
+// Author's note: there are only two tablet states, RUNNING and NOT_READY, in cloud mode.
+// Steps:
+//   1. `sync_if_not_running()` must succeed first.
+//   2. If `query_version` > 0 and the local `_max_version` already reaches it, nothing is
+//      done. This is checked once before and once after taking `_sync_meta_lock`.
+//   3. Otherwise the meta manager syncs the rowsets; on a NOT_FOUND result
+//      `recycle_cached_data()` is invoked before the status is returned.
+Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
+    RETURN_IF_ERROR(sync_if_not_running());
+
+    // True when the caller asked for a concrete version that is already present locally.
+    const auto version_already_synced = [this, query_version]() {
+        if (query_version <= 0) {
+            return false;
+        }
+        std::shared_lock rlock(_meta_lock);
+        return _max_version >= query_version;
+    };
+
+    if (version_already_synced()) {
+        return Status::OK();
+    }
+
+    // serially execute sync to reduce unnecessary network overhead
+    std::lock_guard sync_guard(_sync_meta_lock);
+    // Another thread may have finished the sync while we were waiting for the lock.
+    if (version_already_synced()) {
+        return Status::OK();
+    }
+
+    auto status = _engine.meta_mgr().sync_tablet_rowsets(this, warmup_delta_data);
+    if (status.is<ErrorCode::NOT_FOUND>()) {
+        recycle_cached_data();
+    }
+    return status;
+}
+
+// Sync tablet meta and all rowset meta if the tablet is not in TABLET_RUNNING state.
+// Author's note: this could happen when a BE didn't finish a schema change job and another
+// BE committed it; it should be a quite rare situation.
+// Returns OK immediately when the tablet is already running. Otherwise the tablet meta is
+// fetched from the meta manager, the local rowset bookkeeping is reset, and all rowsets are
+// synced again. A NOT_FOUND result from the meta manager triggers `recycle_cached_data()`.
+Status CloudTablet::sync_if_not_running() {
+    if (tablet_state() == TABLET_RUNNING) {
+        return Status::OK();
+    }
+
+    // Serially execute sync to reduce unnecessary network overhead
+    std::lock_guard sync_guard(_sync_meta_lock);
+
+    // Re-check under the meta lock now that we own the sync lock.
+    bool already_running = false;
+    {
+        std::shared_lock rlock(_meta_lock);
+        already_running = (tablet_state() == TABLET_RUNNING);
+    }
+    if (already_running) {
+        return Status::OK();
+    }
+
+    const auto recycle_if_not_found = [this](const Status& s) {
+        if (s.is<ErrorCode::NOT_FOUND>()) {
+            recycle_cached_data();
+        }
+    };
+
+    TabletMetaSharedPtr fresh_meta;
+    auto status = _engine.meta_mgr().get_tablet_meta(tablet_id(), &fresh_meta);
+    if (!status.ok()) {
+        recycle_if_not_found(status);
+        return status;
+    }
+
+    if (fresh_meta->tablet_state() != TABLET_RUNNING) [[unlikely]] {
+        // MoW may go to here when load while schema change
+        return Status::Error<INVALID_TABLET_STATE>("invalid tablet state {}. tablet_id={}",
+                                                   fresh_meta->tablet_state(), tablet_id());
+    }
+
+    // Declared outside the locked scope: after the swap below it holds the previous
+    // tracker contents, which are therefore destroyed only after `_meta_lock` is released.
+    TimestampedVersionTracker previous_tracker;
+    {
+        std::lock_guard wlock(_meta_lock);
+        RETURN_IF_ERROR(set_tablet_state(TABLET_RUNNING));
+        _rs_version_map.clear();
+        _stale_rs_version_map.clear();
+        std::swap(_timestamped_version_tracker, previous_tracker);
+        _tablet_meta->clear_rowsets();
+        _tablet_meta->clear_stale_rowset();
+        _max_version = -1;
+    }
+
+    status = _engine.meta_mgr().sync_tablet_rowsets(this);
+    recycle_if_not_found(status);
+    return status;
+}
+
+// Adds `to_add` to this tablet's rowset bookkeeping (`_rs_version_map`, the version
+// tracker, `_max_version`, the base size and the tablet meta).
+//  - `version_overlap == false`: every rowset is added as-is.
+//  - `version_overlap == true`: rowsets already present (same version and rowset id) are
+//    dropped, rowsets with the same version but another rowset id replace the existing one,
+//    and for the remaining ones every existing rowset whose version is contained in the new
+//    version is moved to the stale set via `delete_rowsets` first.
+// `meta_lock` is only forwarded to `delete_rowsets`; `warmup_delta_data` is not acted upon
+// yet (see the TODO below).
+void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
+                              std::unique_lock<std::shared_mutex>& meta_lock,
+                              bool warmup_delta_data) {
+    if (to_add.empty()) {
+        return;
+    }
+
+    // Registers rowsets without any overlap handling.
+    auto add_rowsets_directly = [this](std::vector<RowsetSharedPtr>& rowsets) {
+        for (auto& rowset : rowsets) {
+            _rs_version_map.emplace(rowset->version(), rowset);
+            _timestamped_version_tracker.add_version(rowset->version());
+            _max_version = std::max(rowset->end_version(), _max_version);
+            update_base_size(*rowset);
+        }
+        _tablet_meta->add_rowsets_unchecked(rowsets);
+        // TODO(plat1ko): Warmup delta rowset data in background
+    };
+
+    if (!version_overlap) {
+        add_rowsets_directly(to_add);
+        return;
+    }
+
+    // Returns true for rowsets that need no further processing: either they are already
+    // present, or they have just replaced the existing rowset of the same version.
+    auto handled_in_place = [this](const RowsetSharedPtr& candidate) {
+        auto existing = _rs_version_map.find(candidate->version());
+        if (existing == _rs_version_map.end()) {
+            return false;
+        }
+        if (existing->second->rowset_id() != candidate->rowset_id()) {
+            // If version of rowset in `to_add` is equal to rowset in tablet but rowset_id
+            // is not equal, replace existed rowset with `to_add` rowset. This may occur when:
+            //  1. schema change converts rowsets which have been double written to new tablet
+            //  2. cumu compaction picks single overlapping input rowset to perform compaction
+            _tablet_meta->delete_rs_meta_by_version(candidate->version(), nullptr);
+            _rs_version_map[candidate->version()] = candidate;
+            _tablet_meta->add_rowsets_unchecked({candidate});
+            update_base_size(*candidate);
+        }
+        return true; // Same rowset, or replaced above
+    };
+    to_add.erase(std::remove_if(to_add.begin(), to_add.end(), handled_in_place), to_add.end());
+
+    std::vector<RowsetSharedPtr> pending;
+    for (auto& incoming : to_add) {
+        pending.push_back(incoming);
+
+        Version incoming_version = incoming->version();
+        // if start_version > max_version, we can skip checking overlap here.
+        if (incoming_version.first > _max_version) {
+            continue;
+        }
+
+        // delete rowsets with overlapped version
+        std::vector<RowsetSharedPtr> overlapped;
+        for (auto& [version, rowset] : _rs_version_map) {
+            if (incoming_version.contains(version)) {
+                overlapped.push_back(rowset);
+            }
+        }
+        delete_rowsets(overlapped, meta_lock);
+    }
+
+    add_rowsets_directly(pending);
+}
+
+// Moves `to_delete` from the visible rowsets to the stale ones: each rowset is recorded in
+// `_stale_rs_version_map`, their metas are registered as a stale path in the version
+// tracker, the rowsets are erased from `_rs_version_map`, and the tablet meta is updated.
+// The unnamed lock parameter is never used in the body.
+void CloudTablet::delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
+                                 std::unique_lock<std::shared_mutex>&) {
+    if (to_delete.empty()) {
+        return;
+    }
+
+    std::vector<RowsetMetaSharedPtr> stale_metas;
+    stale_metas.reserve(to_delete.size());
+    for (const auto& rowset : to_delete) {
+        stale_metas.push_back(rowset->rowset_meta());
+        _stale_rs_version_map[rowset->version()] = rowset;
+    }
+    _timestamped_version_tracker.add_stale_path_version(stale_metas);
+
+    for (const auto& rowset : to_delete) {
+        _rs_version_map.erase(rowset->version());
+    }
+
+    _tablet_meta->modify_rs_metas({}, stale_metas, false);
+}
+
+// Removes stale rowsets whose version paths expired more than
+// `config::tablet_rowset_stale_sweep_time_sec` seconds ago.
+// The bookkeeping is updated under the exclusive `_meta_lock`; the collected rowsets are
+// handed to `recycle_cached_data` after the lock is released.
+// Returns the number of stale rowsets that were removed.
+int CloudTablet::delete_expired_stale_rowsets() {
+    std::vector<RowsetSharedPtr> removed;
+    int64_t sweep_deadline = ::time(nullptr) - config::tablet_rowset_stale_sweep_time_sec;
+    {
+        std::unique_lock wlock(_meta_lock);
+
+        // capture the path version to delete
+        std::vector<int64_t> expired_path_ids;
+        _timestamped_version_tracker.capture_expired_paths(sweep_deadline, &expired_path_ids);
+        if (expired_path_ids.empty()) {
+            return 0;
+        }
+
+        for (int64_t expired_id : expired_path_ids) {
+            // delete stale versions in version graph
+            auto path = _timestamped_version_tracker.fetch_and_delete_path_by_id(expired_id);
+            for (auto& ts_version : path->timestamped_versions()) {
+                if (auto stale_it = _stale_rs_version_map.find(ts_version->version());
+                    stale_it == _stale_rs_version_map.end()) {
+                    LOG(WARNING) << "cannot find stale rowset " << ts_version->version()
+                                 << " in tablet " << tablet_id();
+                    // The lambda releases `wlock` before dumping the status because
+                    // `get_compaction_status` takes `_meta_lock` itself.
+                    DCHECK(false) << [this, &wlock]() {
+                        wlock.unlock();
+                        std::string json;
+                        get_compaction_status(&json);
+                        return json;
+                    }();
+                } else {
+                    removed.push_back(stale_it->second);
+                    _stale_rs_version_map.erase(stale_it);
+                }
+                _tablet_meta->delete_stale_rs_meta_by_version(ts_version->version());
+                VLOG_DEBUG << "delete stale rowset " << ts_version->version();
+            }
+        }
+        _reconstruct_version_tracker_if_necessary();
+    }
+    recycle_cached_data(removed);
+    return removed.size();
+}
+
+// Records the on-disk data size of the base rowset in `_base_size`.
+// The base rowset is defined as the rowset of version [2-x]; any other rowset is ignored.
+void CloudTablet::update_base_size(const Rowset& rs) {
+    if (rs.start_version() != 2) {
+        return;
+    }
+    _base_size = rs.data_disk_size();
+}
+
+// Placeholder: intentionally a no-op in this revision.
+void CloudTablet::recycle_cached_data() {
+    // TODO(plat1ko)
+}
+
+// Placeholder: intentionally a no-op in this revision; `rowsets` is not used yet.
+void CloudTablet::recycle_cached_data(const std::vector<RowsetSharedPtr>& 
rowsets) {
+    // TODO(plat1ko)
+}
+
+// Overwrites the approximate tablet statistics with the given values and recomputes the
+// two cumulative counters from `_rs_version_map`:
+//  - cumulative rowsets: rowsets whose end version is not below the cumulative point;
+//  - cumulative deltas: for each such rowset, its segment count if its segments overlap,
+//    otherwise 1.
+// All counters are written with relaxed memory ordering.
+void CloudTablet::reset_approximate_stats(int64_t num_rowsets, int64_t num_segments,
+                                          int64_t num_rows, int64_t data_size) {
+    _approximate_num_rowsets.store(num_rowsets, std::memory_order_relaxed);
+    _approximate_num_segments.store(num_segments, std::memory_order_relaxed);
+    _approximate_num_rows.store(num_rows, std::memory_order_relaxed);
+    _approximate_data_size.store(data_size, std::memory_order_relaxed);
+
+    const auto cumulative_point = _cumulative_point.load(std::memory_order_relaxed);
+    int64_t rowsets_after_point = 0;
+    int64_t deltas_after_point = 0;
+    for (auto& [version, rowset] : _rs_version_map) {
+        if (version.second >= cumulative_point) {
+            if (rowset->is_segments_overlapping()) {
+                deltas_after_point += rowset->num_segments();
+            } else {
+                deltas_after_point += 1;
+            }
+            ++rowsets_after_point;
+        }
+    }
+    _approximate_cumu_num_rowsets.store(rowsets_after_point, std::memory_order_relaxed);
+    _approximate_cumu_num_deltas.store(deltas_after_point, std::memory_order_relaxed);
+}
+
+// Placeholder: not implemented in this revision. Both arguments are ignored and the
+// result always carries a NotSupported status.
+Result<std::unique_ptr<RowsetWriter>> CloudTablet::create_rowset_writer(
+        RowsetWriterContext& context, bool vertical) {
+    return ResultError(
+            Status::NotSupported("CloudTablet::create_rowset_writer is not 
implemented"));
+}
+
+// return a json string to show the compaction status of this tablet
+// The document written to `*json_result` contains:
+//   "cumulative point"   - current value of `_cumulative_point`
+//   "rowsets"            - info string of every visible rowset, in `Rowset::comparator` order
+//   "missing_rowsets"    - "[a-b]" for every version gap found while walking those rowsets
+//   "stale_rowsets"      - info string of every stale rowset
+//   "stale version path" - stale path description produced by the version tracker
+// Acquires `_meta_lock` in shared mode internally while snapshotting tablet state.
+void CloudTablet::get_compaction_status(std::string* json_result) {
+    rapidjson::Document root;
+    root.SetObject();
+
+    rapidjson::Document path_arr;
+    path_arr.SetArray();
+
+    std::vector<RowsetSharedPtr> rowsets;
+    std::vector<RowsetSharedPtr> stale_rowsets;
+    {
+        std::shared_lock rdlock(_meta_lock);
+        rowsets.reserve(_rs_version_map.size());
+        for (auto& it : _rs_version_map) {
+            rowsets.push_back(it.second);
+        }
+        stale_rowsets.reserve(_stale_rs_version_map.size());
+        for (auto& it : _stale_rs_version_map) {
+            stale_rowsets.push_back(it.second);
+        }
+
+        // get snapshot version path json_doc
+        // Done while the shared lock is held: the tracker is modified by other members of
+        // this class under the exclusive `_meta_lock`, so reading it unlocked would race.
+        _timestamped_version_tracker.get_stale_version_path_json_doc(path_arr);
+    }
+    // Sorting works on the local snapshots only, so it does not need the lock.
+    std::sort(rowsets.begin(), rowsets.end(), Rowset::comparator);
+    std::sort(stale_rowsets.begin(), stale_rowsets.end(), Rowset::comparator);
+
+    root.AddMember("cumulative point", _cumulative_point.load(), root.GetAllocator());
+
+    // print all rowsets' version as an array
+    rapidjson::Document versions_arr;
+    rapidjson::Document missing_versions_arr;
+    versions_arr.SetArray();
+    missing_versions_arr.SetArray();
+    // End version of the previously visited rowset; -1 so that a first rowset not starting
+    // at version 0 is reported as a gap as well.
+    int64_t last_version = -1;
+    for (auto& rowset : rowsets) {
+        const Version& ver = rowset->version();
+        if (ver.first != last_version + 1) {
+            rapidjson::Value miss_value;
+            miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(),
+                                 missing_versions_arr.GetAllocator());
+            missing_versions_arr.PushBack(miss_value, missing_versions_arr.GetAllocator());
+        }
+        rapidjson::Value value;
+        std::string version_str = rowset->get_rowset_info_str();
+        value.SetString(version_str.c_str(), version_str.length(), versions_arr.GetAllocator());
+        versions_arr.PushBack(value, versions_arr.GetAllocator());
+        last_version = ver.second;
+    }
+    root.AddMember("rowsets", versions_arr, root.GetAllocator());
+    root.AddMember("missing_rowsets", missing_versions_arr, root.GetAllocator());
+
+    // print all stale rowsets' version as an array
+    rapidjson::Document stale_versions_arr;
+    stale_versions_arr.SetArray();
+    for (auto& rowset : stale_rowsets) {
+        rapidjson::Value value;
+        std::string version_str = rowset->get_rowset_info_str();
+        value.SetString(version_str.c_str(), version_str.length(),
+                        stale_versions_arr.GetAllocator());
+        stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
+    }
+    root.AddMember("stale_rowsets", stale_versions_arr, root.GetAllocator());
+
+    // add stale version rowsets
+    root.AddMember("stale version path", path_arr, root.GetAllocator());
+
+    // to json string
+    rapidjson::StringBuffer strbuf;
+    rapidjson::PrettyWriter<rapidjson::StringBuffer> writer(strbuf);
+    root.Accept(writer);
+    *json_result = std::string(strbuf.GetString());
+}
+
+} // namespace doris
diff --git a/be/src/cloud/cloud_tablet.h b/be/src/cloud/cloud_tablet.h
new file mode 100644
index 00000000000..537c8fe134d
--- /dev/null
+++ b/be/src/cloud/cloud_tablet.h
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <atomic>
+
+#include "olap/base_tablet.h"
+#include "olap/version_graph.h"
+
+namespace doris {
+
+class CloudStorageEngine;
+
+// Tablet implementation for cloud mode, built on top of `BaseTablet`. Besides the
+// overridden tablet interface it keeps a set of approximate statistics as atomic counters
+// and offers helpers to sync rowsets and tablet meta through the cloud storage engine.
+class CloudTablet final : public BaseTablet {
+public:
+    CloudTablet(CloudStorageEngine& engine, TabletMetaSharedPtr tablet_meta);
+
+    ~CloudTablet() override;
+
+    bool exceed_version_limit(int32_t limit) override;
+
+    Result<std::unique_ptr<RowsetWriter>> 
create_rowset_writer(RowsetWriterContext& context,
+                                                               bool vertical) 
override;
+
+    Status capture_rs_readers(const Version& spec_version, 
std::vector<RowSetSplits>* rs_splits,
+                              bool skip_missing_version) override;
+
+    // Reports the approximate data size counter (relaxed load).
+    size_t tablet_footprint() override {
+        return _approximate_data_size.load(std::memory_order_relaxed);
+    }
+
+    // Each helper below adds `x` to the matching approximate counter with relaxed
+    // ordering and returns the value the counter held before the addition.
+    // clang-format off
+    int64_t fetch_add_approximate_num_rowsets (int64_t x) { return 
_approximate_num_rowsets .fetch_add(x, std::memory_order_relaxed); }
+    int64_t fetch_add_approximate_num_segments(int64_t x) { return 
_approximate_num_segments.fetch_add(x, std::memory_order_relaxed); }
+    int64_t fetch_add_approximate_num_rows    (int64_t x) { return 
_approximate_num_rows    .fetch_add(x, std::memory_order_relaxed); }
+    int64_t fetch_add_approximate_data_size   (int64_t x) { return 
_approximate_data_size   .fetch_add(x, std::memory_order_relaxed); }
+    int64_t fetch_add_approximate_cumu_num_rowsets (int64_t x) { return 
_approximate_cumu_num_rowsets.fetch_add(x, std::memory_order_relaxed); }
+    int64_t fetch_add_approximate_cumu_num_deltas   (int64_t x) { return 
_approximate_cumu_num_deltas.fetch_add(x, std::memory_order_relaxed); }
+    // clang-format on
+
+    // meta lock must be held when calling this function
+    void reset_approximate_stats(int64_t num_rowsets, int64_t num_segments, 
int64_t num_rows,
+                                 int64_t data_size);
+
+    // return a json string to show the compaction status of this tablet
+    void get_compaction_status(std::string* json_result);
+
+    // Synchronize the rowsets from meta service.
+    // If tablet state is not `TABLET_RUNNING`, sync tablet meta and all visible rowsets.
+    // If `query_version` > 0 and local max_version of the tablet >= `query_version`,
+    // do nothing.
+    // `warmup_delta_data` is forwarded to the meta manager; it is meant to request that
+    // the data of newly synced rowsets be downloaded in the background.
+    Status sync_rowsets(int64_t query_version = -1, bool warmup_delta_data = 
false);
+
+    // Synchronize the tablet meta from meta service.
+    Status sync_meta();
+
+    // If `version_overlap` is true, function will delete rowsets with overlapped version
+    // in this tablet.
+    // If `warmup_delta_data` is true, the new version rowset data is meant to be
+    // downloaded in background.
+    // MUST hold EXCLUSIVE `_meta_lock`.
+    void add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
+                     std::unique_lock<std::shared_mutex>& meta_lock,
+                     bool warmup_delta_data = false);
+
+    // MUST hold EXCLUSIVE `_meta_lock`.
+    void delete_rowsets(const std::vector<RowsetSharedPtr>& to_delete,
+                        std::unique_lock<std::shared_mutex>& meta_lock);
+
+    // When the tablet is dropped, we need to recycle cached data:
+    // 1. The data in file cache
+    // 2. The memory in tablet cache
+    void recycle_cached_data();
+
+    // Same purpose, restricted to the given rowsets.
+    void recycle_cached_data(const std::vector<RowsetSharedPtr>& rowsets);
+
+    // Return number of deleted stale rowsets
+    int delete_expired_stale_rowsets();
+
+private:
+    Versions calc_missed_versions(int64_t spec_version);
+
+    // FIXME(plat1ko): No need to record base size if rowsets are ordered by version
+    void update_base_size(const Rowset& rs);
+
+    Status sync_if_not_running();
+
+    CloudStorageEngine& _engine;
+
+    // this mutex MUST ONLY be used when sync meta
+    bthread::Mutex _sync_meta_lock;
+
+    // All counters below start at -1.
+    std::atomic<int64_t> _cumulative_point {-1};
+    std::atomic<int64_t> _approximate_num_rowsets {-1};
+    std::atomic<int64_t> _approximate_num_segments {-1};
+    std::atomic<int64_t> _approximate_num_rows {-1};
+    std::atomic<int64_t> _approximate_data_size {-1};
+    std::atomic<int64_t> _approximate_cumu_num_rowsets {-1};
+    // Number of sorted arrays after cumu point (e.g. for rowset with N segments, if
+    // rowset is overlapping, delta is N, otherwise 1)
+    std::atomic<int64_t> _approximate_cumu_num_deltas {-1};
+
+    [[maybe_unused]] int64_t _base_compaction_cnt = 0;
+    [[maybe_unused]] int64_t _cumulative_compaction_cnt = 0;
+    // Plain (non-atomic) fields.
+    int64_t _max_version = -1;
+    int64_t _base_size = 0;
+};
+
+} // namespace doris
diff --git a/be/src/common/status.h b/be/src/common/status.h
index 5199e86d120..2bec1c397e8 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -274,7 +274,8 @@ namespace ErrorCode {
     E(INVERTED_INDEX_COMPACTION_ERROR, -6010, false);        \
     E(KEY_NOT_FOUND, -7000, false);                          \
     E(KEY_ALREADY_EXISTS, -7001, false);                     \
-    E(ENTRY_NOT_FOUND, -7002, false);
+    E(ENTRY_NOT_FOUND, -7002, false);                        \
+    E(INVALID_TABLET_STATE, -7211, false);
 
 // Define constexpr int error_code_name = error_code_value
 #define M(NAME, ERRORCODE, ENABLESTACKTRACE) constexpr int NAME = ERRORCODE;
diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp
index 43f514906cf..18445cb17a6 100644
--- a/be/src/olap/base_tablet.cpp
+++ b/be/src/olap/base_tablet.cpp
@@ -19,6 +19,8 @@
 
 #include <fmt/format.h>
 
+#include "olap/rowset/rowset.h"
+#include "olap/rowset/rowset_reader.h"
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_schema_cache.h"
 #include "util/doris_metrics.h"
@@ -79,4 +81,42 @@ Status BaseTablet::update_by_least_common_schema(const TabletSchemaSPtr& update_
     return Status::OK();
 }
 
+Status BaseTablet::capture_rs_readers_unlocked(const std::vector<Version>& version_path,
+                                               std::vector<RowSetSplits>* rs_splits) const {
+    DCHECK(rs_splits != nullptr && rs_splits->empty());
+    for (auto version : version_path) {
+        auto it = _rs_version_map.find(version);
+        if (it == _rs_version_map.end()) {
+            VLOG_NOTICE << "fail to find Rowset in rs_version for version. tablet=" << tablet_id()
+                        << ", version='" << version.first << "-" << version.second;
+
+            it = _stale_rs_version_map.find(version);
+            if (it == _stale_rs_version_map.end()) {
+                return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+                        "fail to find Rowset in stale_rs_version for version. tablet={}, "
+                        "version={}-{}",
+                        tablet_id(), version.first, version.second);
+            }
+        }
+        RowsetReaderSharedPtr rs_reader;
+        auto res = it->second->create_reader(&rs_reader);
+        if (!res.ok()) {
+            return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
+                    "failed to create reader for rowset:{}", it->second->rowset_id().to_string());
+        }
+        rs_splits->push_back(RowSetSplits(std::move(rs_reader)));
+    }
+    return Status::OK();
+}
+
+bool BaseTablet::_reconstruct_version_tracker_if_necessary() {
+    double orphan_vertex_ratio = _timestamped_version_tracker.get_orphan_vertex_ratio();
+    if (orphan_vertex_ratio >= config::tablet_version_graph_orphan_vertex_ratio) {
+        _timestamped_version_tracker.construct_versioned_tracker(
+                _tablet_meta->all_rs_metas(), _tablet_meta->all_stale_rs_metas());
+        return true;
+    }
+    return false;
+}
+
 } /* namespace doris */
diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h
index 2fa494b420a..bb327b39532 100644
--- a/be/src/olap/base_tablet.h
+++ b/be/src/olap/base_tablet.h
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "olap/tablet_fwd.h"
 #include "olap/tablet_meta.h"
+#include "olap/version_graph.h"
 #include "util/metrics.h"
 
 namespace doris {
@@ -72,19 +73,33 @@ public:
         return _max_version_schema;
     }
 
-    virtual bool exceed_version_limit(int32_t limit) const = 0;
+    virtual bool exceed_version_limit(int32_t limit) = 0;
 
     virtual Result<std::unique_ptr<RowsetWriter>> 
create_rowset_writer(RowsetWriterContext& context,
                                                                        bool 
vertical) = 0;
 
     virtual Status capture_rs_readers(const Version& spec_version,
                                       std::vector<RowSetSplits>* rs_splits,
-                                      bool skip_missing_version) const = 0;
+                                      bool skip_missing_version) = 0;
 
     virtual size_t tablet_footprint() = 0;
 
+    // MUST hold shared meta lock
+    Status capture_rs_readers_unlocked(const std::vector<Version>& version_path,
+                                       std::vector<RowSetSplits>* rs_splits) const;
+
 protected:
+    bool _reconstruct_version_tracker_if_necessary();
+
     mutable std::shared_mutex _meta_lock;
+    TimestampedVersionTracker _timestamped_version_tracker;
+    // After version 0.13, all newly created rowsets are saved in _rs_version_map.
+    // And if rowset being compacted, the old rowsetis will be saved in _stale_rs_version_map;
+    std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _rs_version_map;
+    // This variable _stale_rs_version_map is used to record these rowsets which are be compacted.
+    // These _stale rowsets are been removed when rowsets' pathVersion is expired,
+    // this policy is judged and computed by TimestampedVersionTracker.
+    std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> _stale_rs_version_map;
     const TabletMetaSharedPtr _tablet_meta;
     TabletSchemaSPtr _max_version_schema;
 
diff --git a/be/src/olap/rowset/rowset.cpp b/be/src/olap/rowset/rowset.cpp
index b3e6b1ca9e0..d09fce730e5 100644
--- a/be/src/olap/rowset/rowset.cpp
+++ b/be/src/olap/rowset/rowset.cpp
@@ -92,4 +92,13 @@ void Rowset::merge_rowset_meta(const RowsetMetaSharedPtr& 
other) {
     }
 }
 
+std::string Rowset::get_rowset_info_str() {
+    std::string disk_size = PrettyPrinter::print(
+            static_cast<uint64_t>(_rowset_meta->total_disk_size()), TUnit::BYTES);
+    return fmt::format("[{}-{}] {} {} {} {} {}", start_version(), end_version(), num_segments(),
+                       _rowset_meta->has_delete_predicate() ? "DELETE" : "DATA",
+                       SegmentsOverlapPB_Name(_rowset_meta->segments_overlap()),
+                       rowset_id().to_string(), disk_size);
+}
+
 } // namespace doris
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index a2a275f2eac..a1355a81198 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -302,6 +302,8 @@ public:
     // set skip index compaction next time
     void set_skip_index_compaction(int32_t column_id) { 
skip_index_compaction.insert(column_id); }
 
+    std::string get_rowset_info_str();
+
 protected:
     friend class RowsetFactory;
 
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 66e6ffcf2b2..327afd6a8c4 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -833,7 +833,8 @@ Status 
SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
             }
 
             // acquire data sources correspond to history versions
-            
RETURN_IF_ERROR(base_tablet->capture_rs_readers(versions_to_be_changed, 
&rs_splits));
+            RETURN_IF_ERROR(
+                    
base_tablet->capture_rs_readers_unlocked(versions_to_be_changed, &rs_splits));
             if (rs_splits.empty()) {
                 res = Status::Error<ALTER_DELTA_DOES_NOT_EXISTS>(
                         "fail to acquire all data sources. version_num={}, 
data_source_num={}",
@@ -985,8 +986,8 @@ Status SchemaChangeHandler::_get_versions_to_be_changed(
     }
     *max_rowset = rowset;
 
-    RETURN_IF_ERROR(base_tablet->capture_consistent_versions(Version(0, 
rowset->version().second),
-                                                             
versions_to_be_changed, false, false));
+    RETURN_IF_ERROR(base_tablet->capture_consistent_versions_unlocked(
+            Version(0, rowset->version().second), versions_to_be_changed, 
false, false));
 
     return Status::OK();
 }
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 720cd5b2350..50ec2304096 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -757,7 +757,8 @@ void Tablet::delete_expired_stale_rowset() {
             Version test_version = Version(0, lastest_delta->end_version());
             stale_version_path_map[*path_id_iter] = version_path;
 
-            Status status = capture_consistent_versions(test_version, nullptr, 
false, false);
+            Status status =
+                    capture_consistent_versions_unlocked(test_version, 
nullptr, false, false);
             // 1. When there is no consistent versions, we must reconstruct 
the tracker.
             if (!status.ok()) {
                 // 2. fetch missing version after delete
@@ -867,19 +868,9 @@ void Tablet::delete_expired_stale_rowset() {
 #endif
 }
 
-bool Tablet::_reconstruct_version_tracker_if_necessary() {
-    double orphan_vertex_ratio = 
_timestamped_version_tracker.get_orphan_vertex_ratio();
-    if (orphan_vertex_ratio >= 
config::tablet_version_graph_orphan_vertex_ratio) {
-        _timestamped_version_tracker.construct_versioned_tracker(
-                _tablet_meta->all_rs_metas(), 
_tablet_meta->all_stale_rs_metas());
-        return true;
-    }
-    return false;
-}
-
-Status Tablet::capture_consistent_versions(const Version& spec_version,
-                                           std::vector<Version>* version_path,
-                                           bool skip_missing_version, bool 
quiet) const {
+Status Tablet::capture_consistent_versions_unlocked(const Version& spec_version,
+                                                    std::vector<Version>* version_path,
+                                                    bool skip_missing_version, bool quiet) const {
     Status status =
             
_timestamped_version_tracker.capture_consistent_versions(spec_version, 
version_path);
     if (!status.ok() && !quiet) {
@@ -914,10 +905,10 @@ Status Tablet::capture_consistent_versions(const Version& 
spec_version,
 
 Status Tablet::check_version_integrity(const Version& version, bool quiet) {
     std::shared_lock rdlock(_meta_lock);
-    return capture_consistent_versions(version, nullptr, false, quiet);
+    return capture_consistent_versions_unlocked(version, nullptr, false, 
quiet);
 }
 
-bool Tablet::exceed_version_limit(int32_t limit) const {
+bool Tablet::exceed_version_limit(int32_t limit) {
     if (_tablet_meta->version_count() > limit) {
         exceed_version_limit_counter << 1;
         return true;
@@ -947,7 +938,8 @@ void Tablet::acquire_version_and_rowsets(
 Status Tablet::capture_consistent_rowsets(const Version& spec_version,
                                           std::vector<RowsetSharedPtr>* 
rowsets) const {
     std::vector<Version> version_path;
-    RETURN_IF_ERROR(capture_consistent_versions(spec_version, &version_path, 
false, false));
+    RETURN_IF_ERROR(
+            capture_consistent_versions_unlocked(spec_version, &version_path, 
false, false));
     RETURN_IF_ERROR(_capture_consistent_rowsets_unlocked(version_path, 
rowsets));
     return Status::OK();
 }
@@ -984,39 +976,12 @@ Status Tablet::_capture_consistent_rowsets_unlocked(const 
std::vector<Version>&
 }
 
 Status Tablet::capture_rs_readers(const Version& spec_version, 
std::vector<RowSetSplits>* rs_splits,
-                                  bool skip_missing_version) const {
+                                  bool skip_missing_version) {
+    std::shared_lock rlock(_meta_lock);
     std::vector<Version> version_path;
-    RETURN_IF_ERROR(
-            capture_consistent_versions(spec_version, &version_path, 
skip_missing_version, false));
-    RETURN_IF_ERROR(capture_rs_readers(version_path, rs_splits));
-    return Status::OK();
-}
-
-Status Tablet::capture_rs_readers(const std::vector<Version>& version_path,
-                                  std::vector<RowSetSplits>* rs_splits) const {
-    DCHECK(rs_splits != nullptr && rs_splits->empty());
-    for (auto version : version_path) {
-        auto it = _rs_version_map.find(version);
-        if (it == _rs_version_map.end()) {
-            VLOG_NOTICE << "fail to find Rowset in rs_version for version. 
tablet=" << tablet_id()
-                        << ", version='" << version.first << "-" << 
version.second;
-
-            it = _stale_rs_version_map.find(version);
-            if (it == _stale_rs_version_map.end()) {
-                return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
-                        "fail to find Rowset in stale_rs_version for version. 
tablet={}, "
-                        "version={}-{}",
-                        tablet_id(), version.first, version.second);
-            }
-        }
-        RowsetReaderSharedPtr rs_reader;
-        auto res = it->second->create_reader(&rs_reader);
-        if (!res.ok()) {
-            return Status::Error<CAPTURE_ROWSET_READER_ERROR>(
-                    "failed to create reader for rowset:{}", 
it->second->rowset_id().to_string());
-        }
-        rs_splits->push_back(RowSetSplits(std::move(rs_reader)));
-    }
+    RETURN_IF_ERROR(capture_consistent_versions_unlocked(spec_version, &version_path,
+                                                         skip_missing_version, false));
+    RETURN_IF_ERROR(capture_rs_readers_unlocked(version_path, rs_splits));
     return Status::OK();
 }
 
@@ -1419,16 +1384,6 @@ std::vector<RowsetSharedPtr> 
Tablet::pick_candidate_rowsets_to_build_inverted_in
     return candidate_rowsets;
 }
 
-std::string Tablet::_get_rowset_info_str(RowsetSharedPtr rowset, bool 
delete_flag) {
-    const Version& ver = rowset->version();
-    std::string disk_size = PrettyPrinter::print(
-            static_cast<uint64_t>(rowset->rowset_meta()->total_disk_size()), 
TUnit::BYTES);
-    return strings::Substitute("[$0-$1] $2 $3 $4 $5 $6", ver.first, ver.second,
-                               rowset->num_segments(), (delete_flag ? "DELETE" 
: "DATA"),
-                               
SegmentsOverlapPB_Name(rowset->rowset_meta()->segments_overlap()),
-                               rowset->rowset_id().to_string(), disk_size);
-}
-
 // For http compaction action
 void Tablet::get_compaction_status(std::string* json_result) {
     rapidjson::Document root;
@@ -1523,17 +1478,16 @@ void Tablet::get_compaction_status(std::string* 
json_result) {
     versions_arr.SetArray();
     missing_versions_arr.SetArray();
     int64_t last_version = -1;
-    for (int i = 0; i < rowsets.size(); ++i) {
-        const Version& ver = rowsets[i]->version();
+    for (auto& rowset : rowsets) {
+        const Version& ver = rowset->version();
         if (ver.first != last_version + 1) {
             rapidjson::Value miss_value;
-            miss_value.SetString(
-                    strings::Substitute("[$0-$1]", last_version + 1, ver.first 
- 1).c_str(),
-                    missing_versions_arr.GetAllocator());
+            miss_value.SetString(fmt::format("[{}-{}]", last_version + 1, ver.first - 1).c_str(),
+                                 missing_versions_arr.GetAllocator());
             missing_versions_arr.PushBack(miss_value, 
missing_versions_arr.GetAllocator());
         }
         rapidjson::Value value;
-        std::string version_str = _get_rowset_info_str(rowsets[i], 
delete_flags[i]);
+        std::string version_str = rowset->get_rowset_info_str();
         value.SetString(version_str.c_str(), version_str.length(), 
versions_arr.GetAllocator());
         versions_arr.PushBack(value, versions_arr.GetAllocator());
         last_version = ver.second;
@@ -1544,15 +1498,9 @@ void Tablet::get_compaction_status(std::string* 
json_result) {
     // print all stale rowsets' version as an array
     rapidjson::Document stale_versions_arr;
     stale_versions_arr.SetArray();
-    for (int i = 0; i < stale_rowsets.size(); ++i) {
-        const Version& ver = stale_rowsets[i]->version();
+    for (auto& rowset : stale_rowsets) {
         rapidjson::Value value;
-        std::string disk_size = PrettyPrinter::print(
-                
static_cast<uint64_t>(stale_rowsets[i]->rowset_meta()->total_disk_size()),
-                TUnit::BYTES);
-        std::string version_str = strings::Substitute(
-                "[$0-$1] $2 $3 $4", ver.first, ver.second, 
stale_rowsets[i]->num_segments(),
-                stale_rowsets[i]->rowset_id().to_string(), disk_size);
+        std::string version_str = rowset->get_rowset_info_str();
         value.SetString(version_str.c_str(), version_str.length(),
                         stale_versions_arr.GetAllocator());
         stale_versions_arr.PushBack(value, stale_versions_arr.GetAllocator());
@@ -3449,8 +3397,8 @@ Status Tablet::check_rowid_conversion(
 Status Tablet::all_rs_id(int64_t max_version, RowsetIdUnorderedSet* 
rowset_ids) const {
     //  Ensure that the obtained versions of rowsets are continuous
     std::vector<Version> version_path;
-    RETURN_IF_ERROR(
-            capture_consistent_versions(Version(0, max_version), 
&version_path, false, false));
+    RETURN_IF_ERROR(capture_consistent_versions_unlocked(Version(0, 
max_version), &version_path,
+                                                         false, false));
     for (auto& ver : version_path) {
         if (ver.second == 1) {
             // [0-1] rowset is empty for each tablet, skip it
@@ -3719,8 +3667,7 @@ Status 
Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in
         if (rowsets != nullptr) {
             for (const auto& rowset : *rowsets) {
                 rapidjson::Value value;
-                std::string version_str =
-                        _get_rowset_info_str(rowset, 
rowset->rowset_meta()->has_delete_predicate());
+                std::string version_str = rowset->get_rowset_info_str();
                 value.SetString(version_str.c_str(), version_str.length(),
                                 required_rowsets_arr.GetAllocator());
                 required_rowsets_arr.PushBack(value, 
required_rowsets_arr.GetAllocator());
@@ -3733,8 +3680,7 @@ Status 
Tablet::check_delete_bitmap_correctness(DeleteBitmapPtr delete_bitmap, in
             }
             for (const auto& rowset : rowsets) {
                 rapidjson::Value value;
-                std::string version_str =
-                        _get_rowset_info_str(rowset, 
rowset->rowset_meta()->has_delete_predicate());
+                std::string version_str = rowset->get_rowset_info_str();
                 value.SetString(version_str.c_str(), version_str.length(),
                                 required_rowsets_arr.GetAllocator());
                 required_rowsets_arr.PushBack(value, 
required_rowsets_arr.GetAllocator());
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index d6ad0285233..d953d8fce4f 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -123,7 +123,7 @@ public:
 
     size_t num_rows();
     int version_count() const;
-    bool exceed_version_limit(int32_t limit) const override;
+    bool exceed_version_limit(int32_t limit) override;
     uint64_t segment_count() const;
     Version max_version() const;
     Version max_version_unlocked() const;
@@ -170,9 +170,9 @@ public:
     // Given spec_version, find a continuous version path and store it in 
version_path.
     // If quiet is true, then only "does this path exist" is returned.
     // If skip_missing_version is true, return ok even there are missing 
versions.
-    Status capture_consistent_versions(const Version& spec_version,
-                                       std::vector<Version>* version_path,
-                                       bool skip_missing_version, bool quiet) 
const;
+    Status capture_consistent_versions_unlocked(const Version& spec_version,
+                                                std::vector<Version>* 
version_path,
+                                                bool skip_missing_version, 
bool quiet) const;
     // if quiet is true, no error log will be printed if there are missing 
versions
     Status check_version_integrity(const Version& version, bool quiet = false);
     bool check_version_exist(const Version& version) const;
@@ -183,10 +183,7 @@ public:
                                       std::vector<RowsetSharedPtr>* rowsets) 
const;
     // If skip_missing_version is true, skip versions if they are missing.
     Status capture_rs_readers(const Version& spec_version, 
std::vector<RowSetSplits>* rs_splits,
-                              bool skip_missing_version) const override;
-
-    Status capture_rs_readers(const std::vector<Version>& version_path,
-                              std::vector<RowSetSplits>* rs_splits) const;
+                              bool skip_missing_version) override;
 
     // meta lock
     std::shared_mutex& get_header_lock() { return _meta_lock; }
@@ -583,9 +580,6 @@ private:
             std::shared_ptr<CumulativeCompactionPolicy> 
cumulative_compaction_policy);
     uint32_t _calc_base_compaction_score() const;
 
-    // When the proportion of empty edges in the adjacency matrix used to 
represent the version graph
-    // in the version tracker is greater than the threshold, rebuild the 
version tracker
-    bool _reconstruct_version_tracker_if_necessary();
     void _init_context_common_fields(RowsetWriterContext& context);
 
     void _rowset_ids_difference(const RowsetIdUnorderedSet& cur, const 
RowsetIdUnorderedSet& pre,
@@ -607,7 +601,6 @@ private:
     
////////////////////////////////////////////////////////////////////////////
 
     void _remove_sentinel_mark_from_delete_bitmap(DeleteBitmapPtr 
delete_bitmap);
-    std::string _get_rowset_info_str(RowsetSharedPtr rowset, bool delete_flag);
 
 public:
     static const int64_t K_INVALID_CUMULATIVE_POINT = -1;
@@ -615,7 +608,6 @@ public:
 private:
     StorageEngine& _engine;
     DataDir* _data_dir = nullptr;
-    TimestampedVersionTracker _timestamped_version_tracker;
 
     DorisCallOnce<Status> _init_once;
     // meta store lock is used for prevent 2 threads do checkpoint concurrently
@@ -634,13 +626,6 @@ private:
     // during publish_txn, which might take hundreds of milliseconds
     mutable std::mutex _rowset_update_lock;
 
-    // After version 0.13, all newly created rowsets are saved in 
_rs_version_map.
-    // And if rowset being compacted, the old rowsetis will be saved in 
_stale_rs_version_map;
-    std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> 
_rs_version_map;
-    // This variable _stale_rs_version_map is used to record these rowsets 
which are be compacted.
-    // These _stale rowsets are been removed when rowsets' pathVersion is 
expired,
-    // this policy is judged and computed by TimestampedVersionTracker.
-    std::unordered_map<Version, RowsetSharedPtr, HashOfVersion> 
_stale_rs_version_map;
     // if this tablet is broken, set to true. default is false
     std::atomic<bool> _is_bad;
     // timestamp of last cumu compaction failure
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index c8a0387082c..f14622081d9 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -36,6 +36,7 @@
 #include "olap/file_header.h"
 #include "olap/olap_common.h"
 #include "olap/olap_define.h"
+#include "olap/rowset/rowset.h"
 #include "olap/tablet_meta_manager.h"
 #include "olap/utils.h"
 #include "util/debug_points.h"
@@ -727,6 +728,12 @@ Status TabletMeta::add_rs_meta(const RowsetMetaSharedPtr& 
rs_meta) {
     return Status::OK();
 }
 
+void TabletMeta::add_rowsets_unchecked(const std::vector<RowsetSharedPtr>& to_add) {
+    for (const auto& rs : to_add) {
+        _rs_metas.push_back(rs->rowset_meta());
+    }
+}
+
 void TabletMeta::delete_rs_meta_by_version(const Version& version,
                                            std::vector<RowsetMetaSharedPtr>* 
deleted_rs_metas) {
     auto it = _rs_metas.begin();
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index db7912a452c..094bb21507d 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -202,6 +202,12 @@ public:
     // used for after tablet cloned to clear stale rowset
     void clear_stale_rowset() { _stale_rs_metas.clear(); }
 
+    void clear_rowsets() { _rs_metas.clear(); }
+
+    // MUST hold EXCLUSIVE `_meta_lock` in belonged Tablet
+    // `to_add` MUST NOT have overlapped version with `_rs_metas` in tablet meta.
+    void add_rowsets_unchecked(const std::vector<RowsetSharedPtr>& to_add);
+
     bool all_beta() const;
 
     int64_t storage_policy_id() const { return _storage_policy_id; }
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index e1f39f2948b..f7fe7f813f2 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -528,16 +528,12 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
 
             if (_shared_scan_opt) {
                 auto& read_source = tablets_read_source.emplace_back();
-                {
-                    std::shared_lock rdlock(tablet->get_header_lock());
-                    auto st =
-                            tablet->capture_rs_readers({0, version}, 
&read_source.rs_splits, false);
-                    if (!st.ok()) {
-                        LOG(WARNING) << "fail to init reader.res=" << st;
-                        return Status::InternalError(
-                                "failed to initialize storage reader. 
tablet_id={} : {}",
-                                tablet->tablet_id(), st.to_string());
-                    }
+                auto st = tablet->capture_rs_readers({0, version}, &read_source.rs_splits, false);
+                if (!st.ok()) {
+                    LOG(WARNING) << "fail to init reader.res=" << st;
+                    return Status::InternalError(
+                            "failed to initialize storage reader. tablet_id={} : {}",
+                            tablet->tablet_id(), st.to_string());
                 }
                 if (!_state->skip_delete_predicate()) {
                     read_source.fill_delete_predicates();
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp 
b/be/src/vec/exec/scan/new_olap_scanner.cpp
index bc15cf7207f..20938ecb8ba 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -182,15 +182,12 @@ Status NewOlapScanner::init() {
             // to prevent this case: when there are lots of olap scanners to 
run for example 10000
             // the rowsets maybe compacted when the last olap scanner starts
             ReadSource read_source;
-            {
-                std::shared_lock rdlock(tablet->get_header_lock());
-                auto st = 
tablet->capture_rs_readers(_tablet_reader_params.version,
-                                                     &read_source.rs_splits,
-                                                     
_state->skip_missing_version());
-                if (!st.ok()) {
-                    LOG(WARNING) << "fail to init reader.res=" << st;
-                    return st;
-                }
+            auto st = tablet->capture_rs_readers(_tablet_reader_params.version,
+                                                 &read_source.rs_splits,
+                                                 _state->skip_missing_version());
+            if (!st.ok()) {
+                LOG(WARNING) << "fail to init reader.res=" << st;
+                return st;
             }
             if (!_state->skip_delete_predicate()) {
                 read_source.fill_delete_predicates();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to