This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 5014ad03e7 [feature](cooldown) Auto delete unused remote files (#16588)
5014ad03e7 is described below

commit 5014ad03e70a9d2f456d7eddbf91fe8710784917
Author: plat1ko <[email protected]>
AuthorDate: Mon Feb 13 23:59:39 2023 +0800

    [feature](cooldown) Auto delete unused remote files (#16588)
---
 be/src/agent/agent_server.cpp                      |   3 +
 be/src/agent/task_worker_pool.cpp                  |   8 +-
 be/src/agent/task_worker_pool.h                    |   4 +-
 be/src/agent/utils.cpp                             |  73 +++++++-
 be/src/agent/utils.h                               |  18 +-
 be/src/common/config.h                             |   2 +
 be/src/io/fs/remote_file_system.h                  |   4 +
 be/src/io/fs/s3_file_system.cpp                    |  69 +++++++-
 be/src/io/fs/s3_file_system.h                      |   2 +
 be/src/olap/data_dir.cpp                           |  20 ++-
 be/src/olap/olap_server.cpp                        |  16 +-
 be/src/olap/snapshot_manager.cpp                   |   8 +
 be/src/olap/storage_engine.h                       |   2 +
 be/src/olap/tablet.cpp                             | 191 ++++++++++++++++++++-
 be/src/olap/tablet.h                               |  16 +-
 be/src/olap/tablet_manager.cpp                     |  70 +++-----
 be/src/olap/tablet_manager.h                       |  16 +-
 be/src/olap/tablet_meta.cpp                        |   8 +
 be/src/olap/tablet_meta.h                          |   5 +
 be/src/olap/task/engine_clone_task.cpp             |  42 ++++-
 be/src/olap/task/engine_clone_task.h               |   4 +-
 be/src/util/pprof_utils.cpp                        |   3 +
 be/src/util/uid_util.h                             |   3 +
 .../java/org/apache/doris/catalog/Replica.java     |  11 ++
 .../apache/doris/catalog/TabletInvertedIndex.java  |   1 +
 .../org/apache/doris/cooldown/CooldownConf.java    |   2 +-
 .../{CooldownConf.java => CooldownDelete.java}     |  42 +----
 .../org/apache/doris/journal/JournalEntity.java    |   6 +
 .../java/org/apache/doris/persist/EditLog.java     |   8 +
 .../org/apache/doris/persist/OperationType.java    |   3 +-
 .../apache/doris/service/FrontendServiceImpl.java  |  72 ++++++++
 gensrc/proto/olap_file.proto                       |   1 +
 gensrc/thrift/FrontendService.thrift               |  16 ++
 gensrc/thrift/MasterService.thrift                 |   1 +
 34 files changed, 620 insertions(+), 130 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index bd1a63fd69..5fc46a912d 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -25,6 +25,7 @@
 #include "agent/task_worker_pool.h"
 #include "agent/topic_subscriber.h"
 #include "agent/user_resource_listener.h"
+#include "agent/utils.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "gutil/strings/substitute.h"
@@ -49,6 +50,8 @@ AgentServer::AgentServer(ExecEnv* exec_env, const 
TMasterInfo& master_info)
         }
     }
 
+    MasterServerClient::create(master_info);
+
     // It is the same code to create workers of each type, so we use a macro
     // to make code to be more readable.
 
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index 40d7453f6c..3ed931ddaf 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -29,6 +29,7 @@
 #include <sstream>
 #include <string>
 
+#include "agent/utils.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "env/env.h"
@@ -71,13 +72,11 @@ const int64_t PUBLISH_TIMEOUT_SEC = 10;
 std::atomic_ulong TaskWorkerPool::_s_report_version(time(nullptr) * 10000);
 std::mutex TaskWorkerPool::_s_task_signatures_lock;
 std::map<TTaskType::type, std::set<int64_t>> 
TaskWorkerPool::_s_task_signatures;
-FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache;
 
 TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv* 
env,
                                const TMasterInfo& master_info, ThreadModel 
thread_model)
         : _master_info(master_info),
           _agent_utils(new AgentUtils()),
-          _master_client(new MasterServerClient(_master_info, 
&_master_service_client_cache)),
           _env(env),
           _stop_background_threads_latch(1),
           _is_work(false),
