This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5014ad03e7 [feature](cooldown) Auto delete unused remote files (#16588)
5014ad03e7 is described below
commit 5014ad03e70a9d2f456d7eddbf91fe8710784917
Author: plat1ko <[email protected]>
AuthorDate: Mon Feb 13 23:59:39 2023 +0800
[feature](cooldown) Auto delete unused remote files (#16588)
---
be/src/agent/agent_server.cpp | 3 +
be/src/agent/task_worker_pool.cpp | 8 +-
be/src/agent/task_worker_pool.h | 4 +-
be/src/agent/utils.cpp | 73 +++++++-
be/src/agent/utils.h | 18 +-
be/src/common/config.h | 2 +
be/src/io/fs/remote_file_system.h | 4 +
be/src/io/fs/s3_file_system.cpp | 69 +++++++-
be/src/io/fs/s3_file_system.h | 2 +
be/src/olap/data_dir.cpp | 20 ++-
be/src/olap/olap_server.cpp | 16 +-
be/src/olap/snapshot_manager.cpp | 8 +
be/src/olap/storage_engine.h | 2 +
be/src/olap/tablet.cpp | 191 ++++++++++++++++++++-
be/src/olap/tablet.h | 16 +-
be/src/olap/tablet_manager.cpp | 70 +++-----
be/src/olap/tablet_manager.h | 16 +-
be/src/olap/tablet_meta.cpp | 8 +
be/src/olap/tablet_meta.h | 5 +
be/src/olap/task/engine_clone_task.cpp | 42 ++++-
be/src/olap/task/engine_clone_task.h | 4 +-
be/src/util/pprof_utils.cpp | 3 +
be/src/util/uid_util.h | 3 +
.../java/org/apache/doris/catalog/Replica.java | 11 ++
.../apache/doris/catalog/TabletInvertedIndex.java | 1 +
.../org/apache/doris/cooldown/CooldownConf.java | 2 +-
.../{CooldownConf.java => CooldownDelete.java} | 42 +----
.../org/apache/doris/journal/JournalEntity.java | 6 +
.../java/org/apache/doris/persist/EditLog.java | 8 +
.../org/apache/doris/persist/OperationType.java | 3 +-
.../apache/doris/service/FrontendServiceImpl.java | 72 ++++++++
gensrc/proto/olap_file.proto | 1 +
gensrc/thrift/FrontendService.thrift | 16 ++
gensrc/thrift/MasterService.thrift | 1 +
34 files changed, 620 insertions(+), 130 deletions(-)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index bd1a63fd69..5fc46a912d 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -25,6 +25,7 @@
#include "agent/task_worker_pool.h"
#include "agent/topic_subscriber.h"
#include "agent/user_resource_listener.h"
+#include "agent/utils.h"
#include "common/logging.h"
#include "common/status.h"
#include "gutil/strings/substitute.h"
@@ -49,6 +50,8 @@ AgentServer::AgentServer(ExecEnv* exec_env, const
TMasterInfo& master_info)
}
}
+ MasterServerClient::create(master_info);
+
// It is the same code to create workers of each type, so we use a macro
// to make code to be more readable.
diff --git a/be/src/agent/task_worker_pool.cpp
b/be/src/agent/task_worker_pool.cpp
index 40d7453f6c..3ed931ddaf 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -29,6 +29,7 @@
#include <sstream>
#include <string>
+#include "agent/utils.h"
#include "common/logging.h"
#include "common/status.h"
#include "env/env.h"
@@ -71,13 +72,11 @@ const int64_t PUBLISH_TIMEOUT_SEC = 10;
std::atomic_ulong TaskWorkerPool::_s_report_version(time(nullptr) * 10000);
std::mutex TaskWorkerPool::_s_task_signatures_lock;
std::map<TTaskType::type, std::set<int64_t>>
TaskWorkerPool::_s_task_signatures;
-FrontendServiceClientCache TaskWorkerPool::_master_service_client_cache;
TaskWorkerPool::TaskWorkerPool(const TaskWorkerType task_worker_type, ExecEnv*
env,
const TMasterInfo& master_info, ThreadModel
thread_model)
: _master_info(master_info),
_agent_utils(new AgentUtils()),
- _master_client(new MasterServerClient(_master_info,
&_master_service_client_cache)),
_env(env),
_stop_background_threads_latch(1),
_is_work(false),
@@ -303,7 +302,8 @@ void TaskWorkerPool::_finish_task(const TFinishTaskRequest&
finish_task_request)
while (try_time < TASK_FINISH_MAX_RETRY) {
DorisMetrics::instance()->finish_task_requests_total->increment(1);
- Status client_status =
_master_client->finish_task(finish_task_request, &result);
+ Status client_status =
+
MasterServerClient::instance()->finish_task(finish_task_request, &result);
if (client_status.ok()) {
break;
@@ -1648,7 +1648,7 @@ Status TaskWorkerPool::_move_dir(const TTabletId
tablet_id, const std::string& s
void TaskWorkerPool::_handle_report(TReportRequest& request, ReportType type) {
TMasterResult result;
- Status status = _master_client->report(request, &result);
+ Status status = MasterServerClient::instance()->report(request, &result);
bool is_report_success = false;
if (!status.ok()) {
LOG_WARNING("failed to report {}", TYPE_STRING(type))
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 3b81117285..d583a02495 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -23,7 +23,6 @@
#include <utility>
#include <vector>
-#include "agent/utils.h"
#include "common/status.h"
#include "gen_cpp/AgentService_types.h"
#include "gen_cpp/HeartbeatService_types.h"
@@ -35,6 +34,7 @@ namespace doris {
class ExecEnv;
class ThreadPool;
+class AgentUtils;
class TaskWorkerPool {
public:
@@ -224,7 +224,6 @@ private:
const TMasterInfo& _master_info;
TBackend _backend;
std::unique_ptr<AgentUtils> _agent_utils;
- std::unique_ptr<MasterServerClient> _master_client;
ExecEnv* _env;
// Protect task queue
@@ -246,7 +245,6 @@ private:
uint32_t _worker_count;
TaskWorkerType _task_worker_type;
- static FrontendServiceClientCache _master_service_client_cache;
static std::atomic_ulong _s_report_version;
static std::mutex _s_task_signatures_lock;
diff --git a/be/src/agent/utils.cpp b/be/src/agent/utils.cpp
index 7c0a552c5f..4543a44e83 100644
--- a/be/src/agent/utils.cpp
+++ b/be/src/agent/utils.cpp
@@ -27,23 +27,34 @@
#include <sstream>
#include "common/status.h"
+#include "runtime/client_cache.h"
using std::map;
using std::string;
using std::stringstream;
-using std::vector;
using apache::thrift::TException;
using apache::thrift::transport::TTransportException;
namespace doris {
-MasterServerClient::MasterServerClient(const TMasterInfo& master_info,
- FrontendServiceClientCache*
client_cache)
- : _master_info(master_info), _client_cache(client_cache) {}
+static FrontendServiceClientCache s_client_cache;
+static std::unique_ptr<MasterServerClient> s_client;
+
+MasterServerClient* MasterServerClient::create(const TMasterInfo& master_info)
{
+ s_client.reset(new MasterServerClient(master_info));
+ return s_client.get();
+}
+
+MasterServerClient* MasterServerClient::instance() {
+ return s_client.get();
+}
+
+MasterServerClient::MasterServerClient(const TMasterInfo& master_info)
+ : _master_info(master_info) {}
Status MasterServerClient::finish_task(const TFinishTaskRequest& request,
TMasterResult* result) {
Status client_status;
- FrontendServiceConnection client(_client_cache,
_master_info.network_address,
+ FrontendServiceConnection client(&s_client_cache,
_master_info.network_address,
config::thrift_rpc_timeout_ms,
&client_status);
if (!client_status.ok()) {
@@ -82,7 +93,7 @@ Status MasterServerClient::finish_task(const
TFinishTaskRequest& request, TMaste
Status MasterServerClient::report(const TReportRequest& request,
TMasterResult* result) {
Status client_status;
- FrontendServiceConnection client(_client_cache,
_master_info.network_address,
+ FrontendServiceConnection client(&s_client_cache,
_master_info.network_address,
config::thrift_rpc_timeout_ms,
&client_status);
if (!client_status.ok()) {
@@ -131,6 +142,56 @@ Status MasterServerClient::report(const TReportRequest&
request, TMasterResult*
return Status::OK();
}
+Status MasterServerClient::confirm_unused_remote_files(
+ const TConfirmUnusedRemoteFilesRequest& request,
TConfirmUnusedRemoteFilesResult* result) {
+ Status client_status;
+ FrontendServiceConnection client(&s_client_cache,
_master_info.network_address,
+ config::thrift_rpc_timeout_ms,
&client_status);
+
+ if (!client_status.ok()) {
+ return Status::InternalError(
+ "fail to get master client from cache. host={}, port={},
code={}",
+ _master_info.network_address.hostname,
_master_info.network_address.port,
+ client_status.code());
+ }
+ try {
+ try {
+ client->confirmUnusedRemoteFiles(*result, request);
+ } catch (TTransportException& e) {
+ TTransportException::TTransportExceptionType type = e.getType();
+ if (type !=
TTransportException::TTransportExceptionType::TIMED_OUT) {
+ // if not TIMED_OUT, retry
+ LOG(WARNING) << "master client, retry finishTask: " <<
e.what();
+
+ client_status = client.reopen(config::thrift_rpc_timeout_ms);
+ if (!client_status.ok()) {
+ return Status::InternalError(
+ "fail to get master client from cache. host={},
port={}, code={}",
+ _master_info.network_address.hostname,
+ _master_info.network_address.port,
client_status.code());
+ }
+
+ client->confirmUnusedRemoteFiles(*result, request);
+ } else {
+ // TIMED_OUT exception. do not retry
+ // actually we don't care what FE returns.
+ return Status::InternalError(
+ "fail to confirm unused remote files. host={},
port={}, code={}, reason={}",
+ _master_info.network_address.hostname,
_master_info.network_address.port,
+ client_status.code(), e.what());
+ }
+ }
+ } catch (TException& e) {
+ client.reopen(config::thrift_rpc_timeout_ms);
+ return Status::InternalError(
+ "fail to confirm unused remote files. host={}, port={},
code={}, reason={}",
+ _master_info.network_address.hostname,
_master_info.network_address.port,
+ client_status.code(), e.what());
+ }
+
+ return Status::OK();
+}
+
bool AgentUtils::exec_cmd(const string& command, string* errmsg, bool
redirect_stderr) {
// The exit status of the command.
uint32_t rc = 0;
diff --git a/be/src/agent/utils.h b/be/src/agent/utils.h
index 851d045f13..cfce0c3413 100644
--- a/be/src/agent/utils.h
+++ b/be/src/agent/utils.h
@@ -22,14 +22,16 @@
#include <gen_cpp/MasterService_types.h>
#include "common/status.h"
-#include "runtime/client_cache.h"
+#include "gutil/macros.h"
namespace doris {
class MasterServerClient {
public:
- MasterServerClient(const TMasterInfo& master_info,
FrontendServiceClientCache* client_cache);
- virtual ~MasterServerClient() = default;
+ static MasterServerClient* create(const TMasterInfo& master_info);
+ static MasterServerClient* instance();
+
+ ~MasterServerClient() = default;
// Report finished task to the master server
//
@@ -38,7 +40,7 @@ public:
//
// Output parameters:
// * result: The result of report task
- virtual Status finish_task(const TFinishTaskRequest& request,
TMasterResult* result);
+ Status finish_task(const TFinishTaskRequest& request, TMasterResult*
result);
// Report tasks/olap tablet/disk state to the master server
//
@@ -47,14 +49,18 @@ public:
//
// Output parameters:
// * result: The result of report task
- virtual Status report(const TReportRequest& request, TMasterResult*
result);
+ Status report(const TReportRequest& request, TMasterResult* result);
+
+ Status confirm_unused_remote_files(const TConfirmUnusedRemoteFilesRequest&
request,
+ TConfirmUnusedRemoteFilesResult*
result);
private:
+ MasterServerClient(const TMasterInfo& master_info);
+
DISALLOW_COPY_AND_ASSIGN(MasterServerClient);
// Not owner. Reference to the ExecEnv::_master_info
const TMasterInfo& _master_info;
- FrontendServiceClientCache* _client_cache;
};
class AgentUtils {
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 100664fb97..176b3d665a 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -809,6 +809,8 @@ CONF_mInt32(bloom_filter_predicate_check_row_num, "204800");
// cooldown task configs
CONF_Int32(cooldown_thread_num, "5");
CONF_mInt64(generate_cooldown_task_interval_sec, "20");
+CONF_mInt32(remove_unused_remote_files_interval_sec, "21600"); // 6h
+CONF_mInt32(confirm_unused_remote_files_interval_sec, "60");
CONF_mInt64(generate_cache_cleaner_task_interval_sec, "43200"); // 12 h
CONF_Int32(concurrency_per_dir, "2");
CONF_mInt64(cooldown_lag_time_sec, "10800"); // 3h
diff --git a/be/src/io/fs/remote_file_system.h
b/be/src/io/fs/remote_file_system.h
index f4c525a181..ff1d2f6961 100644
--- a/be/src/io/fs/remote_file_system.h
+++ b/be/src/io/fs/remote_file_system.h
@@ -34,6 +34,10 @@ public:
virtual Status batch_upload(const std::vector<Path>& local_paths,
const std::vector<Path>& dest_paths) = 0;
+ virtual Status batch_delete(const std::vector<Path>& paths) {
+ return Status::NotSupported("batch_delete");
+ }
+
virtual Status connect() = 0;
Status open_file(const Path& path, const FileReaderOptions& reader_options,
diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp
index b05df0e36f..111f9f1723 100644
--- a/be/src/io/fs/s3_file_system.cpp
+++ b/be/src/io/fs/s3_file_system.cpp
@@ -26,6 +26,7 @@
#include <aws/s3/model/ListObjectsV2Request.h>
#include <aws/s3/model/PutObjectRequest.h>
#include <aws/transfer/TransferManager.h>
+#include <opentelemetry/common/threadlocal.h>
#include <filesystem>
#include <fstream>
@@ -62,6 +63,9 @@ S3FileSystem::S3FileSystem(S3Conf&& s3_conf, std::string&& id)
if (_s3_conf.prefix.size() > 0 && _s3_conf.prefix[0] == '/') {
_s3_conf.prefix = _s3_conf.prefix.substr(1);
}
+ if (!_s3_conf.prefix.empty() && _s3_conf.prefix.back() == '/') {
+ _s3_conf.prefix.pop_back();
+ }
_executor = Aws::MakeShared<Aws::Utils::Threading::PooledThreadExecutor>(
id.c_str(), config::s3_transfer_executor_pool_size);
}
@@ -372,7 +376,70 @@ Status S3FileSystem::file_size_impl(const Path& path,
size_t* file_size) const {
}
Status S3FileSystem::list(const Path& path, std::vector<Path>* files) {
- return Status::NotSupported("not support");
+ auto client = get_client();
+ CHECK_S3_CLIENT(client);
+
+ Aws::S3::Model::ListObjectsV2Request request;
+ auto prefix = get_key(path);
+ if (!prefix.empty() && prefix.back() != '/') {
+ prefix.push_back('/');
+ }
+ request.WithBucket(_s3_conf.bucket).WithPrefix(prefix);
+ bool is_trucated = false;
+ do {
+ auto outcome = client->ListObjectsV2(request);
+ if (!outcome.IsSuccess()) {
+ return Status::IOError("failed to list objects(endpoint={},
bucket={}, prefix={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket, prefix,
+ outcome.GetError().GetMessage());
+ }
+ for (const auto& obj : outcome.GetResult().GetContents()) {
+ files->push_back(obj.GetKey().substr(prefix.size()));
+ }
+ is_trucated = outcome.GetResult().GetIsTruncated();
+ } while (is_trucated);
+ return Status::OK();
+}
+
+Status S3FileSystem::batch_delete(const std::vector<Path>& paths) {
+ auto client = get_client();
+ CHECK_S3_CLIENT(client);
+
+ // `DeleteObjectsRequest` can only contain 1000 keys at most.
+ constexpr size_t max_delete_batch = 1000;
+ auto path_iter = paths.begin();
+
+ Aws::S3::Model::DeleteObjectsRequest delete_request;
+ delete_request.SetBucket(_s3_conf.bucket);
+ do {
+ Aws::S3::Model::Delete del;
+ Aws::Vector<Aws::S3::Model::ObjectIdentifier> objects;
+ auto path_begin = path_iter;
+ for (; path_iter != paths.end() && (path_iter - path_begin <
max_delete_batch);
+ ++path_iter) {
+ objects.emplace_back().SetKey(get_key(*path_iter));
+ }
+ if (objects.empty()) {
+ return Status::OK();
+ }
+ del.WithObjects(std::move(objects)).SetQuiet(true);
+ delete_request.SetDelete(std::move(del));
+ auto delete_outcome = client->DeleteObjects(delete_request);
+ if (UNLIKELY(!delete_outcome.IsSuccess())) {
+ return Status::IOError(
+ "failed to delete objects(endpoint={}, bucket={},
key[0]={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket,
objects.front().GetKey(),
+ delete_outcome.GetError().GetMessage());
+ }
+ if (UNLIKELY(!delete_outcome.GetResult().GetErrors().empty())) {
+ const auto& e = delete_outcome.GetResult().GetErrors().front();
+ return Status::IOError("failed to delete objects(endpoint={},
bucket={}, key={}): {}",
+ _s3_conf.endpoint, _s3_conf.bucket,
e.GetKey(),
+ delete_outcome.GetError().GetMessage());
+ }
+ } while (path_iter != paths.end());
+
+ return Status::OK();
}
std::string S3FileSystem::get_key(const Path& path) const {
diff --git a/be/src/io/fs/s3_file_system.h b/be/src/io/fs/s3_file_system.h
index f8b8bc0bf8..0576421e53 100644
--- a/be/src/io/fs/s3_file_system.h
+++ b/be/src/io/fs/s3_file_system.h
@@ -80,6 +80,8 @@ public:
Status batch_upload_impl(const std::vector<Path>& local_paths,
const std::vector<Path>& dest_paths);
+ Status batch_delete(const std::vector<Path>& paths) override;
+
Status connect() override;
Status connect_impl();
diff --git a/be/src/olap/data_dir.cpp b/be/src/olap/data_dir.cpp
index f1c5b65110..2ecece7d8a 100644
--- a/be/src/olap/data_dir.cpp
+++ b/be/src/olap/data_dir.cpp
@@ -848,18 +848,18 @@ void DataDir::perform_remote_rowset_gc() {
continue;
}
DCHECK(fs->type() != io::FileSystemType::LOCAL);
- bool success = true;
+ std::vector<io::Path> seg_paths;
+ seg_paths.reserve(gc_pb.num_segments());
for (int i = 0; i < gc_pb.num_segments(); ++i) {
- auto seg_path = BetaRowset::remote_segment_path(gc_pb.tablet_id(),
rowset_id, i);
- // TODO(plat1ko): batch delete
- auto st = fs->delete_file(seg_path);
- if (!st.ok()) {
- LOG(WARNING) << "failed to delete remote rowset. err=" << st;
- success = false;
- }
+
seg_paths.push_back(BetaRowset::remote_segment_path(gc_pb.tablet_id(),
rowset_id, i));
}
- if (success) {
+ LOG(INFO) << "delete remote rowset. root_path=" << fs->root_path()
+ << ", rowset_id=" << rowset_id;
+ auto st =
std::static_pointer_cast<io::RemoteFileSystem>(fs)->batch_delete(seg_paths);
+ if (st.ok()) {
deleted_keys.push_back(std::move(key));
+ } else {
+ LOG(WARNING) << "failed to delete remote rowset. err=" << st;
}
}
for (const auto& key : deleted_keys) {
@@ -892,6 +892,8 @@ void DataDir::perform_remote_tablet_gc() {
success = false;
continue;
}
+ LOG(INFO) << "delete remote rowsets of tablet. root_path=" <<
fs->root_path()
+ << ", tablet_id=" << tablet_id;
auto st = fs->delete_directory(DATA_PREFIX + '/' + tablet_id);
if (!st.ok()) {
LOG(WARNING) << "failed to delete all remote rowset in tablet.
err=" << st;
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 3038fadcc1..5f61adea46 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -156,6 +156,12 @@ Status StorageEngine::start_bg_threads() {
&_cooldown_tasks_producer_thread));
LOG(INFO) << "cooldown tasks producer thread started";
+ RETURN_IF_ERROR(Thread::create(
+ "StorageEngine", "remove_unused_remote_files_thread",
+ [this]() { this->_remove_unused_remote_files_callback(); },
+ &_remove_unused_remote_files_thread));
+ LOG(INFO) << "remove unused remote files thread started";
+
RETURN_IF_ERROR(Thread::create(
"StorageEngine", "cache_file_cleaner_tasks_producer_thread",
[this]() { this->_cache_file_cleaner_tasks_producer_callback(); },
@@ -734,13 +740,21 @@ void StorageEngine::_cooldown_tasks_producer_callback() {
task.priority = max_priority--;
bool submited = _cooldown_thread_pool->offer(std::move(task));
- if (submited) {
+ if (!submited) {
LOG(INFO) << "failed to submit cooldown task";
}
}
} while
(!_stop_background_threads_latch.wait_for(std::chrono::seconds(interval)));
}
+void StorageEngine::_remove_unused_remote_files_callback() {
+ while (!_stop_background_threads_latch.wait_for(
+
std::chrono::seconds(config::remove_unused_remote_files_interval_sec))) {
+ LOG(INFO) << "begin to remove unused remote files";
+ Tablet::remove_unused_remote_files();
+ }
+}
+
void StorageEngine::_cache_file_cleaner_tasks_producer_callback() {
int64_t interval = config::generate_cache_cleaner_task_interval_sec;
do {
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index d7166dd97d..2c348ccc14 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -411,6 +411,14 @@ Status SnapshotManager::_create_snapshot_files(const
TabletSharedPtr& ref_tablet
// find rowset in both rs_meta and stale_rs_meta
const RowsetSharedPtr rowset =
ref_tablet->get_rowset_by_version(version, true);
if (rowset != nullptr) {
+ if (!rowset->is_local()) {
+ // MUST make full snapshot to ensure
`cooldown_meta_id` is consistent with the cooldowned rowsets after clone.
+ LOG(INFO) << "missed version is a cooldowned
rowset, must make full "
+ "snapshot. missed_version="
+ << missed_version << " tablet_id=" <<
ref_tablet->tablet_id();
+ res = Status::Error<ErrorCode::INTERNAL_ERROR>();
+ break;
+ }
consistent_rowsets.push_back(rowset);
} else {
res = Status::InternalError(
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index c89d6723a1..768e95daf9 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -273,6 +273,7 @@ private:
void _adjust_compaction_thread_num();
void _cooldown_tasks_producer_callback();
+ void _remove_unused_remote_files_callback();
void _cache_file_cleaner_tasks_producer_callback();
@@ -394,6 +395,7 @@ private:
std::shared_ptr<CumulativeCompactionPolicy> _cumulative_compaction_policy;
scoped_refptr<Thread> _cooldown_tasks_producer_thread;
+ scoped_refptr<Thread> _remove_unused_remote_files_thread;
scoped_refptr<Thread> _cache_file_cleaner_tasks_producer_thread;
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 7ed8d14b43..8748c67c86 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -30,6 +30,7 @@
#include <sys/stat.h>
#include <algorithm>
+#include <atomic>
#include <cstdint>
#include <map>
#include <memory>
@@ -38,9 +39,11 @@
#include <shared_mutex>
#include <string>
+#include "agent/utils.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
+#include "gutil/strings/stringpiece.h"
#include "io/fs/path.h"
#include "io/fs/remote_file_system.h"
#include "olap/base_compaction.h"
@@ -69,6 +72,7 @@
#include "util/scoped_cleanup.h"
#include "util/time.h"
#include "util/trace.h"
+#include "util/uid_util.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/jsonb/serialize.h"
@@ -1397,6 +1401,9 @@ void Tablet::build_tablet_report_info(TTabletInfo*
tablet_info,
tablet_info->__set_cooldown_replica_id(_cooldown_replica_id);
tablet_info->__set_cooldown_term(_cooldown_term);
}
+ if (_tablet_meta->cooldown_meta_id().initialized()) {
+
tablet_info->__set_cooldown_meta_id(_tablet_meta->cooldown_meta_id().to_thrift());
+ }
}
// should use this method to get a copy of current tablet meta
@@ -1671,14 +1678,20 @@ Status Tablet::cooldown() {
Status Tablet::_cooldown_data(const std::shared_ptr<io::RemoteFileSystem>&
dest_fs) {
auto old_rowset = pick_cooldown_rowset();
if (!old_rowset) {
- LOG(WARNING) << "Cannot pick cooldown rowset in tablet " <<
tablet_id();
- return Status::OK();
+ return Status::InternalError("cannot pick cooldown rowset in tablet
{}", tablet_id());
}
RowsetId new_rowset_id = StorageEngine::instance()->next_rowset_id();
auto start = std::chrono::steady_clock::now();
- auto st = old_rowset->upload_to(dest_fs.get(), new_rowset_id);
+ Status st;
+ {
+ std::shared_lock slock(_remote_files_lock, std::try_to_lock);
+ if (!slock.owns_lock()) {
+ return Status::Status::Error<TRY_LOCK_FAILED>("try
remote_files_lock failed");
+ }
+ st = old_rowset->upload_to(dest_fs.get(), new_rowset_id);
+ }
if (!st.ok()) {
// reclaim the incomplete rowset data in remote storage
record_unused_remote_rowset(new_rowset_id, dest_fs->id(),
old_rowset->num_segments());
@@ -1697,9 +1710,10 @@ Status Tablet::_cooldown_data(const
std::shared_ptr<io::RemoteFileSystem>& dest_
new_rowset_meta->set_resource_id(dest_fs->id());
new_rowset_meta->set_fs(dest_fs);
new_rowset_meta->set_creation_time(time(nullptr));
+ UniqueId cooldown_meta_id = UniqueId::gen_uid();
// upload cooldowned rowset meta to remote fs
- RETURN_IF_ERROR(_write_cooldown_meta(dest_fs.get(),
new_rowset_meta.get()));
+ RETURN_IF_ERROR(_write_cooldown_meta(dest_fs.get(), cooldown_meta_id,
new_rowset_meta.get()));
RowsetSharedPtr new_rowset;
RowsetFactory::create_rowset(_schema, _tablet_path, new_rowset_meta,
&new_rowset);
@@ -1711,6 +1725,7 @@ Status Tablet::_cooldown_data(const
std::shared_ptr<io::RemoteFileSystem>& dest_
std::unique_lock meta_wlock(_meta_lock);
if (tablet_state() == TABLET_RUNNING) {
modify_rowsets(to_add, to_delete);
+ _tablet_meta->set_cooldown_meta_id(cooldown_meta_id);
save_meta();
}
}
@@ -1735,7 +1750,8 @@ Status Tablet::_read_cooldown_meta(io::RemoteFileSystem*
fs, int64_t cooldown_re
return Status::OK();
}
-Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* fs, RowsetMeta*
new_rs_meta) {
+Status Tablet::_write_cooldown_meta(io::RemoteFileSystem* fs, UniqueId
cooldown_meta_id,
+ RowsetMeta* new_rs_meta) {
std::vector<RowsetMetaSharedPtr> cooldowned_rs_metas;
{
std::shared_lock meta_rlock(_meta_lock);
@@ -1757,6 +1773,8 @@ Status Tablet::_write_cooldown_meta(io::RemoteFileSystem*
fs, RowsetMeta* new_rs
rs_metas->Add(rs_meta->get_rowset_pb());
}
rs_metas->Add(new_rs_meta->get_rowset_pb());
+ tablet_meta_pb.mutable_cooldown_meta_id()->set_hi(cooldown_meta_id.hi);
+ tablet_meta_pb.mutable_cooldown_meta_id()->set_lo(cooldown_meta_id.lo);
std::string remote_meta_path =
BetaRowset::remote_tablet_meta_path(tablet_id(),
_tablet_meta->replica_id());
@@ -1774,6 +1792,11 @@ Status
Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldow
TabletMetaPB cooldown_meta_pb;
RETURN_IF_ERROR(_read_cooldown_meta(fs, cooldown_replica_id,
&cooldown_meta_pb));
DCHECK(cooldown_meta_pb.rs_metas_size() > 0);
+ if (_tablet_meta->cooldown_meta_id() ==
cooldown_meta_pb.cooldown_meta_id()) {
+ // cooldowned rowsets are same, no need to follow
+ return Status::OK();
+ }
+
int64_t cooldowned_version =
cooldown_meta_pb.rs_metas().rbegin()->end_version();
std::vector<RowsetSharedPtr> overlap_rowsets;
@@ -1836,6 +1859,7 @@ Status
Tablet::_follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t cooldow
// TODO(plat1ko): process primary key
+ _tablet_meta->set_cooldown_meta_id(cooldown_meta_pb.cooldown_meta_id());
save_meta();
return Status::OK();
@@ -1950,6 +1974,163 @@ Status Tablet::remove_all_remote_rowsets() {
gc_pb.SerializeAsString());
}
+void Tablet::remove_unused_remote_files() {
+ auto tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
+ return t->tablet_meta()->cooldown_meta_id().initialized() &&
t->is_used() &&
+ t->tablet_state() == TABLET_RUNNING;
+ });
+ TConfirmUnusedRemoteFilesRequest req;
+ req.__isset.confirm_list = true;
+ // tablet_id -> [fs, unused_remote_files]
+ using unused_remote_files_buffer_t = std::unordered_map<
+ int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>,
std::vector<io::Path>>>;
+ unused_remote_files_buffer_t buffer;
+ int64_t num_files_in_buffer = 0;
+ // assume a filename is 0.1KB, buffer size should not larger than 100MB
+ constexpr int64_t max_files_in_buffer = 1000000;
+
+ auto calc_unused_remote_files = [&req, &buffer,
&num_files_in_buffer](Tablet* t) {
+ auto storage_policy = get_storage_policy(t->storage_policy_id());
+ if (storage_policy == nullptr) {
+ LOG(WARNING) << "could not find storage_policy, storage_policy_id="
+ << t->storage_policy_id();
+ return;
+ }
+ auto resource = get_storage_resource(storage_policy->resource_id);
+ auto dest_fs =
std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+ if (dest_fs == nullptr) {
+ LOG(WARNING) << "could not find resource, resouce_id=" <<
storage_policy->resource_id;
+ return;
+ }
+ DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
+ DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+ Status st;
+ std::vector<io::Path> files;
+ {
+ std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
+ if (!xlock.owns_lock()) {
+ LOG(WARNING) << "try remote_files_lock failed. tablet_id=" <<
t->tablet_id();
+ return;
+ }
+ // FIXME(plat1ko): What if user reset resource in storage policy
to another resource?
+ // Maybe we should also list files in previously uploaded
resources.
+ st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()),
&files);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "encounter error when remove unused remote files,
tablet_id="
+ << t->tablet_id() << " : " << st;
+ }
+ if (files.empty()) {
+ return;
+ }
+ // get all cooldowned rowsets
+ std::unordered_set<std::string> cooldowned_rowsets;
+ UniqueId cooldown_meta_id;
+ {
+ std::shared_lock rlock(t->_meta_lock);
+ for (auto& rs_meta : t->_tablet_meta->all_rs_metas()) {
+ if (!rs_meta->is_local()) {
+
cooldowned_rowsets.insert(rs_meta->rowset_id().to_string());
+ }
+ }
+ cooldown_meta_id = t->_tablet_meta->cooldown_meta_id();
+ }
+ // {replica_id}.meta
+ std::string remote_meta_path = std::to_string(t->replica_id()) +
".meta";
+ // filter out the paths that should be reserved
+ // clang-format off
+ files.erase(std::remove_if(files.begin(), files.end(), [&](io::Path&
path) {
+ const std::string& path_str = path.native();
+ if (StringPiece(path_str).ends_with(".meta")) {
+ return path_str == remote_meta_path;
+ }
+ if (StringPiece(path_str).ends_with(".dat")) {
+ // extract rowset id. filename format:
{rowset_id}_{segment_num}.dat
+ auto end = path_str.rfind('_');
+ if (UNLIKELY(end == std::string::npos)) {
+ return false;
+ }
+ return !!cooldowned_rowsets.count(path_str.substr(0, end));
+ }
+ if (StringPiece(path_str).ends_with(".idx")) {
+ // extract rowset id. filename format:
{rowset_id}_{segment_num}_{index_id}.idx
+ auto end = path_str.find('_');
+ if (UNLIKELY(end == std::string::npos)) {
+ return false;
+ }
+ return !!cooldowned_rowsets.count(path_str.substr(0, end));
+ }
+ return false;
+ }), files.end());
+ // clang-format on
+ if (files.empty()) {
+ return;
+ }
+ files.shrink_to_fit();
+ num_files_in_buffer += files.size();
+ buffer.insert({t->tablet_id(), {std::move(dest_fs),
std::move(files)}});
+ auto& info = req.confirm_list.emplace_back();
+ info.__set_tablet_id(t->tablet_id());
+ info.__set_cooldown_replica_id(t->replica_id());
+ info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
+ };
+
+ auto confirm_and_remove_files = [&buffer, &req, &num_files_in_buffer]() {
+ TConfirmUnusedRemoteFilesResult result;
+ LOG(INFO) << "begin to confirm unused remote files. num_tablets=" <<
buffer.size()
+ << " num_files=" << num_files_in_buffer;
+ auto st =
MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
+ if (!st.ok()) {
+ LOG(WARNING) << st;
+ return;
+ }
+ for (auto id : result.confirmed_tablets) {
+ if (auto it = buffer.find(id); LIKELY(it != buffer.end())) {
+ auto& fs = it->second.first;
+ auto& files = it->second.second;
+ // delete unused files
+ LOG(INFO) << "delete unused files. root_path=" <<
fs->root_path()
+ << " tablet_id=" << id;
+ io::Path dir("data/" + std::to_string(id));
+ for (auto& file : files) {
+ file = dir / file;
+ LOG(INFO) << "delete unused file: " << file.native();
+ }
+ st = fs->batch_delete(files);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to delete unused files,
tablet_id=" << id << " : "
+ << st;
+ }
+ }
+ }
+ };
+
+ // batch confirm to reduce FE's overhead
+ auto next_confirm_time = std::chrono::steady_clock::now() +
+
std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
+ for (auto& t : tablets) {
+ if (t.use_count() <= 1 // this means tablet has been dropped
+ || t->_cooldown_replica_id != t->replica_id() || t->_state !=
TABLET_RUNNING) {
+ continue;
+ }
+ calc_unused_remote_files(t.get());
+ if (num_files_in_buffer > 0 && (num_files_in_buffer >
max_files_in_buffer ||
+ std::chrono::steady_clock::now() >
next_confirm_time)) {
+ confirm_and_remove_files();
+ buffer.clear();
+ req.confirm_list.clear();
+ num_files_in_buffer = 0;
+ next_confirm_time =
+ std::chrono::steady_clock::now() +
+
std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
+ }
+ }
+ if (num_files_in_buffer > 0) {
+ confirm_and_remove_files();
+ }
+}
+
TabletSchemaSPtr Tablet::tablet_schema() const {
std::shared_lock wrlock(_meta_lock);
return _max_version_schema;
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index c56808e8b5..dc0034f065 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -318,6 +318,8 @@ public:
void record_unused_remote_rowset(const RowsetId& rowset_id, const
std::string& resource,
int64_t num_segments);
+
+ static void remove_unused_remote_files();
////////////////////////////////////////////////////////////////////////////
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
@@ -361,6 +363,16 @@ public:
RowsetSharedPtr get_rowset(const RowsetId& rowset_id);
+ void traverse_rowsets(std::function<void(const RowsetSharedPtr&)> visitor)
{
+ std::shared_lock rlock(_meta_lock);
+ for (auto& [v, rs] : _rs_version_map) {
+ visitor(rs);
+ }
+ for (auto& [v, rs] : _stale_rs_version_map) {
+ visitor(rs);
+ }
+ }
+
private:
Status _init_once_action();
void _print_missed_versions(const std::vector<Version>& missed_versions)
const;
@@ -404,7 +416,8 @@ private:
Status _follow_cooldowned_data(io::RemoteFileSystem* fs, int64_t
cooldown_replica_id);
Status _read_cooldown_meta(io::RemoteFileSystem* fs, int64_t
cooldown_replica_id,
TabletMetaPB* tablet_meta_pb);
- Status _write_cooldown_meta(io::RemoteFileSystem* fs, RowsetMeta*
new_rs_meta);
+ Status _write_cooldown_meta(io::RemoteFileSystem* fs, UniqueId
cooldown_meta_id,
+ RowsetMeta* new_rs_meta);
////////////////////////////////////////////////////////////////////////////
// end cooldown functions
////////////////////////////////////////////////////////////////////////////
@@ -484,6 +497,7 @@ private:
// cooldown conf
int64_t _cooldown_replica_id = -1;
int64_t _cooldown_term = -1;
+ std::shared_mutex _remote_files_lock;
DISALLOW_COPY_AND_ASSIGN(Tablet);
diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp
index 407fb78520..45a98bf5bc 100644
--- a/be/src/olap/tablet_manager.cpp
+++ b/be/src/olap/tablet_manager.cpp
@@ -31,6 +31,7 @@
#include <cstdlib>
#include <filesystem>
+#include "common/compiler_util.h"
#include "env/env.h"
#include "env/env_util.h"
#include "gutil/strings/strcat.h"
@@ -90,6 +91,12 @@ TabletManager::~TabletManager() {
DEREGISTER_HOOK_METRIC(tablet_meta_mem_consumption);
}
+void TabletManager::add_tablet(const TabletSharedPtr& tablet) {
+ auto& tablet_map = _get_tablet_map(tablet->tablet_id());
+ std::lock_guard wlock(_get_tablets_shard_lock(tablet->tablet_id()));
+ tablet_map[tablet->tablet_id()] = tablet;
+}
+
Status TabletManager::_add_tablet_unlocked(TTabletId tablet_id, const
TabletSharedPtr& tablet,
bool update_meta, bool force) {
Status res = Status::OK();
@@ -580,23 +587,6 @@ TabletSharedPtr TabletManager::get_tablet(TTabletId
tablet_id, TabletUid tablet_
return nullptr;
}
-std::vector<TabletSharedPtr> TabletManager::get_all_tablet() {
- std::vector<TabletSharedPtr> res;
- for (const auto& tablets_shard : _tablets_shards) {
- std::shared_lock rdlock(tablets_shard.lock);
- for (const auto& tablet_map : tablets_shard.tablet_map) {
- // these are tablets which is not deleted
- TabletSharedPtr tablet = tablet_map.second;
- if (!tablet->is_used()) {
- LOG(WARNING) << "tablet cannot be used. tablet=" <<
tablet->tablet_id();
- continue;
- }
- res.emplace_back(tablet);
- }
- }
- return res;
-}
-
uint64_t TabletManager::get_rowset_nums() {
uint64_t rowset_nums = 0;
for (const auto& tablets_shard : _tablets_shards) {
@@ -898,37 +888,31 @@ Status
TabletManager::build_all_report_tablets_info(std::map<TTabletId, TTablet>
DorisMetrics::instance()->report_all_tablets_requests_total->increment(1);
HistogramStat tablet_version_num_hist;
+ auto tablets = get_all_tablet([](Tablet*) { return true; });
auto local_cache = std::make_shared<std::vector<TTabletStat>>();
- for (const auto& tablets_shard : _tablets_shards) {
- std::shared_lock rdlock(tablets_shard.lock);
- for (const auto& item : tablets_shard.tablet_map) {
- uint64_t tablet_id = item.first;
- TabletSharedPtr tablet_ptr = item.second;
- TTablet t_tablet;
- TTabletInfo tablet_info;
- tablet_ptr->build_tablet_report_info(&tablet_info, true);
- // find expired transaction corresponding to this tablet
- TabletInfo tinfo(tablet_id, tablet_ptr->schema_hash(),
tablet_ptr->tablet_uid());
- auto find = expire_txn_map.find(tinfo);
- if (find != expire_txn_map.end()) {
- tablet_info.__set_transaction_ids(find->second);
- expire_txn_map.erase(find);
- }
- t_tablet.tablet_infos.push_back(tablet_info);
- tablet_version_num_hist.add(tablet_ptr->version_count());
- tablets_info->emplace(tablet_id, t_tablet);
- TTabletStat t_tablet_stat;
- t_tablet_stat.__set_tablet_id(tablet_info.tablet_id);
- t_tablet_stat.__set_data_size(tablet_info.data_size);
- t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size);
- t_tablet_stat.__set_row_num(tablet_info.row_count);
- t_tablet_stat.__set_version_count(tablet_info.version_count);
- local_cache->emplace_back(std::move(t_tablet_stat));
+ local_cache->reserve(tablets.size());
+ for (auto& tablet : tablets) {
+ auto& t_tablet = (*tablets_info)[tablet->tablet_id()];
+ TTabletInfo& tablet_info = t_tablet.tablet_infos.emplace_back();
+ tablet->build_tablet_report_info(&tablet_info, true);
+ // find expired transaction corresponding to this tablet
+ TabletInfo tinfo(tablet->tablet_id(), tablet->schema_hash(),
tablet->tablet_uid());
+ auto find = expire_txn_map.find(tinfo);
+ if (find != expire_txn_map.end()) {
+ tablet_info.__set_transaction_ids(find->second);
+ expire_txn_map.erase(find);
}
+ tablet_version_num_hist.add(tablet->version_count());
+ auto& t_tablet_stat = local_cache->emplace_back();
+ t_tablet_stat.__set_tablet_id(tablet_info.tablet_id);
+ t_tablet_stat.__set_data_size(tablet_info.data_size);
+ t_tablet_stat.__set_remote_data_size(tablet_info.remote_data_size);
+ t_tablet_stat.__set_row_num(tablet_info.row_count);
+ t_tablet_stat.__set_version_count(tablet_info.version_count);
}
{
std::lock_guard<std::mutex> guard(_tablet_stat_cache_mutex);
- _tablet_stat_list_cache = local_cache;
+ _tablet_stat_list_cache.swap(local_cache);
}
DorisMetrics::instance()->tablet_version_num_distribution->set_histogram(
tablet_version_num_hist);
diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h
index 7ae4060142..b1f3300596 100644
--- a/be/src/olap/tablet_manager.h
+++ b/be/src/olap/tablet_manager.h
@@ -61,6 +61,8 @@ public:
// task to be fail, even if there is enough space on other disks
Status create_tablet(const TCreateTabletReq& request,
std::vector<DataDir*> stores);
+ void add_tablet(const TabletSharedPtr& tablet);
+
// Drop a tablet by description.
// If `is_drop_table_or_partition` is true, we need to remove all remote
rowsets in this tablet.
Status drop_tablet(TTabletId tablet_id, TReplicaId replica_id, bool
is_drop_table_or_partition);
@@ -78,7 +80,19 @@ public:
TabletSharedPtr get_tablet(TTabletId tablet_id, TabletUid tablet_uid,
bool include_deleted = false, std::string* err
= nullptr);
- std::vector<TabletSharedPtr> get_all_tablet();
+ std::vector<TabletSharedPtr> get_all_tablet(std::function<bool(Tablet*)>&&
filter =
+ [](Tablet* t) { return
t->is_used(); }) {
+ std::vector<TabletSharedPtr> res;
+ for (const auto& tablets_shard : _tablets_shards) {
+ std::shared_lock rdlock(tablets_shard.lock);
+ for (auto& [id, tablet] : tablets_shard.tablet_map) {
+ if (filter(tablet.get())) {
+ res.emplace_back(tablet);
+ }
+ }
+ }
+ return res;
+ }
uint64_t get_rowset_nums();
uint64_t get_segment_nums();
diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp
index 7562efa5b2..1fc57239d1 100644
--- a/be/src/olap/tablet_meta.cpp
+++ b/be/src/olap/tablet_meta.cpp
@@ -538,6 +538,10 @@ void TabletMeta::init_from_pb(const TabletMetaPB&
tablet_meta_pb) {
}
_storage_policy_id = tablet_meta_pb.storage_policy_id();
+ if (tablet_meta_pb.has_cooldown_meta_id()) {
+ _cooldown_meta_id = tablet_meta_pb.cooldown_meta_id();
+ }
+
if (tablet_meta_pb.has_enable_unique_key_merge_on_write()) {
_enable_unique_key_merge_on_write =
tablet_meta_pb.enable_unique_key_merge_on_write();
}
@@ -606,6 +610,10 @@ void TabletMeta::to_meta_pb(TabletMetaPB* tablet_meta_pb) {
if (_storage_policy_id > 0) {
tablet_meta_pb->set_storage_policy_id(_storage_policy_id);
}
+ if (_cooldown_meta_id.initialized()) {
+
tablet_meta_pb->mutable_cooldown_meta_id()->CopyFrom(_cooldown_meta_id.to_proto());
+ }
+
tablet_meta_pb->set_enable_unique_key_merge_on_write(_enable_unique_key_merge_on_write);
if (_enable_unique_key_merge_on_write) {
diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h
index 452b5fbf45..8b147216a8 100644
--- a/be/src/olap/tablet_meta.h
+++ b/be/src/olap/tablet_meta.h
@@ -116,6 +116,7 @@ public:
TabletTypePB tablet_type() const { return _tablet_type; }
TabletUid tablet_uid() const;
+ void set_tablet_uid(TabletUid uid) { _tablet_uid = uid; }
int64_t table_id() const;
int64_t partition_id() const;
int64_t tablet_id() const;
@@ -194,6 +195,9 @@ public:
_storage_policy_id = id;
}
+ UniqueId cooldown_meta_id() const { return _cooldown_meta_id; }
+ void set_cooldown_meta_id(UniqueId uid) { _cooldown_meta_id = uid; }
+
static void init_column_from_tcolumn(uint32_t unique_id, const TColumn&
tcolumn,
ColumnPB* column);
@@ -238,6 +242,7 @@ private:
// meta for cooldown
int64_t _storage_policy_id = 0; // <= 0 means no storage policy
+ UniqueId _cooldown_meta_id;
// For unique key data model, the feature Merge-on-Write will leverage a
primary
// key index and a delete-bitmap to mark duplicate keys as deleted in load
stage,
diff --git a/be/src/olap/task/engine_clone_task.cpp
b/be/src/olap/task/engine_clone_task.cpp
index 5df1c32bbc..d3f59653b7 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -17,6 +17,7 @@
#include "olap/task/engine_clone_task.h"
+#include <memory>
#include <set>
#include <system_error>
@@ -30,6 +31,8 @@
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/snapshot_manager.h"
+#include "olap/storage_engine.h"
+#include "olap/tablet_meta.h"
#include "runtime/client_cache.h"
#include "runtime/thread_context.h"
#include "util/defer_op.h"
@@ -485,8 +488,8 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const
std::string& clone_d
// The tablet meta info is downloaded from source BE as .hdr file.
// So we load it and generate cloned_tablet_meta.
auto cloned_tablet_meta_file = fmt::format("{}/{}.hdr", clone_dir,
tablet->tablet_id());
- TabletMeta cloned_tablet_meta;
-
RETURN_IF_ERROR(cloned_tablet_meta.create_from_file(cloned_tablet_meta_file));
+ auto cloned_tablet_meta = std::make_shared<TabletMeta>();
+
RETURN_IF_ERROR(cloned_tablet_meta->create_from_file(cloned_tablet_meta_file));
// remove the cloned meta file
FileUtils::remove(cloned_tablet_meta_file);
@@ -528,7 +531,7 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const
std::string& clone_d
if (is_incremental_clone) {
status = _finish_incremental_clone(tablet, cloned_tablet_meta,
committed_version);
} else {
- status = _finish_full_clone(tablet,
const_cast<TabletMeta*>(&cloned_tablet_meta));
+ status = _finish_full_clone(tablet, cloned_tablet_meta);
}
// if full clone success, need to update cumulative layer point
@@ -544,11 +547,11 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet,
const std::string& clone_d
/// 1. Get missing version from local tablet again and check if they exist in
cloned tablet.
/// 2. Revise the local tablet meta to add all incremental cloned rowset's
meta.
Status EngineCloneTask::_finish_incremental_clone(Tablet* tablet,
- const TabletMeta&
cloned_tablet_meta,
+ const TabletMetaSharedPtr&
cloned_tablet_meta,
int64_t committed_version) {
LOG(INFO) << "begin to finish incremental clone. tablet=" <<
tablet->full_name()
<< ", committed_version=" << committed_version
- << ", cloned_tablet_replica_id=" <<
cloned_tablet_meta.replica_id();
+ << ", cloned_tablet_replica_id=" <<
cloned_tablet_meta->replica_id();
/// Get missing versions again from local tablet.
/// We got it before outside the lock, so it has to be got again.
@@ -561,7 +564,7 @@ Status EngineCloneTask::_finish_incremental_clone(Tablet*
tablet,
// check missing versions exist in clone src
std::vector<RowsetMetaSharedPtr> rowsets_to_clone;
for (Version version : missed_versions) {
- RowsetMetaSharedPtr rs_meta =
cloned_tablet_meta.acquire_rs_meta_by_version(version);
+ RowsetMetaSharedPtr rs_meta =
cloned_tablet_meta->acquire_rs_meta_by_version(version);
if (rs_meta == nullptr) {
return Status::InternalError("missed version {} is not found in
cloned tablet meta",
version.to_string());
@@ -576,10 +579,29 @@ Status EngineCloneTask::_finish_incremental_clone(Tablet*
tablet,
return tablet->revise_tablet_meta(rowsets_to_clone, versions_to_delete);
}
+// replace `origin_tablet` with Tablet created by `cloned_tablet_meta`.
+static Status full_clone(Tablet* origin_tablet, const TabletMetaSharedPtr&
cloned_tablet_meta) {
+ // keep origin `replica_id` and `tablet_uid`
+ cloned_tablet_meta->set_replica_id(origin_tablet->replica_id());
+ cloned_tablet_meta->set_tablet_uid(origin_tablet->tablet_uid());
+ auto tablet = Tablet::create_tablet_from_meta(cloned_tablet_meta,
origin_tablet->data_dir());
+ RETURN_IF_ERROR(tablet->init());
+ tablet->save_meta();
+ StorageEngine::instance()->tablet_manager()->add_tablet(tablet);
+ // reclaim local rowsets in origin tablet
+ tablet->traverse_rowsets([](auto& rs) {
+ if (rs->is_local()) {
+ StorageEngine::instance()->add_unused_rowset(rs);
+ }
+ });
+ return Status::OK();
+}
+
/// This method will do:
/// 1. Compare the version of local tablet and cloned tablet to decide which
version to keep
/// 2. Revise the local tablet meta
-Status EngineCloneTask::_finish_full_clone(Tablet* tablet, TabletMeta*
cloned_tablet_meta) {
+Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
+ const TabletMetaSharedPtr&
cloned_tablet_meta) {
Version cloned_max_version = cloned_tablet_meta->max_version();
LOG(INFO) << "begin to finish full clone. tablet=" << tablet->full_name()
<< ", cloned_max_version=" << cloned_max_version;
@@ -648,6 +670,12 @@ Status EngineCloneTask::_finish_full_clone(Tablet* tablet,
TabletMeta* cloned_ta
}
std::vector<RowsetMetaSharedPtr> rowsets_to_clone;
for (auto& rs_meta : cloned_tablet_meta->all_rs_metas()) {
+ if (!rs_meta->is_local()) {
+ // MUST clone all cooldowned rowset if there is any cooldowned
rowset to clone to ensure
+ // `cooldown_meta_id` is consistent with the cooldowned rowsets
after clone.
+ LOG(INFO) << "clone cooldowned rowsets, tablet_id=" <<
tablet->tablet_id();
+ RETURN_IF_ERROR(full_clone(tablet, cloned_tablet_meta));
+ }
rowsets_to_clone.push_back(rs_meta);
LOG(INFO) << "version to be cloned from clone tablet to local tablet: "
<< tablet->full_name() << ", version=" << rs_meta->version();
diff --git a/be/src/olap/task/engine_clone_task.h
b/be/src/olap/task/engine_clone_task.h
index bfa6d212a6..67a164b6c2 100644
--- a/be/src/olap/task/engine_clone_task.h
+++ b/be/src/olap/task/engine_clone_task.h
@@ -45,10 +45,10 @@ private:
virtual Status _finish_clone(Tablet* tablet, const std::string& clone_dir,
int64_t committed_version, bool
is_incremental_clone);
- Status _finish_incremental_clone(Tablet* tablet, const TabletMeta&
cloned_tablet_meta,
+ Status _finish_incremental_clone(Tablet* tablet, const
TabletMetaSharedPtr& cloned_tablet_meta,
int64_t committed_version);
- Status _finish_full_clone(Tablet* tablet, TabletMeta* cloned_tablet_meta);
+ Status _finish_full_clone(Tablet* tablet, const TabletMetaSharedPtr&
cloned_tablet_meta);
Status _make_and_download_snapshots(DataDir& data_dir, const std::string&
local_data_path,
TBackend* src_host, string*
src_file_path,
diff --git a/be/src/util/pprof_utils.cpp b/be/src/util/pprof_utils.cpp
index 8fc58410c8..da90888297 100644
--- a/be/src/util/pprof_utils.cpp
+++ b/be/src/util/pprof_utils.cpp
@@ -24,6 +24,9 @@
#include "util/file_utils.h"
namespace doris {
+namespace config {
+extern std::string pprof_profile_dir;
+}
Status PprofUtils::get_pprof_cmd(std::string* cmd) {
AgentUtils util;
diff --git a/be/src/util/uid_util.h b/be/src/util/uid_util.h
index d43367d95f..7a310f3cf1 100644
--- a/be/src/util/uid_util.h
+++ b/be/src/util/uid_util.h
@@ -56,6 +56,7 @@ struct UniqueId {
int64_t hi = 0;
int64_t lo = 0;
+ UniqueId() = default;
UniqueId(int64_t hi_, int64_t lo_) : hi(hi_), lo(lo_) {}
UniqueId(const UniqueId& uid) : hi(uid.hi), lo(uid.lo) {}
UniqueId(const TUniqueId& tuid) : hi(tuid.hi), lo(tuid.lo) {}
@@ -65,6 +66,8 @@ struct UniqueId {
from_hex(&lo, lo_str);
}
+ // True once this id holds a non-zero value; a default-constructed UniqueId
+ // is {0, 0} and is treated as uninitialized.
+ bool initialized() const { return hi != 0 || lo != 0; }
+
// currently, the implementation is uuid, but it may change in the future
static UniqueId gen_uid() {
UniqueId uid(0, 0);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
index b65eb91486..ce891a521d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Replica.java
@@ -19,6 +19,7 @@ package org.apache.doris.catalog;
import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
+import org.apache.doris.thrift.TUniqueId;
import com.google.gson.annotations.SerializedName;
import org.apache.logging.log4j.LogManager;
@@ -108,6 +109,8 @@ public class Replica implements Writable {
// bad means this Replica is unrecoverable, and we will delete it
private boolean bad = false;
+ private TUniqueId cooldownMetaId;
+
/*
* If set to true, with means this replica need to be repaired. explicitly.
* This can happen when this replica is created by a balance clone task,
and
@@ -234,6 +237,14 @@ public class Replica implements Writable {
return true;
}
+ /**
+ * Cooldown meta id last reported by the BE for this replica (set from the
+ * tablet report, see TabletInvertedIndex). The master FE compares it across
+ * all replicas before confirming deletion of unused remote files. May be
+ * null if this replica has never reported one.
+ */
+ public TUniqueId getCooldownMetaId() {
+ return cooldownMetaId;
+ }
+
+ public void setCooldownMetaId(TUniqueId cooldownMetaId) {
+ this.cooldownMetaId = cooldownMetaId;
+ }
+
+
public boolean needFurtherRepair() {
if (needFurtherRepair && System.currentTimeMillis() -
this.furtherRepairSetTime < FURTHER_REPAIR_TIMEOUT_MS) {
return true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
index 0ed99185ef..8de447050c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TabletInvertedIndex.java
@@ -193,6 +193,7 @@ public class TabletInvertedIndex {
if (Config.enable_storage_policy) {
handleCooldownConf(tabletMeta,
backendTabletInfo, cooldownConfToPush,
cooldownConfToUpdate);
+
replica.setCooldownMetaId(backendTabletInfo.getCooldownMetaId());
}
long partitionId = tabletMeta.getPartitionId();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
index b0dcec8732..bb0e7debab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
@@ -29,7 +29,7 @@ import java.io.DataOutput;
import java.io.IOException;
/**
- * This class represents the olap replica related metadata.
+ * This class is used to log the "update cooldown conf" operation.
*/
@Data
public class CooldownConf implements Writable {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownDelete.java
similarity index 52%
copy from fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
copy to fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownDelete.java
index b0dcec8732..4ca8220ba8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownConf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/cooldown/CooldownDelete.java
@@ -21,51 +21,15 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.persist.gson.GsonUtils;
-import com.google.gson.annotations.SerializedName;
-import lombok.Data;
-
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
- * This class represents the olap replica related metadata.
+ * This class is used to log the "delete unused remote files" operation.
*/
-@Data
-public class CooldownConf implements Writable {
- @SerializedName(value = "dbId")
- protected long dbId;
- @SerializedName(value = "tableId")
- protected long tableId;
- @SerializedName(value = "partitionId")
- protected long partitionId;
- @SerializedName(value = "indexId")
- protected long indexId;
- @SerializedName(value = "tabletId")
- protected long tabletId;
- @SerializedName(value = "cooldownReplicaId")
- protected long cooldownReplicaId = -1;
- @SerializedName(value = "cooldownTerm")
- protected long cooldownTerm = -1;
-
- public CooldownConf() {
- }
-
- // for update
- public CooldownConf(long dbId, long tableId, long partitionId, long
indexId, long tabletId, long cooldownTerm) {
- this.dbId = dbId;
- this.tableId = tableId;
- this.partitionId = partitionId;
- this.indexId = indexId;
- this.tabletId = tabletId;
- this.cooldownTerm = cooldownTerm;
- }
-
- // for push
- public CooldownConf(long tabletId, long cooldownReplicaId, long
cooldownTerm) {
- this.tabletId = tabletId;
- this.cooldownReplicaId = cooldownReplicaId;
- this.cooldownTerm = cooldownTerm;
+public class CooldownDelete implements Writable {
+ public CooldownDelete() {
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
index a97cbf6a7e..9162cd837d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/journal/JournalEntity.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.io.Text;
import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
import org.apache.doris.cooldown.CooldownConfList;
+import org.apache.doris.cooldown.CooldownDelete;
import org.apache.doris.datasource.CatalogLog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
@@ -586,6 +587,11 @@ public class JournalEntity implements Writable {
isRead = true;
break;
}
+ case OperationType.OP_COOLDOWN_DELETE: {
+ data = CooldownDelete.read(in);
+ isRead = true;
+ break;
+ }
case OperationType.OP_BATCH_ADD_ROLLUP: {
data = BatchAlterJobPersistInfo.read(in);
isRead = true;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
index 0d2bc4f5a0..cfcac88a62 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/EditLog.java
@@ -43,6 +43,7 @@ import org.apache.doris.common.io.Writable;
import org.apache.doris.common.util.SmallFileMgr.SmallFile;
import org.apache.doris.cooldown.CooldownConfHandler;
import org.apache.doris.cooldown.CooldownConfList;
+import org.apache.doris.cooldown.CooldownDelete;
import org.apache.doris.datasource.CatalogLog;
import org.apache.doris.datasource.ExternalObjectLog;
import org.apache.doris.datasource.InitCatalogLog;
@@ -725,6 +726,9 @@ public class EditLog {
CooldownConfList cooldownConfList = (CooldownConfList)
journal.getData();
CooldownConfHandler.replayUpdateCooldownConf(cooldownConfList);
break;
+ case OperationType.OP_COOLDOWN_DELETE:
+ // noop
+ break;
case OperationType.OP_BATCH_ADD_ROLLUP: {
BatchAlterJobPersistInfo batchAlterJobV2 =
(BatchAlterJobPersistInfo) journal.getData();
for (AlterJobV2 alterJobV2 :
batchAlterJobV2.getAlterJobV2List()) {
@@ -1526,6 +1530,10 @@ public class EditLog {
logEdit(OperationType.OP_UPDATE_COOLDOWN_CONF, cooldownConf);
}
+ public void logCooldownDelete(CooldownDelete cooldownDelete) {
+ logEdit(OperationType.OP_COOLDOWN_DELETE, cooldownDelete);
+ }
+
public void logBatchAlterJob(BatchAlterJobPersistInfo batchAlterJobV2) {
logEdit(OperationType.OP_BATCH_ADD_ROLLUP, batchAlterJobV2);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
index 47954cdbdf..bc99c80359 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/persist/OperationType.java
@@ -267,8 +267,9 @@ public class OperationType {
public static final short OP_REFRESH_EXTERNAL_PARTITIONS = 356;
public static final short OP_ALTER_USER = 400;
- // cooldown conf
+ // cooldown related
public static final short OP_UPDATE_COOLDOWN_CONF = 401;
+ public static final short OP_COOLDOWN_DELETE = 402;
/**
* Get opcode name by op code.
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 380a69a485..4c8f36b127 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -32,9 +32,12 @@ import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.HMSResource;
import org.apache.doris.catalog.Index;
import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.Replica;
import org.apache.doris.catalog.Table;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.TableIf.TableType;
+import org.apache.doris.catalog.Tablet;
+import org.apache.doris.catalog.TabletMeta;
import org.apache.doris.catalog.external.ExternalDatabase;
import org.apache.doris.cluster.Cluster;
import org.apache.doris.cluster.ClusterNamespace;
@@ -52,6 +55,7 @@ import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
import org.apache.doris.common.Version;
import org.apache.doris.common.util.TimeUtils;
+import org.apache.doris.cooldown.CooldownDelete;
import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalCatalog;
import org.apache.doris.datasource.HMSExternalCatalog;
@@ -76,6 +80,8 @@ import org.apache.doris.thrift.TCell;
import org.apache.doris.thrift.TColumn;
import org.apache.doris.thrift.TColumnDef;
import org.apache.doris.thrift.TColumnDesc;
+import org.apache.doris.thrift.TConfirmUnusedRemoteFilesRequest;
+import org.apache.doris.thrift.TConfirmUnusedRemoteFilesResult;
import org.apache.doris.thrift.TDescribeTableParams;
import org.apache.doris.thrift.TDescribeTableResult;
import org.apache.doris.thrift.TExecPlanFragmentParams;
@@ -174,6 +180,72 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
this.exeEnv = exeEnv;
}
+ @Override
+ public TConfirmUnusedRemoteFilesResult
confirmUnusedRemoteFiles(TConfirmUnusedRemoteFilesRequest request)
+ throws TException {
+ if (!Env.getCurrentEnv().isMaster()) {
+ throw new TException("FE is not master");
+ }
+ TConfirmUnusedRemoteFilesResult res = new
TConfirmUnusedRemoteFilesResult();
+ if (!request.isSetConfirmList()) {
+ throw new TException("confirm_list in null");
+ }
+ request.getConfirmList().forEach(info -> {
+ if (!info.isSetCooldownMetaId()) {
+ LOG.warn("cooldown_meta_id is null");
+ return;
+ }
+ TabletMeta tabletMeta =
Env.getCurrentEnv().getTabletInvertedIndex().getTabletMeta(info.tablet_id);
+ if (tabletMeta == null) {
+ LOG.warn("tablet {} not found", info.tablet_id);
+ return;
+ }
+ Tablet tablet;
+ try {
+ OlapTable table = (OlapTable)
Env.getCurrentInternalCatalog().getDbNullable(tabletMeta.getDbId())
+ .getTable(tabletMeta.getTableId())
+ .get();
+ table.readLock();
+ try {
+ tablet =
table.getPartition(tabletMeta.getPartitionId()).getIndex(tabletMeta.getIndexId())
+ .getTablet(info.tablet_id);
+ } finally {
+ table.readUnlock();
+ }
+ } catch (RuntimeException e) {
+ LOG.warn("tablet {} not found", info.tablet_id);
+ return;
+ }
+ // check cooldownReplicaId
+ long cooldownReplicaId = tablet.getCooldownConf().first;
+ if (cooldownReplicaId != info.cooldown_replica_id) {
+ LOG.info("cooldown replica id not match({} vs {}), tablet={}",
cooldownReplicaId,
+ info.cooldown_replica_id, info.tablet_id);
+ return;
+ }
+ // check cooldownMetaId of all replicas are the same
+ List<Replica> replicas =
Env.getCurrentEnv().getTabletInvertedIndex().getReplicas(info.tablet_id);
+ for (Replica replica : replicas) {
+ if
(!info.cooldown_meta_id.equals(replica.getCooldownMetaId())) {
+ LOG.info("cooldown meta id are not same, tablet={}",
info.tablet_id);
+ return;
+ }
+ }
+ res.addToConfirmedTablets(info.tablet_id);
+ });
+
+ if (res.isSetConfirmedTablets() &&
!res.getConfirmedTablets().isEmpty()) {
+ if (Env.getCurrentEnv().isMaster()) {
+ // ensure FE is real master
+ Env.getCurrentEnv().getEditLog().logCooldownDelete(new
CooldownDelete());
+ } else {
+ throw new TException("FE is not master");
+ }
+ }
+
+ return res;
+ }
+
@Override
public TGetDbsResult getDbNames(TGetDbsParams params) throws TException {
LOG.debug("get db request: {}", params);
diff --git a/gensrc/proto/olap_file.proto b/gensrc/proto/olap_file.proto
index df04a2a052..5aac8d98c4 100644
--- a/gensrc/proto/olap_file.proto
+++ b/gensrc/proto/olap_file.proto
@@ -296,6 +296,7 @@ message TabletMetaPB {
// Use primary key index to speed up table unique key model
optional bool enable_unique_key_merge_on_write = 24 [default = false];
optional int64 storage_policy_id = 25;
+ optional PUniqueId cooldown_meta_id = 26;
}
message OLAPRawDeltaHeaderMessage {
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 26ce8fb988..08b2345488 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -739,6 +739,20 @@ struct TMySqlLoadAcquireTokenResult {
2: optional string token
}
+// Cooldown state of one tablet, sent by a BE so the FE can verify whether the
+// tablet's unused remote files may safely be deleted.
+struct TTabletCooldownInfo {
+ 1: optional Types.TTabletId tablet_id
+ 2: optional Types.TReplicaId cooldown_replica_id
+ 3: optional Types.TUniqueId cooldown_meta_id
+}
+
+// BE -> master FE: ask which of these tablets' unused remote files can be
+// deleted.
+struct TConfirmUnusedRemoteFilesRequest {
+ 1: optional list<TTabletCooldownInfo> confirm_list
+}
+
+// Tablets for which the FE confirmed that unused remote files may be deleted.
+struct TConfirmUnusedRemoteFilesResult {
+ 1: optional list<Types.TTabletId> confirmed_tablets
+}
+
service FrontendService {
TGetDbsResult getDbNames(1: TGetDbsParams params)
TGetTablesResult getTableNames(1: TGetTablesParams params)
@@ -780,4 +794,6 @@ service FrontendService {
TFetchSchemaTableDataResult fetchSchemaTableData(1:
TFetchSchemaTableDataRequest request)
TMySqlLoadAcquireTokenResult acquireToken()
+
+ TConfirmUnusedRemoteFilesResult confirmUnusedRemoteFiles(1:
TConfirmUnusedRemoteFilesRequest request)
}
diff --git a/gensrc/thrift/MasterService.thrift
b/gensrc/thrift/MasterService.thrift
index fe98c7ef4e..3c1d4ced03 100644
--- a/gensrc/thrift/MasterService.thrift
+++ b/gensrc/thrift/MasterService.thrift
@@ -45,6 +45,7 @@ struct TTabletInfo {
17: optional Types.TReplicaId cooldown_replica_id
// 18: optional bool is_cooldown
19: optional i64 cooldown_term
+ 20: optional Types.TUniqueId cooldown_meta_id
}
struct TFinishTaskRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]