@@ -303,7 +302,8 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest& 
finish_task_request)
 
     while (try_time < TASK_FINISH_MAX_RETRY) {
         DorisMetrics::instance()->finish_task_requests_total->increment(1);
-        Status client_status = 
_master_client->finish_task(finish_task_request, &result);
+        Status client_status =
+                
MasterServerClient::instance()->finish_task(finish_task_request, &result);
 
         if (client_status.ok()) {
             break;
@@ -1648,7 +1648,7 @@ Status TaskWorkerPool::_move_dir(const TTabletId 
tablet_id, const std::string& s
 
 void TaskWorkerPool::_handle_report(TReportRequest& request, ReportType type) {
     TMasterResult result;
-    Status status = _master_client->report(request, &result);
+    Status status = MasterServerClient::instance()->report(request, &result);
     bool is_report_success = false;
     if (!status.ok()) {
         LOG_WARNING("failed to report {}", TYPE_STRING(type))
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 3b81117285..d583a02495 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -23,7 +23,6 @@
 #include <utility>
 #include <vector>
 
-#include "agent/utils.h"
 #include "common/status.h"
 #include "gen_cpp/AgentService_types.h"
 #include "gen_cpp/HeartbeatService_types.h"
@@ -35,6 +34,7 @@ namespace doris {
 
 class ExecEnv;
 class ThreadPool;
+class AgentUtils;
 
 class TaskWorkerPool {
 public:
@@ -224,7 +224,6 @@ private:
     const TMasterInfo& _master_info;
     TBackend _backend;
     std::unique_ptr<AgentUtils> _agent_utils;
-    std::unique_ptr<MasterServerClient> _master_client;
     ExecEnv* _env;
 
     // Protect task queue
@@ -246,7 +245,6 @@ private:
     uint32_t _worker_count;
     TaskWorkerType _task_worker_type;
 
-    static FrontendServiceClientCache _master_service_client_cache;
     static std::atomic_ulong _s_report_version;
 
     static std::mutex _s_task_signatures_lock;
diff --git a/be/src/agent/utils.cpp b/be/src/agent/utils.cpp
index 7c0a552c5f..4543a44e83 100644
--- a/be/src/agent/utils.cpp
+++ b/be/src/agent/utils.cpp
@@ -27,23 +27,34 @@
 #include <sstream>
 
 #include "common/status.h"
+#include "runtime/client_cache.h"
 
 using std::map;
 using std::string;
 using std::stringstream;
-using std::vector;
 using apache::thrift::TException;
 using apache::thrift::transport::TTransportException;
 
 namespace doris {
 
-MasterServerClient::MasterServerClient(const TMasterInfo& master_info,
-                                       FrontendServiceClientCache* 
client_cache)
-        : _master_info(master_info), _client_cache(client_cache) {}
+// Client cache passed to the FrontendServiceConnection of each MasterServerClient RPC below.
+static FrontendServiceClientCache s_client_cache;
+// The instance handed out by MasterServerClient::instance(); empty until create() is called.
+static std::unique_ptr<MasterServerClient> s_client;
+
+MasterServerClient* MasterServerClient::create(const TMasterInfo& master_info) {
+    // Build the new client first, then hand ownership to the file-level holder.
+    // Any previously created client is destroyed by the reset. No locking is done here.
+    auto* fresh_client = new MasterServerClient(master_info);
+    s_client.reset(fresh_client);
+    return fresh_client;
+}
+
+// Returns the client installed by create(), or nullptr if create() has not been called yet.
+// The pointer is owned by this file's `s_client`; callers must not delete it.
+MasterServerClient* MasterServerClient::instance() {
+    return s_client.get();
+}
+
+// Keeps a reference to `master_info` (not a copy), so the referenced object must
+// outlive this client.
+MasterServerClient::MasterServerClient(const TMasterInfo& master_info)
+        : _master_info(master_info) {}
 
 Status MasterServerClient::finish_task(const TFinishTaskRequest& request, 
TMasterResult* result) {
     Status client_status;
-    FrontendServiceConnection client(_client_cache, 
_master_info.network_address,
+    FrontendServiceConnection client(&s_client_cache, 
_master_info.network_address,
                                      config::thrift_rpc_timeout_ms, 
&client_status);
 
     if (!client_status.ok()) {
@@ -82,7 +93,7 @@ Status MasterServerClient::finish_task(const 
TFinishTaskRequest& request, TMaste
 
 Status MasterServerClient::report(const TReportRequest& request, 
TMasterResult* result) {
     Status client_status;
-    FrontendServiceConnection client(_client_cache, 
_master_info.network_address,
+    FrontendServiceConnection client(&s_client_cache, 
_master_info.network_address,
                                      config::thrift_rpc_timeout_ms, 
&client_status);
 
     if (!client_status.ok()) {
@@ -131,6 +142,56 @@ Status MasterServerClient::report(const TReportRequest& 
request, TMasterResult*
     return Status::OK();
 }
 
+// Sends a confirmUnusedRemoteFiles RPC to the master FE.
+//
+// Input parameters:
+// * request: the confirmation request forwarded to the FE
+//
+// Output parameters:
+// * result: filled in by the FE on success
+//
+// Returns InternalError if no connection can be obtained, if the RPC times out
+// (no retry in that case), or if the RPC still fails after one reopen-and-retry.
+Status MasterServerClient::confirm_unused_remote_files(
+        const TConfirmUnusedRemoteFilesRequest& request, TConfirmUnusedRemoteFilesResult* result) {
+    Status client_status;
+    FrontendServiceConnection client(&s_client_cache, _master_info.network_address,
+                                     config::thrift_rpc_timeout_ms, &client_status);
+
+    if (!client_status.ok()) {
+        return Status::InternalError(
+                "fail to get master client from cache. host={}, port={}, code={}",
+                _master_info.network_address.hostname, _master_info.network_address.port,
+                client_status.code());
+    }
+    try {
+        try {
+            client->confirmUnusedRemoteFiles(*result, request);
+        } catch (TTransportException& e) {
+            TTransportException::TTransportExceptionType type = e.getType();
+            if (type != TTransportException::TTransportExceptionType::TIMED_OUT) {
+                // if not TIMED_OUT, reopen the connection and retry once
+                LOG(WARNING) << "master client, retry confirmUnusedRemoteFiles: " << e.what();
+
+                client_status = client.reopen(config::thrift_rpc_timeout_ms);
+                if (!client_status.ok()) {
+                    return Status::InternalError(
+                            "fail to get master client from cache. host={}, port={}, code={}",
+                            _master_info.network_address.hostname,
+                            _master_info.network_address.port, client_status.code());
+                }
+
+                // An exception thrown by this second attempt is handled by the outer catch.
+                client->confirmUnusedRemoteFiles(*result, request);
+            } else {
+                // TIMED_OUT exception. do not retry
+                // actually we don't care what FE returns.
+                return Status::InternalError(
+                        "fail to confirm unused remote files. host={}, port={}, code={}, reason={}",
+                        _master_info.network_address.hostname, _master_info.network_address.port,
+                        client_status.code(), e.what());
+            }
+        }
+    } catch (TException& e) {
+        // The reopen result is not checked: the RPC failure is what gets reported.
+        client.reopen(config::thrift_rpc_timeout_ms);
+        return Status::InternalError(
+                "fail to confirm unused remote files. host={}, port={}, code={}, reason={}",
+                _master_info.network_address.hostname, _master_info.network_address.port,
+                client_status.code(), e.what());
+    }
+
+    return Status::OK();
+}
+
 bool AgentUtils::exec_cmd(const string& command, string* errmsg, bool 
redirect_stderr) {
     // The exit status of the command.
     uint32_t rc = 0;
diff --git a/be/src/agent/utils.h b/be/src/agent/utils.h
index 851d045f13..cfce0c3413 100644
--- a/be/src/agent/utils.h
+++ b/be/src/agent/utils.h
@@ -22,14 +22,16 @@
 #include <gen_cpp/MasterService_types.h>
 
 #include "common/status.h"
-#include "runtime/client_cache.h"
+#include "gutil/macros.h"
 
 namespace doris {
 
 class MasterServerClient {
 public:
-    MasterServerClient(const TMasterInfo& master_info, 
FrontendServiceClientCache* client_cache);
-    virtual ~MasterServerClient() = default;
+    static MasterServerClient* create(const TMasterInfo& master_info);
+    static MasterServerClient* instance();
+
+    ~MasterServerClient() = default;
 
     // Report finished task to the master server
     //
@@ -38,7 +40,7 @@ public:
     //
     // Output parameters:
     // * result: The result of report task
-    virtual Status finish_task(const TFinishTaskRequest& request, 
TMasterResult* result);
+    Status finish_task(const TFinishTaskRequest& request, TMasterResult* 
result);
 
     // Report tasks/olap tablet/disk state to the master server
     //
@@ -47,14 +49,18 @@ public:
     //
     // Output parameters:
     // * result: The result of report task
-    virtual Status report(const TReportRequest& request, TMasterResult* 
result);
+    Status report(const TReportRequest& request, TMasterResult* result);
+
+    Status confirm_unused_remote_files(const TConfirmUnusedRemoteFilesRequest& 
request,
+                                       TConfirmUnusedRemoteFilesResult* 
result);
 
 private:
+    MasterServerClient(const TMasterInfo& master_info);
+
     DISALLOW_COPY_AND_ASSIGN(MasterServerClient);
 
     // Not owner. Reference to the ExecEnv::_master_info
     const TMasterInfo& _master_info;
-    FrontendServiceClientCache* _client_cache;
 };
 
 class AgentUtils {
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 100664fb97..176b3d665a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -809,6 +809,8 @@ CONF_mInt32(bloom_filter_predicate_check_row_num, "204800");
 // cooldown task configs
 CONF_Int32(cooldown_thread_num, "5");
 CONF_mInt64(generate_cooldown_task_interval_sec, "20");
+CONF_mInt32(remove_unused_remote_files_interval_sec, "21600"); // 6h
+CONF_mInt32(confirm_unused_remote_files_interval_sec, "60");
 CONF_mInt64(generate_cache_cleaner_task_interval_sec, "43200"); // 12 h
 CONF_Int32(concurrency_per_dir, "2");
 CONF_mInt64(cooldown_lag_time_sec, "10800");       // 3h
diff --git a/be/src/io/fs/remote_file_system.h 
b/be/src/io/fs/remote_file_system.h
index f4c525a181..ff1d2f6961 100644
--- a/be/src/io/fs/remote_file_system.h
+++ b/be/src/io/fs/remote_file_system.h
@@ -34,6 +34,10 @@ public:
     virtual Status batch_upload(const std::vector<Path>& local_paths,
                                 const std::vector<Path>& dest_paths) = 0;
 
+    // Bulk-delete hook for remote file systems. This default implementation
+    // always returns NotSupported; subclasses override it (S3FileSystem does).
+    virtual Status batch_delete(const std::vector<Path>& paths) {
+        return Status::NotSupported("batch_delete");
+    }
+
     virtual Status connect() = 0;
 
     Status open_file(const Path& path, const FileReaderOptions& reader_options,
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index b05df0e36f..111f9f1723 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -26,6 +26,7 @@
 #include <aws/s3/model/ListObjectsV2Request.h>
 #include <aws/s3/model/PutObjectRequest.h>
 #include <aws/transfer/TransferManager.h>
+#include <opentelemetry/common/threadlocal.h>
 
 #include <filesystem>
 #include <fstream>
@@ -62,6 +63,9 @@ S3FileSystem::S3FileSystem(S3Conf&& s3_conf, std::string&& id)
     if (_s3_conf.prefix.size() > 0 && _s3_conf.prefix[0] == '/') {
         _s3_conf.prefix = _s3_conf.prefix.substr(1);
     }
+    if (!_s3_conf.prefix.empty() && _s3_conf.prefix.back() == '/') {
+        _s3_conf.prefix.pop_back();
+    }
     _executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(
             id.c_str(), config::s3_transfer_executor_pool_size);
 }
@@ -372,7 +376,70 @@ Status S3FileSystem::file_size_impl(const Path& path, 
size_t* file_size) const {
 }
 
 Status S3FileSystem::list(const Path& path, std::vector<Path>* files) {
-    return Status::NotSupported("not support");
+    // Appends to `files` the key of every object found under `path`, with the
+    // listing prefix (the key of `path` plus a trailing '/') stripped off.
+    // Returns IOError if any ListObjectsV2 request fails.
+    auto client = get_client();
+    CHECK_S3_CLIENT(client);
+
+    Aws::S3::Model::ListObjectsV2Request request;
+    auto prefix = get_key(path);
+    if (!prefix.empty() && prefix.back() != '/') {
+        prefix.push_back('/');
+    }
+    request.WithBucket(_s3_conf.bucket).WithPrefix(prefix);
+    bool is_truncated = false;
+    do {
+        auto outcome = client->ListObjectsV2(request);
+        if (!outcome.IsSuccess()) {
+            return Status::IOError("failed to list objects(endpoint={}, bucket={}, prefix={}): {}",
+                                   _s3_conf.endpoint, _s3_conf.bucket, prefix,
+                                   outcome.GetError().GetMessage());
+        }
+        const auto& result = outcome.GetResult();
+        for (const auto& obj : result.GetContents()) {
+            files->push_back(obj.GetKey().substr(prefix.size()));
+        }
+        is_truncated = result.GetIsTruncated();
+        if (is_truncated) {
+            // Ask for the next page. Without the continuation token the same first
+            // page would be requested again and the loop would never terminate.
+            request.SetContinuationToken(result.GetNextContinuationToken());
+        }
+    } while (is_truncated);
+    return Status::OK();
+}
+
+Status S3FileSystem::batch_delete(const std::vector<Path>& paths) {
+    // Deletes the objects for `paths` using DeleteObjects requests of at most
+    // `max_delete_batch` keys each. Stops at the first failed request or the first
+    // per-object error and returns it as IOError; an empty `paths` returns OK.
+    auto client = get_client();
+    CHECK_S3_CLIENT(client);
+
+    // `DeleteObjectsRequest` can only contain 1000 keys at most.
+    constexpr size_t max_delete_batch = 1000;
+    auto path_iter = paths.begin();
+
+    Aws::S3::Model::DeleteObjectsRequest delete_request;
+    delete_request.SetBucket(_s3_conf.bucket);
+    do {
+        Aws::S3::Model::Delete del;
+        Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+        for (; path_iter != paths.end() && objects.size() < max_delete_batch; ++path_iter) {
+            objects.emplace_back().SetKey(get_key(*path_iter));
+        }
+        if (objects.empty()) {
+            return Status::OK();
+        }
+        // Copy the first key now: `objects` is moved into the request below and
+        // must not be read afterwards.
+        const auto first_key = objects.front().GetKey();
+        del.WithObjects(std::move(objects)).SetQuiet(true);
+        delete_request.SetDelete(std::move(del));
+        auto delete_outcome = client->DeleteObjects(delete_request);
+        if (UNLIKELY(!delete_outcome.IsSuccess())) {
+            return Status::IOError(
+                    "failed to delete objects(endpoint={}, bucket={}, key[0]={}): {}",
+                    _s3_conf.endpoint, _s3_conf.bucket, first_key,
+                    delete_outcome.GetError().GetMessage());
+        }
+        if (UNLIKELY(!delete_outcome.GetResult().GetErrors().empty())) {
+            // The request itself succeeded, so the failure reason lives in the
+            // per-object error entry, not in the outcome-level error.
+            const auto& e = delete_outcome.GetResult().GetErrors().front();
+            return Status::IOError("failed to delete objects(endpoint={}, bucket={}, key={}): {}",
+                                   _s3_conf.endpoint, _s3_conf.bucket, e.GetKey(),
+                                   e.GetMessage());
+        }
+    } while (path_iter != paths.end());
+
+    return Status::OK();
 }
 
 std::string S3FileSystem::get_key(const Path& path) const {
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index f8b8bc0bf8..0576421e53 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -80,6 +80,8 @@ public:
     Status batch_upload_impl(const std::vector<Path>& local_paths,
                              const std::vector<Path>& dest_paths);
 
+    Status batch_delete(const std::vector<Path>& paths) override;
+
     Status connect() override;
 
     Status connect_impl();
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index f1c5b65110..2ecece7d8a 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -848,18 +848,18 @@ void DataDir::perform_remote_rowset_gc() {
             continue;
         }
         DCHECK(fs->type() != io::FileSystemType::LOCAL);
-        bool success = true;
+        std::vector<io::Path> seg_paths;
+        seg_paths.reserve(gc_pb.num_segments());
         for (int i = 0; i < gc_pb.num_segments(); ++i) {
-            auto seg_path = BetaRowset::remote_segment_path(gc_pb.tablet_id(), 
rowset_id, i);
-            // TODO(plat1ko): batch delete
-            auto st = fs->delete_file(seg_path);
-            if (!st.ok()) {
-                LOG(WARNING) << "failed to delete remote rowset. err=" << st;
-                success = false;
-            }
+            
seg_paths.push_back(BetaRowset::remote_segment_path(gc_pb.tablet_id(), 
rowset_id, i));
         }
-        if (success) {
+        LOG(INFO) << "delete remote rowset. root_path=" << fs->root_path()
+                  << ", rowset_id=" << rowset_id;
+        auto st = 
std::static_pointer_cast<io::RemoteFileSystem>(fs)->batch_delete(seg_paths);
+        if (st.ok()) {
             deleted_keys.push_back(std::move(key));
+        } else {
+            LOG(WARNING) << "failed to delete remote rowset. err=" << st;
         }
     }
     for (const auto& key : deleted_keys) {
@@ -892,6 +892,8 @@ void DataDir::perform_remote_tablet_gc() {
                 success = false;
                 continue;
             }
+            LOG(INFO) << "delete remote rowsets of tablet. root_path=" << 
fs->root_path()
+                      << ", tablet_id=" << tablet_id;
             auto st = fs->delete_directory(DATA_PREFIX + '/' + tablet_id);
             if (!st.ok()) {
                 LOG(WARNING) << "failed to delete all remote rowset in tablet. 
err=" << st;
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 3038fadcc1..5f61adea46 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -156,6 +156,12 @@ Status StorageEngine::start_bg_threads() {
             &_cooldown_tasks_producer_thread));
     LOG(INFO) << "cooldown tasks producer thread started";
 
+    RETURN_IF_ERROR(Thread::create(
+            "StorageEngine", "remove_unused_remote_files_thread",
+            [this]() { this->_remove_unused_remote_files_callback(); },
+            &_remove_unused_remote_files_thread));
+    LOG(INFO) << "remove unused remote files thread started";
+
     RETURN_IF_ERROR(Thread::create(
             "StorageEngine", "cache_file_cleaner_tasks_producer_thread",
             [this]() { this->_cache_file_cleaner_tasks_producer_callback(); },
@@ -734,13 +740,21 @@ void StorageEngine::_cooldown_tasks_producer_callback() {
             task.priority = max_priority--;
             bool submited = _cooldown_thread_pool->offer(std::move(task));
 
-            if (submited) {
+            if (!submited) {
                 LOG(INFO) << "failed to submit cooldown task";
             }
         }
     } while 
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
 }
 
+void StorageEngine::_remove_unused_remote_files_callback() {
+    // Background loop: wait one interval, then run a cleanup pass. The interval is
+    // re-read from config on every iteration, and the loop ends as soon as the
+    // latch wait returns true.
+    for (;;) {
+        const auto interval = std::chrono::seconds(config::remove_unused_remote_files_interval_sec);
+        if (_stop_background_threads_latch.wait_for(interval)) {
+            break;
+        }
+        LOG(INFO) << "begin to remove unused remote files";
+        Tablet::remove_unused_remote_files();
+    }
+}
+
 void StorageEngine::_cache_file_cleaner_tasks_producer_callback() {
     int64_t interval = config::generate_cache_cleaner_task_interval_sec;
     do {
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index d7166dd97d..2c348ccc14 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -411,6 +411,14 @@ Status SnapshotManager::_create_snapshot_files(const 
TabletSharedPtr& ref_tablet
                     // find rowset in both rs_meta and stale_rs_meta
                     const RowsetSharedPtr rowset = 
ref_tablet->get_rowset_by_version(version, true);
                     if (rowset != nullptr) {
+                        if (!rowset->is_local()) {
+                            // MUST make full snapshot to ensure 
`cooldown_meta_id` is consistent with the cooldowned rowsets after clone.
+                            LOG(INFO) << "missed version is a cooldowned 
rowset, must make full "
+                                         "snapshot. missed_version="
+                                      << missed_version << " tablet_id=" << 
ref_tablet->tablet_id();
+                            res = Status::Error<ErrorCode::INTERNAL_ERROR>();
+                            break;
+                        }
                         consistent_rowsets.push_back(rowset);
                     } else {
                         res = Status::InternalError(
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index c89d6723a1..768e95daf9 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -273,6 +273,7 @@ private:
     void _adjust_compaction_thread_num();
 
     void _cooldown_tasks_producer_callback();
+    void _remove_unused_remote_files_callback();
 
     void _cache_file_cleaner_tasks_producer_callback();
 
@@ -394,6 +395,7 @@ private:
     std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
 
     scoped_refptr<Thread> _cooldown_tasks_producer_thread;
+    scoped_refptr<Thread> _remove_unused_remote_files_thread;
 
     scoped_refptr<Thread> _cache_file_cleaner_tasks_producer_thread;
 
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 7ed8d14b43..8748c67c86 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -30,6 +30,7 @@
 #include <sys/stat.h>
 
 #include <algorithm>
+#include <atomic>
 #include <cstdint>
 #include <map>
 #include <memory>
@@ -38,9 +39,11 @@
 #include <shared_mutex>
 #include <string>
 
+#include "agent/utils.h"
 #include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
+#include "gutil/strings/stringpiece.h"
 #include "io/fs/path.h"
 #include "io/fs/remote_file_system.h"
 #include "olap/base_compaction.h"
@@ -69,6 +72,7 @@
 #include "util/scoped_cleanup.h"
 #include "util/time.h"
 #include "util/trace.h"
+#include "util/uid_util.h"
 #include "vec/data_types/data_type_factory.hpp"
 #include "vec/jsonb/serialize.h"
 
@@ -1397,6 +1401,9 @@ void Tablet::build_tablet_report_info(TTabletInfo* 
tablet_info,
         tablet_info->__set_cooldown_replica_id(_cooldown_replica_id);
         tablet_info->__set_cooldown_term(_cooldown_term);
     }
+    if (_tablet_meta->cooldown_meta_id().initialized()) {
+        
tablet_info->__set_cooldown_meta_id(_tablet_meta->cooldown_meta_id().to_thrift());
+    }
 }
 
 // should use this method to get a copy of current tablet meta
@@ -1671,14 +1678,20 @@ Status Tablet::cooldown() {
 Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>& 
dest_fs) {
     auto old_rowset = pick_cooldown_rowset();
     if (!old_rowset) {
-        LOG(WARNING) << "Cannot pick cooldown rowset in tablet " << 
tablet_id();
-        return Status::OK();
+        return Status::InternalError("cannot pick cooldown rowset in tablet 
{}", tablet_id());
     }
     RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
 
     auto start = std::chrono::steady_clock::now();
 
-    auto st = old_rowset->upload_to(dest_fs.get(), new_rowset_id);
+    Status st;
+    {
+        std::shared_lock slock(_remote_files_lock, std::try_to_lock);
+        if (!slock.owns_lock()) {
+            return Status::Status::Error<TRY_LOCK_FAILED>("try 
remote_files_lock failed");
+        }
+        st = old_rowset->upload_to(dest_fs.get(), new_rowset_id);
+    }
     if (!st.ok()) {
         // reclaim the incomplete rowset data in remote storage
         record_unused_remote_rowset(new_rowset_id, dest_fs->id(), 
old_rowset->num_segments());
@@ -1697,9 +1710,10 @@ Status Tablet::_cooldown_data(const 
std::shared_ptr<io::RemoteFileSystem>& dest_
     new_rowset_meta->set_resource_id(dest_fs->id());
     new_rowset_meta->set_fs(dest_fs);
     new_rowset_meta->set_creation_time(time(nullptr));
+    UniqueId cooldown_meta_id = UniqueId::gen_uid();
 
     // upload cooldowned rowset meta to remote fs
-    RETURN_IF_ERROR(_write_cooldown_meta(dest_fs.get(), 
new_rowset_meta.get()));
+    RETURN_IF_ERROR(_write_cooldown_meta(dest_fs.get(), cooldown_meta_id, 
new_rowset_meta.get()));
 
     RowsetSharedPtr new_rowset;
     RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta, 
&new_rowset);
@@ -1711,6 +1725,7 @@ Status Tablet::_cooldown_data(const 
std::shared_ptr<io::RemoteFileSystem>& dest_
         std::unique_lock meta_wlock(_meta_lock);
         if (tablet_state() == TABLET_RUNNING) {
             modify_rowsets(to_add, to_delete);
+            _tablet_meta->set_cooldown_meta_id(cooldown_meta_id);
             save_meta();
         }
     }
@@ -1735,7 +1750,8 @@ Status Tablet::_read_cooldown_meta(io::RemoteFileSystem* 
fs, int64_t cooldown_re
     return Status::OK();
 }
 
-Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* fs, RowsetMeta* 
new_rs_meta) {
+Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* fs, UniqueId 
cooldown_meta_id,
+                                    RowsetMeta* new_rs_meta) {
     std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas;
     {
         std::shared_lock meta_rlock(_meta_lock);
@@ -1757,6 +1773,8 @@ Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* 
fs, RowsetMeta* new_rs
         rs_metas->Add(rs_meta->get_rowset_pb());
     }
     rs_metas->Add(new_rs_meta->get_rowset_pb());
+    tablet_meta_pb.mutable_cooldown_meta_id()->set_hi(cooldown_meta_id.hi);
+    tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo);
 
     std::string remote_meta_path =
             BetaRowset::remote_tablet_meta_path(tablet_id(), 
_tablet_meta->replica_id());
@@ -1774,6 +1792,11 @@ Status 
Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldow
     TabletMetaPB cooldown_meta_pb;
     RETURN_IF_ERROR(_read_cooldown_meta(fs, cooldown_replica_id, 
&cooldown_meta_pb));
     DCHECK(cooldown_meta_pb.rs_metas_size() > 0);
+    if (_tablet_meta->cooldown_meta_id() == 
cooldown_meta_pb.cooldown_meta_id()) {
+        // cooldowned rowsets are same, no need to follow
+        return Status::OK();
+    }
+
     int64_t cooldowned_version = 
cooldown_meta_pb.rs_metas().rbegin()->end_version();
 
     std::vector<RowsetSharedPtr> overlap_rowsets;
@@ -1836,6 +1859,7 @@ Status 
Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldow
 
     // TODO(plat1ko): process primary key
 
+    _tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id());
     save_meta();
 
     return Status::OK();
@@ -1950,6 +1974,163 @@ Status Tablet::remove_all_remote_rowsets() {
                                       gc_pb.SerializeAsString());
 }
 
+void Tablet::remove_unused_remote_files() { // list remote files per tablet, confirm via MasterServerClient, then batch-delete the unreferenced ones
+    auto tablets = 
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
+        return t->tablet_meta()->cooldown_meta_id().initialized() && 
t->is_used() &&
+               t->tablet_state() == TABLET_RUNNING;
+    });
+    TConfirmUnusedRemoteFilesRequest req;
+    req.__isset.confirm_list = true;
+    // pending deletions, keyed by tablet: tablet_id -> [fs, unused_remote_files]
+    using unused_remote_files_buffer_t = std::unordered_map<
+            int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>, 
std::vector<io::Path>>>;
+    unused_remote_files_buffer_t buffer;
+    int64_t num_files_in_buffer = 0; // total number of files across all entries of `buffer`
+    // assuming a filename is about 0.1KB, this cap keeps the buffer no larger than roughly 100MB
+    constexpr int64_t max_files_in_buffer = 1000000;
+
+    auto calc_unused_remote_files = [&req, &buffer, 
&num_files_in_buffer](Tablet* t) {
+        auto storage_policy = get_storage_policy(t->storage_policy_id());
+        if (storage_policy == nullptr) {
+            LOG(WARNING) << "could not find storage_policy, storage_policy_id="
+                         << t->storage_policy_id();
+            return;
+        }
+        auto resource = get_storage_resource(storage_policy->resource_id);
+        auto dest_fs = 
std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+        if (dest_fs == nullptr) {
+            LOG(WARNING) << "could not find resource, resouce_id=" << 
storage_policy->resource_id;
+            return;
+        }
+        DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
+        DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+        Status st;
+        std::vector<io::Path> files;
+        {
+            std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock); // non-blocking: this tablet is skipped when the lock is busy
+            if (!xlock.owns_lock()) {
+                LOG(WARNING) << "try remote_files_lock failed. tablet_id=" << 
t->tablet_id();
+                return;
+            }
+            // FIXME(plat1ko): What if user reset resource in storage policy 
to another resource?
+            //  Maybe we should also list files in previously uploaded 
resources.
+            st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()), 
&files);
+        }
+        if (!st.ok()) {
+            LOG(WARNING) << "encounter error when remove unused remote files, 
tablet_id="
+                         << t->tablet_id() << " : " << st;
+        }
+        if (files.empty()) { // a failed list is only logged above; it stops here when nothing was listed
+            return;
+        }
+        // collect the ids of all non-local (cooldowned) rowsets and the current cooldown_meta_id under the meta read lock
+        std::unordered_set<std::string> cooldowned_rowsets;
+        UniqueId cooldown_meta_id;
+        {
+            std::shared_lock rlock(t->_meta_lock);
+            for (auto& rs_meta : t->_tablet_meta->all_rs_metas()) {
+                if (!rs_meta->is_local()) {
+                    
cooldowned_rowsets.insert(rs_meta->rowset_id().to_string());
+                }
+            }
+            cooldown_meta_id = t->_tablet_meta->cooldown_meta_id();
+        }
+        // name of this replica's own remote meta file: {replica_id}.meta
+        std::string remote_meta_path = std::to_string(t->replica_id()) + 
".meta";
+        // erase from `files` every path that must be kept; whatever remains is the deletion candidate list
+        // clang-format off
+        files.erase(std::remove_if(files.begin(), files.end(), [&](io::Path& 
path) {
+            const std::string& path_str = path.native();
+            if (StringPiece(path_str).ends_with(".meta")) {
+                return path_str == remote_meta_path; // keep only this replica's meta; other .meta files remain candidates
+            }
+            if (StringPiece(path_str).ends_with(".dat")) {
+                // extract rowset id. filename format: 
{rowset_id}_{segment_num}.dat
+                auto end = path_str.rfind('_');
+                if (UNLIKELY(end == std::string::npos)) {
+                    return false; // no '_' in the name: not kept, so it remains a deletion candidate
+                }
+                return !!cooldowned_rowsets.count(path_str.substr(0, end)); 
+            }
+            if (StringPiece(path_str).ends_with(".idx")) {
+                // extract rowset id. filename format: 
{rowset_id}_{segment_num}_{index_id}.idx
+                auto end = path_str.find('_');
+                if (UNLIKELY(end == std::string::npos)) {
+                    return false; // no '_' in the name: not kept, so it remains a deletion candidate
+                }
+                return !!cooldowned_rowsets.count(path_str.substr(0, end));
+            }
+            return false; // any other extension is not kept either
+        }), files.end());
+        // clang-format on
+        if (files.empty()) {
+            return;
+        }
+        files.shrink_to_fit();
+        num_files_in_buffer += files.size();
+        buffer.insert({t->tablet_id(), {std::move(dest_fs), 
std::move(files)}});
+        auto& info = req.confirm_list.emplace_back();
+        info.__set_tablet_id(t->tablet_id());
+        info.__set_cooldown_replica_id(t->replica_id());
+        info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
+    };
+
+    auto confirm_and_remove_files = [&buffer, &req, &num_files_in_buffer]() {
+        TConfirmUnusedRemoteFilesResult result;
+        LOG(INFO) << "begin to confirm unused remote files. num_tablets=" << 
buffer.size()
+                  << " num_files=" << num_files_in_buffer;
+        auto st = 
MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
+        if (!st.ok()) {
+            LOG(WARNING) << st;
+            return; // nothing is deleted when the confirmation request fails
+        }
+        for (auto id : result.confirmed_tablets) {
+            if (auto it = buffer.find(id); LIKELY(it != buffer.end())) {
+                auto& fs = it->second.first;
+                auto& files = it->second.second;
+                // delete the buffered files of this confirmed tablet
+                LOG(INFO) << "delete unused files. root_path=" << 
fs->root_path()
+                          << " tablet_id=" << id;
+                io::Path dir("data/" + std::to_string(id));
+                for (auto& file : files) {
+                    file = dir / file; // rewritten in place to data/{tablet_id}/{name}
+                    LOG(INFO) << "delete unused file: " << file.native();
+                }
+                st = fs->batch_delete(files);
+                if (!st.ok()) {
+                    LOG(WARNING) << "failed to delete unused files, 
tablet_id=" << id << " : "
+                                 << st;
+                }
+            }
+        }
+    };
+
+    // batch confirm to reduce FE's overhead: flush when the buffer is too large or the interval has elapsed
+    auto next_confirm_time = std::chrono::steady_clock::now() +
+                             
std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
+    for (auto& t : tablets) {
+        if (t.use_count() <= 1 // this means tablet has been dropped
+            || t->_cooldown_replica_id != t->replica_id() || t->_state != 
TABLET_RUNNING) {
+            continue;
+        }
+        calc_unused_remote_files(t.get());
+        if (num_files_in_buffer > 0 && (num_files_in_buffer > 
max_files_in_buffer ||
+                                        std::chrono::steady_clock::now() > 
next_confirm_time)) {
+            confirm_and_remove_files();
+            buffer.clear(); // reset the batch state after each confirm round
+            req.confirm_list.clear();
+            num_files_in_buffer = 0;
+            next_confirm_time =
+                    std::chrono::steady_clock::now() +
+                    
std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
+        }
+    }
+    if (num_files_in_buffer > 0) { // flush whatever is still buffered after the loop
+        confirm_and_remove_files();
+    }
+}
+
 TabletSchemaSPtr Tablet::tablet_schema() const {
     std::shared_lock wrlock(_meta_lock);
     return _max_version_schema;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index c56808e8b5..dc0034f065 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -318,6 +318,8 @@ public:
 
     void record_unused_remote_rowset(const RowsetId& rowset_id, const 
std::string& resource,
                                      int64_t num_segments);
+
+    static void remove_unused_remote_files();
     
////////////////////////////////////////////////////////////////////////////
     // end cooldown functions
     
////////////////////////////////////////////////////////////////////////////
@@ -361,6 +363,16 @@ public:
 
     RowsetSharedPtr get_rowset(const RowsetId& rowset_id);
 
+    void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor) 
{
+        std::shared_lock meta_guard(_meta_lock); // read lock is held for the whole traversal
+        for (auto& active : _rs_version_map) {
+            visitor(active.second);
+        }
+        for (auto& stale : _stale_rs_version_map) {
+            visitor(stale.second);
+        }
+    }
+
 private:
     Status _init_once_action();
     void _print_missed_versions(const std::vector<Version>& missed_versions) 
const;
@@ -404,7 +416,8 @@ private:
     Status _follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t 
cooldown_replica_id);
     Status _read_cooldown_meta(io::RemoteFileSystem* fs, int64_t 
cooldown_replica_id,
                                TabletMetaPB* tablet_meta_pb);
-    Status _write_cooldown_meta(io::RemoteFileSystem* fs, RowsetMeta* 
new_rs_meta);
+    Status _write_cooldown_meta(io::RemoteFileSystem* fs, UniqueId 
cooldown_meta_id,
+                                RowsetMeta* new_rs_meta);
     
////////////////////////////////////////////////////////////////////////////
     // end cooldown functions
     
////////////////////////////////////////////////////////////////////////////
@@ -484,6 +497,7 @@ private:
     // cooldown conf
     int64_t _cooldown_replica_id = -1;
     int64_t _cooldown_term = -1;
+    std::shared_mutex _remote_files_lock;
 
     DISALLOW_COPY_AND_ASSIGN(Tablet);
 
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 407fb78520..45a98bf5bc 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -31,6 +31,7 @@
 #include <cstdlib>
 #include <filesystem>
 
+#include "common/compiler_util.h"
 #include "env/env.h"
 #include "env/env_util.h"
 #include "gutil/strings/strcat.h"
@@ -90,6 +91,12 @@ TabletManager::~TabletManager() {
     DEREGISTER_HOOK_METRIC(tablet_meta_mem_consumption);
 }
 
+void TabletManager::add_tablet(const TabletSharedPtr& tablet) {
+    const auto id = tablet->tablet_id();
+    std::lock_guard shard_wlock(_get_tablets_shard_lock(id));
+    _get_tablet_map(id)[id] = tablet; // inserts, or overwrites an existing entry with the same id
+}
+
 Status TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const 
TabletSharedPtr& tablet,
                                            bool update_meta, bool force) {
     Status res = Status::OK();
@@ -580,23 +587,6 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId 
tablet_id, TabletUid tablet_
     return nullptr;
 }
 
-std::vector<TabletSharedPtr> TabletManager::get_all_tablet() {
-    std::vector<TabletSharedPtr> res;
-    for (const auto& tablets_shard : _tablets_shards) {
-        std::shared_lock rdlock(tablets_shard.lock);
-        for (const auto& tablet_map : tablets_shard.tablet_map) {
-            // these are tablets which is not deleted
-            TabletSharedPtr tablet = tablet_map.second;
-            if (!tablet->is_used()) {
-                LOG(WARNING) << "tablet cannot be used. tablet=" << 
tablet->tablet_id();
-                continue;
-            }
-            res.emplace_back(tablet);
-        }
-    }
-    return res;
-}
-
 uint64_t TabletManager::get_rowset_nums() {
     uint64_t rowset_nums = 0;
     for (const auto& tablets_shard : _tablets_shards) {
@@ -898,37 +888,31 @@ Status 
TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>
 
     DorisMetrics::instance()->report_all_tablets_requests_total->increment(1);
     HistogramStat tablet_version_num_hist;
+    auto tablets = get_all_tablet([](Tablet*) { return true; });
     auto local_cache = std::make_shared<std::vector<TTabletStat>>();
-    for (const auto& tablets_shard : _tablets_shards) {
-        std::shared_lock rdlock(tablets_shard.lock);
-        for (const auto& item : tablets_shard.tablet_map) {
-            uint64_t tablet_id = item.first;
-            TabletSharedPtr tablet_ptr = item.second;
-            TTablet t_tablet;
-            TTabletInfo tablet_info;
-            tablet_ptr->build_tablet_report_info(&tablet_info, true);
-            // find expired transaction corresponding to this tablet
-            TabletInfo tinfo(tablet_id, tablet_ptr->schema_hash(), 
tablet_ptr->tablet_uid());
-            auto find = expire_txn_map.find(tinfo);
-            if (find != expire_txn_map.end()) {
-                tablet_info.__set_transaction_ids(find->second);
-                expire_txn_map.erase(find);
-            }
-            t_tablet.tablet_infos.push_back(tablet_info);
-            tablet_version_num_hist.add(tablet_ptr->version_count());
-            tablets_info->emplace(tablet_id, t_tablet);
-            TTabletStat t_tablet_stat;
-            t_tablet_stat.__set_tablet_id(tablet_info.tablet_id);
-            t_tablet_stat.__set_data_size(tablet_info.data_size);
-            t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size);
-            t_tablet_stat.__set_row_num(tablet_info.row_count);
-            t_tablet_stat.__set_version_count(tablet_info.version_count);
-            local_cache->emplace_back(std::move(t_tablet_stat));
+    local_cache->reserve(tablets.size());
+    for (auto& tablet : tablets) {
+        auto& t_tablet = (*tablets_info)[tablet->tablet_id()];
+        TTabletInfo& tablet_info = t_tablet.tablet_infos.emplace_back();
+        tablet->build_tablet_report_info(&tablet_info, true);
+        // find expired transaction corresponding to this tablet
+        TabletInfo tinfo(tablet->tablet_id(), tablet->schema_hash(), 
tablet->tablet_uid());
+        auto find = expire_txn_map.find(tinfo);
+        if (find != expire_txn_map.end()) {
+            tablet_info.__set_transaction_ids(find->second);
+            expire_txn_map.erase(find);
         }
+        tablet_version_num_hist.add(tablet->version_count());
+        auto& t_tablet_stat = local_cache->emplace_back();
+        t_tablet_stat.__set_tablet_id(tablet_info.tablet_id);
+        t_tablet_stat.__set_data_size(tablet_info.data_size);
+        t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size);
+        t_tablet_stat.__set_row_num(tablet_info.row_count);
+        t_tablet_stat.__set_version_count(tablet_info.version_count);
     }
     {
         std::lock_guard<std::mutex> guard(_tablet_stat_cache_mutex);
-        _tablet_stat_list_cache = local_cache;
+        _tablet_stat_list_cache.swap(local_cache);
     }
     DorisMetrics::instance()->tablet_version_num_distribution->set_histogram(
             tablet_version_num_hist);
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 7ae4060142..b1f3300596 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -61,6 +61,8 @@ public:
     // task to be fail, even if there is enough space on other disks
     Status create_tablet(const TCreateTabletReq& request, 
std::vector<DataDir*> stores);
 
+    void add_tablet(const TabletSharedPtr& tablet);
+
     // Drop a tablet by description.
     // If `is_drop_table_or_partition` is true, we need to remove all remote 
rowsets in this tablet.
     Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool 
is_drop_table_or_partition);
@@ -78,7 +80,19 @@ public:
     TabletSharedPtr get_tablet(TTabletId tablet_id, TabletUid tablet_uid,
                                bool include_deleted = false, std::string* err 
= nullptr);
 
-    std::vector<TabletSharedPtr> get_all_tablet();
+    std::vector<TabletSharedPtr> get_all_tablet(std::function<bool(Tablet*)>&& 
filter =
+                                                        [](Tablet* t) { return 
t->is_used(); }) {
+        std::vector<TabletSharedPtr> matched;
+        for (const auto& shard : _tablets_shards) {
+            std::shared_lock shard_rlock(shard.lock); // one shard is read-locked at a time
+            for (const auto& entry : shard.tablet_map) {
+                if (filter(entry.second.get())) {
+                    matched.push_back(entry.second);
+                }
+            }
+        }
+        return matched;
+    }
 
     uint64_t get_rowset_nums();
     uint64_t get_segment_nums();
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 7562efa5b2..1fc57239d1 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -538,6 +538,10 @@ void TabletMeta::init_from_pb(const TabletMetaPB& 
tablet_meta_pb) {
     }
 
     _storage_policy_id = tablet_meta_pb.storage_policy_id();
+    if (tablet_meta_pb.has_cooldown_meta_id()) {
+        _cooldown_meta_id = tablet_meta_pb.cooldown_meta_id();
+    }
+
     if (tablet_meta_pb.has_enable_unique_key_merge_on_write()) {
         _enable_unique_key_merge_on_write = 
tablet_meta_pb.enable_unique_key_merge_on_write();
     }
@@ -606,6 +610,10 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
     if (_storage_policy_id > 0) {
         tablet_meta_pb->set_storage_policy_id(_storage_policy_id);
     }
+    if (_cooldown_meta_id.initialized()) {
+        
tablet_meta_pb->mutable_cooldown_meta_id()->CopyFrom(_cooldown_meta_id.to_proto());
+    }
+
     
tablet_meta_pb->set_enable_unique_key_merge_on_write(_enable_unique_key_merge_on_write);
 
     if (_enable_unique_key_merge_on_write) {
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 452b5fbf45..8b147216a8 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -116,6 +116,7 @@ public:
 
     TabletTypePB tablet_type() const { return _tablet_type; }
     TabletUid tablet_uid() const;
+    void set_tablet_uid(TabletUid uid) { _tablet_uid = uid; } // plain overwrite of the stored uid
     int64_t table_id() const;
     int64_t partition_id() const;
     int64_t tablet_id() const;
@@ -194,6 +195,9 @@ public:
         _storage_policy_id = id;
     }
 
+    UniqueId cooldown_meta_id() const { return _cooldown_meta_id; } // returned by value
+    void set_cooldown_meta_id(UniqueId uid) { _cooldown_meta_id = uid; } // plain overwrite; does not persist the meta by itself
+
     static void init_column_from_tcolumn(uint32_t unique_id, const TColumn& 
tcolumn,
                                          ColumnPB* column);
 
@@ -238,6 +242,7 @@ private:
 
     // meta for cooldown
     int64_t _storage_policy_id = 0; // <= 0 means no storage policy
+    UniqueId _cooldown_meta_id;
 
     // For unique key data model, the feature Merge-on-Write will leverage a 
primary
     // key index and a delete-bitmap to mark duplicate keys as deleted in load 
stage,
diff --git a/be/src/olap/task/engine_clone_task.cpp 
b/be/src/olap/task/engine_clone_task.cpp
index 5df1c32bbc..d3f59653b7 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -17,6 +17,7 @@
 
 #include "olap/task/engine_clone_task.h"
 
+#include <memory>
 #include <set>
 #include <system_error>
 
@@ -30,6 +31,8 @@
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_factory.h"
 #include "olap/snapshot_manager.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_meta.h"
 #include "runtime/client_cache.h"
 #include "runtime/thread_context.h"
 #include "util/defer_op.h"
@@ -485,8 +488,8 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const 
std::string& clone_d
     // The tablet meta info is downloaded from source BE as .hdr file.
     // So we load it and generate cloned_tablet_meta.
     auto cloned_tablet_meta_file = fmt::format("{}/{}.hdr", clone_dir, 
tablet->tablet_id());
-    TabletMeta cloned_tablet_meta;
-    
RETURN_IF_ERROR(cloned_tablet_meta.create_from_file(cloned_tablet_meta_file));
+    auto cloned_tablet_meta = std::make_shared<TabletMeta>();
+    
RETURN_IF_ERROR(cloned_tablet_meta->create_from_file(cloned_tablet_meta_file));
 
     // remove the cloned meta file
     FileUtils::remove(cloned_tablet_meta_file);
@@ -528,7 +531,7 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const 
std::string& clone_d
     if (is_incremental_clone) {
         status = _finish_incremental_clone(tablet, cloned_tablet_meta, 
committed_version);
     } else {
-        status = _finish_full_clone(tablet, 
const_cast<TabletMeta*>(&cloned_tablet_meta));
+        status = _finish_full_clone(tablet, cloned_tablet_meta);
     }
 
     // if full clone success, need to update cumulative layer point
@@ -544,11 +547,11 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, 
const std::string& clone_d
 /// 1. Get missing version from local tablet again and check if they exist in 
cloned tablet.
 /// 2. Revise the local tablet meta to add all incremental cloned rowset's 
meta.
 Status EngineCloneTask::_finish_incremental_clone(Tablet* tablet,
-                                                  const TabletMeta& 
cloned_tablet_meta,
+                                                  const TabletMetaSharedPtr& 
cloned_tablet_meta,
                                                   int64_t committed_version) {
     LOG(INFO) << "begin to finish incremental clone. tablet=" << 
tablet->full_name()
               << ", committed_version=" << committed_version
-              << ", cloned_tablet_replica_id=" << 
cloned_tablet_meta.replica_id();
+              << ", cloned_tablet_replica_id=" << 
cloned_tablet_meta->replica_id();
 
     /// Get missing versions again from local tablet.
     /// We got it before outside the lock, so it has to be got again.
@@ -561,7 +564,7 @@ Status EngineCloneTask::_finish_incremental_clone(Tablet* 
tablet,
     // check missing versions exist in clone src
     std::vector<RowsetMetaSharedPtr> rowsets_to_clone;
     for (Version version : missed_versions) {
-        RowsetMetaSharedPtr rs_meta = 
cloned_tablet_meta.acquire_rs_meta_by_version(version);
+        RowsetMetaSharedPtr rs_meta = 
cloned_tablet_meta->acquire_rs_meta_by_version(version);
         if (rs_meta == nullptr) {
             return Status::InternalError("missed version {} is not found in 
cloned tablet meta",
                                          version.to_string());
@@ -576,10 +579,29 @@ Status EngineCloneTask::_finish_incremental_clone(Tablet* 
tablet,
     return tablet->revise_tablet_meta(rowsets_to_clone, versions_to_delete);
 }
 
+// Builds a new Tablet from `cloned_tablet_meta` and registers it in the tablet manager in place of `origin_tablet`.
+static Status full_clone(Tablet* origin_tablet, const TabletMetaSharedPtr& 
cloned_tablet_meta) {
+    // keep origin `replica_id` and `tablet_uid` (both are written into `cloned_tablet_meta` itself)
+    cloned_tablet_meta->set_replica_id(origin_tablet->replica_id());
+    cloned_tablet_meta->set_tablet_uid(origin_tablet->tablet_uid());
+    auto tablet = Tablet::create_tablet_from_meta(cloned_tablet_meta, 
origin_tablet->data_dir());
+    RETURN_IF_ERROR(tablet->init());
+    tablet->save_meta();
+    StorageEngine::instance()->tablet_manager()->add_tablet(tablet);
+    // NOTE(review): stated intent is to reclaim local rowsets in origin tablet, but this traverses the NEW `tablet` -- confirm which is meant (traverse_rowsets takes _meta_lock; check caller's locks first)
+    tablet->traverse_rowsets([](auto& rs) {
+        if (rs->is_local()) {
+            StorageEngine::instance()->add_unused_rowset(rs);
+        }
+    });
+    return Status::OK();
+}
+
 /// This method will do:
 /// 1. Compare the version of local tablet and cloned tablet to decide which 
version to keep
 /// 2. Revise the local tablet meta
-Status EngineCloneTask::_finish_full_clone(Tablet* tablet, TabletMeta* 
cloned_tablet_meta) {
+Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
+                                           const TabletMetaSharedPtr& 
cloned_tablet_meta) {
     Version cloned_max_version = cloned_tablet_meta->max_version();
     LOG(INFO) << "begin to finish full clone. tablet=" << tablet->full_name()
               << ", cloned_max_version=" << cloned_max_version;
@@ -648,6 +670,12 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet, 
TabletMeta* cloned_ta
     }
     std::vector<RowsetMetaSharedPtr> rowsets_to_clone;
     for (auto& rs_meta : cloned_tablet_meta->all_rs_metas()) {
+        if (!rs_meta->is_local()) {
+            // MUST clone all cooldowned rowset if there is any cooldowned 
rowset to clone to ensure
+            // `cooldown_meta_id` is consistent with the cooldowned rowsets 
after clone.
+            LOG(INFO) << "clone cooldowned rowsets, tablet_id=" << 
tablet->tablet_id();
+            RETURN_IF_ERROR(full_clone(tablet, cloned_tablet_meta));
+        }
         rowsets_to_clone.push_back(rs_meta);
         LOG(INFO) << "version to be cloned from clone tablet to local tablet: "
                   << tablet->full_name() << ", version=" << rs_meta->version();
diff --git a/be/src/olap/task/engine_clone_task.h 
b/be/src/olap/task/engine_clone_task.h
index bfa6d212a6..67a164b6c2 100644
--- a/be/src/olap/task/engine_clone_task.h
+++ b/be/src/olap/task/engine_clone_task.h
@@ -45,10 +45,10 @@ private:
     virtual Status _finish_clone(Tablet* tablet, const std::string& clone_dir,
                                  int64_t committed_version, bool 
is_incremental_clone);
 
-    Status _finish_incremental_clone(Tablet* tablet, const TabletMeta& 
cloned_tablet_meta,
+    Status _finish_incremental_clone(Tablet* tablet, const 
TabletMetaSharedPtr& cloned_tablet_meta,
                                      int64_t committed_version);
 
-    Status _finish_full_clone(Tablet* tablet, TabletMeta* cloned_tablet_meta);
+    Status _finish_full_clone(Tablet* tablet, const TabletMetaSharedPtr& 
cloned_tablet_meta);
 
     Status _make_and_download_snapshots(DataDir& data_dir, const std::string& 
local_data_path,
                                         TBackend* src_host, string* 
src_file_path,
diff --git a/be/src/util/pprof_utils.cpp b/be/src/util/pprof_utils.cpp
index 8fc58410c8..da90888297 100644
--- a/be/src/util/pprof_utils.cpp
+++ b/be/src/util/pprof_utils.cpp
@@ -24,6 +24,9 @@
 #include "util/file_utils.h"
 
 namespace doris {
+namespace config {
+extern std::string pprof_profile_dir;
+}
 
 Status PprofUtils::get_pprof_cmd(std::string* cmd) {
     AgentUtils util;
diff --git a/be/src/util/uid_util.h b/be/src/util/uid_util.h
index d43367d95f..7a310f3cf1 100644
--- a/be/src/util/uid_util.h
+++ b/be/src/util/uid_util.h
@@ -56,6 +56,7 @@ struct UniqueId {
     int64_t hi = 0;
     int64_t lo = 0;
 
+    UniqueId() = default;
     UniqueId(int64_t hi_, int64_t lo_) : hi(hi_), lo(lo_) {}
     UniqueId(const UniqueId& uid) : hi(uid.hi), lo(uid.lo) {}
     UniqueId(const TUniqueId& tuid) : hi(tuid.hi), lo(tuid.lo) {}
@@ -65,6 +66,8 @@ struct UniqueId {
         from_hex(&lo, lo_str);
     }
 
+    bool initialized() const { return !(hi == 0 && lo == 0); } // (0, 0) is treated as "not set"
+
     // currently, the implementation is uuid, but it may change in the future
     static UniqueId gen_uid() {
         UniqueId uid(0, 0);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index b65eb91486..ce891a521d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -19,6 +19,7 @@ package org.apache.doris.catalog;
 
 import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
+import org.apache.doris.thrift.TUniqueId;
 
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
@@ -108,6 +109,8 @@ public class Replica implements Writable {
     // bad means this Replica is unrecoverable, and we will delete it
     private boolean bad = false;
 
+    private TUniqueId cooldownMetaId;
+
     /*
      * If set to true, with means this replica need to be repaired. explicitly.
      * This can happen when this replica is created by a balance clone task, 
and
@@ -234,6 +237,14 @@ public class Replica implements Writable {
         return true;
     }
 
+    public TUniqueId getCooldownMetaId() {
+        return cooldownMetaId; // may be null: the field is declared without an initializer
+    }
+
+    public void setCooldownMetaId(TUniqueId cooldownMetaId) {
+        this.cooldownMetaId = cooldownMetaId; // stored as-is: no copy and no null check
+    }
+
     public boolean needFurtherRepair() {
         if (needFurtherRepair && System.currentTimeMillis() - 
this.furtherRepairSetTime < FURTHER_REPAIR_TIMEOUT_MS) {
             return true;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 0ed99185ef..8de447050c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -193,6 +193,7 @@ public class TabletInvertedIndex {
                             if (Config.enable_storage_policy) {
                                 handleCooldownConf(tabletMeta, 
backendTabletInfo, cooldownConfToPush,
                                         cooldownConfToUpdate);
+                                
replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
                             }
 
                             long partitionId = tabletMeta.getPartitionId();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
index b0dcec8732..bb0e7debab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
@@ -29,7 +29,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 
 /**
- * This class represents the olap replica related metadata.
+ * This class is used to log update cooldown conf operation.
  */
 @Data
 public class CooldownConf implements Writable {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java 
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownDelete.java
similarity index 52%
copy from fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
copy to fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownDelete.java
index b0dcec8732..4ca8220ba8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownDelete.java
@@ -21,51 +21,15 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 
-import com.google.gson.annotations.SerializedName;
-import lombok.Data;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
 /**
- * This class represents the olap replica related metadata.
+ * This class is used to log delete unused remote files operation.
  */
-@Data
-public class CooldownConf implements Writable {
-    @SerializedName(value = "dbId")
-    protected long dbId;
-    @SerializedName(value = "tableId")
-    protected long tableId;
-    @SerializedName(value = "partitionId")
-    protected long partitionId;
-    @SerializedName(value = "indexId")
-    protected long indexId;
-    @SerializedName(value = "tabletId")
-    protected long tabletId;
-    @SerializedName(value = "cooldownReplicaId")
-    protected long cooldownReplicaId = -1;
-    @SerializedName(value = "cooldownTerm")
-    protected long cooldownTerm = -1;
-
-    public CooldownConf() {
-    }
-
-    // for update
-    public CooldownConf(long dbId, long tableId, long partitionId, long 
indexId, long tabletId, long cooldownTerm) {
-        this.dbId = dbId;
-        this.tableId = tableId;
-        this.partitionId = partitionId;
-        this.indexId = indexId;
-        this.tabletId = tabletId;
-        this.cooldownTerm = cooldownTerm;
-    }
-
-    // for push
-    public CooldownConf(long tabletId, long cooldownReplicaId, long 
cooldownTerm) {
-        this.tabletId = tabletId;
-        this.cooldownReplicaId = cooldownReplicaId;
-        this.cooldownTerm = cooldownTerm;
+public class CooldownDelete implements Writable {
+    public CooldownDelete() {
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java 
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index a97cbf6a7e..9162cd837d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.io.Text;
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
 import org.apache.doris.cooldown.CooldownConfList;
+import org.apache.doris.cooldown.CooldownDelete;
 import org.apache.doris.datasource.CatalogLog;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -586,6 +587,11 @@ public class JournalEntity implements Writable {
                 isRead = true;
                 break;
             }
+            case OperationType.OP_COOLDOWN_DELETE: {
+                data = CooldownDelete.read(in);
+                isRead = true;
+                break;
+            }
             case OperationType.OP_BATCH_ADD_ROLLUP: {
                 data = BatchAlterJobPersistInfo.read(in);
                 isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 0d2bc4f5a0..cfcac88a62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -43,6 +43,7 @@ import org.apache.doris.common.io.Writable;
 import org.apache.doris.common.util.SmallFileMgr.SmallFile;
 import org.apache.doris.cooldown.CooldownConfHandler;
 import org.apache.doris.cooldown.CooldownConfList;
+import org.apache.doris.cooldown.CooldownDelete;
 import org.apache.doris.datasource.CatalogLog;
 import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InitCatalogLog;
@@ -725,6 +726,9 @@ public class EditLog {
                     CooldownConfList cooldownConfList = (CooldownConfList) 
journal.getData();
                     
CooldownConfHandler.replayUpdateCooldownConf(cooldownConfList);
                     break;
+                case OperationType.OP_COOLDOWN_DELETE:
+                    // noop
+                    break;
                 case OperationType.OP_BATCH_ADD_ROLLUP: {
                     BatchAlterJobPersistInfo batchAlterJobV2 = 
(BatchAlterJobPersistInfo) journal.getData();
                     for (AlterJobV2 alterJobV2 : 
batchAlterJobV2.getAlterJobV2List()) {
@@ -1526,6 +1530,10 @@ public class EditLog {
         logEdit(OperationType.OP_UPDATE_COOLDOWN_CONF, cooldownConf);
     }
 
+    /**
+     * Writes an {@code OP_COOLDOWN_DELETE} edit log entry carrying the given {@code CooldownDelete}.
+     */
+    public void logCooldownDelete(CooldownDelete cooldownDelete) {
+        logEdit(OperationType.OP_COOLDOWN_DELETE, cooldownDelete);
+    }
+
     public void logBatchAlterJob(BatchAlterJobPersistInfo batchAlterJobV2) {
         logEdit(OperationType.OP_BATCH_ADD_ROLLUP, batchAlterJobV2);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java 
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 47954cdbdf..bc99c80359 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -267,8 +267,9 @@ public class OperationType {
     public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356;
 
     public static final short OP_ALTER_USER = 400;
-    // cooldown conf
+    // cooldown related
     public static final short OP_UPDATE_COOLDOWN_CONF = 401;
+    public static final short OP_COOLDOWN_DELETE = 402;
 
     /**
      * Get opcode name by op code.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java 
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 380a69a485..4c8f36b127 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -32,9 +32,12 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.HMSResource;
 import org.apache.doris.catalog.Index;
 import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica;
 import org.apache.doris.catalog.Table;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletMeta;
 import org.apache.doris.catalog.external.ExternalDatabase;
 import org.apache.doris.cluster.Cluster;
 import org.apache.doris.cluster.ClusterNamespace;
@@ -52,6 +55,7 @@ import org.apache.doris.common.ThriftServerEventProcessor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.Version;
 import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.cooldown.CooldownDelete;
 import org.apache.doris.datasource.CatalogIf;
 import org.apache.doris.datasource.ExternalCatalog;
 import org.apache.doris.datasource.HMSExternalCatalog;
@@ -76,6 +80,8 @@ import org.apache.doris.thrift.TCell;
 import org.apache.doris.thrift.TColumn;
 import org.apache.doris.thrift.TColumnDef;
 import org.apache.doris.thrift.TColumnDesc;
+import org.apache.doris.thrift.TConfirmUnusedRemoteFilesRequest;
+import org.apache.doris.thrift.TConfirmUnusedRemoteFilesResult;
 import org.apache.doris.thrift.TDescribeTableParams;
 import org.apache.doris.thrift.TDescribeTableResult;
 import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -174,6 +180,72 @@ public class FrontendServiceImpl implements 
FrontendService.Iface {
         this.exeEnv = exeEnv;
     }
 
+    /**
+     * Confirms which of the requested tablets may have their unused remote files deleted.
+     *
+     * <p>A tablet is confirmed only when the cooldown replica id recorded in FE matches the one in the
+     * request and every replica known to FE reports the same cooldown meta id as the request. Entries
+     * that fail any check are logged and skipped; they never fail the whole request.
+     *
+     * <p>When at least one tablet is confirmed, a {@code CooldownDelete} edit log entry is written
+     * before the result is returned.
+     *
+     * @param request the tablets to check; {@code confirm_list} must be set
+     * @return the result holding the ids of the confirmed tablets
+     * @throws TException if this FE is not master or {@code confirm_list} is not set
+     */
+    @Override
+    public TConfirmUnusedRemoteFilesResult confirmUnusedRemoteFiles(TConfirmUnusedRemoteFilesRequest request)
+            throws TException {
+        if (!Env.getCurrentEnv().isMaster()) {
+            throw new TException("FE is not master");
+        }
+        TConfirmUnusedRemoteFilesResult res = new TConfirmUnusedRemoteFilesResult();
+        if (!request.isSetConfirmList()) {
+            throw new TException("confirm_list is null");
+        }
+        request.getConfirmList().forEach(info -> {
+            if (!info.isSetCooldownMetaId()) {
+                LOG.warn("cooldown_meta_id is null");
+                return;
+            }
+            TabletMeta tabletMeta = Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(info.tablet_id);
+            if (tabletMeta == null) {
+                LOG.warn("tablet {} not found", info.tablet_id);
+                return;
+            }
+            Tablet tablet;
+            try {
+                // Any missing level in the db -> table -> partition -> index chain surfaces as a
+                // RuntimeException here and is treated as "tablet not found".
+                OlapTable table = (OlapTable) Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId())
+                        .getTable(tabletMeta.getTableId())
+                        .get();
+                table.readLock();
+                try {
+                    tablet = table.getPartition(tabletMeta.getPartitionId()).getIndex(tabletMeta.getIndexId())
+                            .getTablet(info.tablet_id);
+                } finally {
+                    table.readUnlock();
+                }
+            } catch (RuntimeException e) {
+                LOG.warn("tablet {} not found", info.tablet_id);
+                return;
+            }
+            // The final lookup may hand back null instead of throwing (e.g. the tablet was dropped
+            // after the inverted-index lookup); skip it rather than let an NPE abort the whole request.
+            if (tablet == null) {
+                LOG.warn("tablet {} not found", info.tablet_id);
+                return;
+            }
+            // check cooldownReplicaId
+            long cooldownReplicaId = tablet.getCooldownConf().first;
+            if (cooldownReplicaId != info.cooldown_replica_id) {
+                LOG.info("cooldown replica id not match({} vs {}), tablet={}", cooldownReplicaId,
+                        info.cooldown_replica_id, info.tablet_id);
+                return;
+            }
+            // check cooldownMetaId of all replicas are the same
+            List<Replica> replicas = Env.getCurrentEnv().getTabletInvertedIndex().getReplicas(info.tablet_id);
+            for (Replica replica : replicas) {
+                if (!info.cooldown_meta_id.equals(replica.getCooldownMetaId())) {
+                    LOG.info("cooldown meta id are not same, tablet={}", info.tablet_id);
+                    return;
+                }
+            }
+            res.addToConfirmedTablets(info.tablet_id);
+        });
+
+        if (res.isSetConfirmedTablets() && !res.getConfirmedTablets().isEmpty()) {
+            if (Env.getCurrentEnv().isMaster()) {
+                // ensure FE is real master
+                Env.getCurrentEnv().getEditLog().logCooldownDelete(new CooldownDelete());
+            } else {
+                throw new TException("FE is not master");
+            }
+        }
+
+        return res;
+    }
+
     @Override
     public TGetDbsResult getDbNames(TGetDbsParams params) throws TException {
         LOG.debug("get db request: {}", params);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index df04a2a052..5aac8d98c4 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -296,6 +296,7 @@ message TabletMetaPB {
     // Use primary key index to speed up tabel unique key model
     optional bool enable_unique_key_merge_on_write = 24 [default = false];
     optional int64 storage_policy_id = 25;
+    optional PUniqueId cooldown_meta_id = 26;
 }
 
 message OLAPRawDeltaHeaderMessage {
diff --git a/gensrc/thrift/FrontendService.thrift 
b/gensrc/thrift/FrontendService.thrift
index 26ce8fb988..08b2345488 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -739,6 +739,20 @@ struct TMySqlLoadAcquireTokenResult {
     2: optional string token
 }
 
+// Cooldown state of one tablet; the element type of TConfirmUnusedRemoteFilesRequest.confirm_list.
+struct TTabletCooldownInfo {
+    1: optional Types.TTabletId tablet_id
+    2: optional Types.TReplicaId cooldown_replica_id
+    3: optional Types.TUniqueId cooldown_meta_id
+}
+
+// Request of FrontendService.confirmUnusedRemoteFiles: the tablets whose cooldown state should be checked.
+struct TConfirmUnusedRemoteFilesRequest {
+    1: optional list<TTabletCooldownInfo> confirm_list
+}
+
+// Result of FrontendService.confirmUnusedRemoteFiles: ids of the tablets that passed the check.
+struct TConfirmUnusedRemoteFilesResult {
+    1: optional list<Types.TTabletId> confirmed_tablets
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1: TGetDbsParams params)
     TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -780,4 +794,6 @@ service FrontendService {
     TFetchSchemaTableDataResult fetchSchemaTableData(1: 
TFetchSchemaTableDataRequest request)
 
     TMySqlLoadAcquireTokenResult acquireToken()
+
+    TConfirmUnusedRemoteFilesResult confirmUnusedRemoteFiles(1: 
TConfirmUnusedRemoteFilesRequest request)
 }
diff --git a/gensrc/thrift/MasterService.thrift 
b/gensrc/thrift/MasterService.thrift
index fe98c7ef4e..3c1d4ced03 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -45,6 +45,7 @@ struct TTabletInfo {
     17: optional Types.TReplicaId cooldown_replica_id
     // 18: optional bool is_cooldown
     19: optional i64 cooldown_term
+    20: optional Types.TUniqueId cooldown_meta_id
 }
 
 struct TFinishTaskRequest {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to