github-actions[bot] commented on code in PR #16752:
URL: https://github.com/apache/doris/pull/16752#discussion_r1105512142
##########
be/src/olap/tablet_manager.cpp:
##########
@@ -1372,4 +1372,161 @@ std::set<int64_t>
TabletManager::check_all_tablet_segment(bool repair) {
return bad_tablets;
}
+void TabletManager::remove_unused_remote_files() {
+ auto tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
+ return t->tablet_meta()->cooldown_meta_id().initialized() &&
t->is_used() &&
+ t->tablet_state() == TABLET_RUNNING;
+ });
+ TConfirmUnusedRemoteFilesRequest req;
+ req.__isset.confirm_list = true;
+ // tablet_id -> [fs, unused_remote_files]
+ using unused_remote_files_buffer_t = std::unordered_map<
+ int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>,
std::vector<io::Path>>>;
+ unused_remote_files_buffer_t buffer;
+ int64_t num_files_in_buffer = 0;
+ // assume a filename is 0.1KB, buffer size should not larger than 100MB
+ constexpr int64_t max_files_in_buffer = 1000000;
+
+ auto calc_unused_remote_files = [&req, &buffer,
&num_files_in_buffer](Tablet* t) {
+ auto storage_policy = get_storage_policy(t->storage_policy_id());
+ if (storage_policy == nullptr) {
+ LOG(WARNING) << "could not find storage_policy, storage_policy_id="
+ << t->storage_policy_id();
+ return;
+ }
+ auto resource = get_storage_resource(storage_policy->resource_id);
+ auto dest_fs =
std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+ if (dest_fs == nullptr) {
+ LOG(WARNING) << "could not find resource, resouce_id=" <<
storage_policy->resource_id;
+ return;
+ }
+ DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
+ DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+ Status st;
+ std::vector<io::Path> files;
+ {
+ std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
Review Comment:
warning: '_remote_files_lock' is a private member of 'doris::Tablet'
[clang-diagnostic-error]
```cpp
std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
^
```
**be/src/olap/tablet.h:508:** declared private here
```cpp
std::shared_mutex _remote_files_lock;
^
```
##########
be/src/olap/tablet_manager.cpp:
##########
@@ -1372,4 +1372,161 @@
return bad_tablets;
}
+void TabletManager::remove_unused_remote_files() {
+ auto tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
+ return t->tablet_meta()->cooldown_meta_id().initialized() &&
t->is_used() &&
+ t->tablet_state() == TABLET_RUNNING;
+ });
+ TConfirmUnusedRemoteFilesRequest req;
+ req.__isset.confirm_list = true;
+ // tablet_id -> [fs, unused_remote_files]
+ using unused_remote_files_buffer_t = std::unordered_map<
+ int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>,
std::vector<io::Path>>>;
+ unused_remote_files_buffer_t buffer;
+ int64_t num_files_in_buffer = 0;
+ // assume a filename is 0.1KB, buffer size should not larger than 100MB
+ constexpr int64_t max_files_in_buffer = 1000000;
+
+ auto calc_unused_remote_files = [&req, &buffer,
&num_files_in_buffer](Tablet* t) {
+ auto storage_policy = get_storage_policy(t->storage_policy_id());
+ if (storage_policy == nullptr) {
+ LOG(WARNING) << "could not find storage_policy, storage_policy_id="
+ << t->storage_policy_id();
+ return;
+ }
+ auto resource = get_storage_resource(storage_policy->resource_id);
+ auto dest_fs =
std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+ if (dest_fs == nullptr) {
+ LOG(WARNING) << "could not find resource, resouce_id=" <<
storage_policy->resource_id;
+ return;
+ }
+ DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
+ DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+ Status st;
+ std::vector<io::Path> files;
+ {
+ std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
+ if (!xlock.owns_lock()) {
+ LOG(WARNING) << "try remote_files_lock failed. tablet_id=" <<
t->tablet_id();
+ return;
+ }
+ // FIXME(plat1ko): What if user reset resource in storage policy
to another resource?
+ // Maybe we should also list files in previously uploaded
resources.
+ st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()),
&files);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "encounter error when remove unused remote files,
tablet_id="
+ << t->tablet_id() << " : " << st;
+ }
+ if (files.empty()) {
+ return;
+ }
+ // get all cooldowned rowsets
+ std::unordered_set<std::string> cooldowned_rowsets;
+ UniqueId cooldown_meta_id;
+ {
+ std::shared_lock rlock(t->_meta_lock);
Review Comment:
warning: '_meta_lock' is a private member of 'doris::Tablet'
[clang-diagnostic-error]
```cpp
std::shared_lock rlock(t->_meta_lock);
^
```
**be/src/olap/tablet.h:451:** declared private here
```cpp
mutable std::shared_mutex _meta_lock;
^
```
##########
be/src/olap/tablet_manager.cpp:
##########
@@ -1372,4 +1372,161 @@
return bad_tablets;
}
+void TabletManager::remove_unused_remote_files() {
+ auto tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
+ return t->tablet_meta()->cooldown_meta_id().initialized() &&
t->is_used() &&
+ t->tablet_state() == TABLET_RUNNING;
+ });
+ TConfirmUnusedRemoteFilesRequest req;
+ req.__isset.confirm_list = true;
+ // tablet_id -> [fs, unused_remote_files]
+ using unused_remote_files_buffer_t = std::unordered_map<
+ int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>,
std::vector<io::Path>>>;
+ unused_remote_files_buffer_t buffer;
+ int64_t num_files_in_buffer = 0;
+ // assume a filename is 0.1KB, buffer size should not larger than 100MB
+ constexpr int64_t max_files_in_buffer = 1000000;
+
+ auto calc_unused_remote_files = [&req, &buffer,
&num_files_in_buffer](Tablet* t) {
+ auto storage_policy = get_storage_policy(t->storage_policy_id());
+ if (storage_policy == nullptr) {
+ LOG(WARNING) << "could not find storage_policy, storage_policy_id="
+ << t->storage_policy_id();
+ return;
+ }
+ auto resource = get_storage_resource(storage_policy->resource_id);
+ auto dest_fs =
std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+ if (dest_fs == nullptr) {
+ LOG(WARNING) << "could not find resource, resouce_id=" <<
storage_policy->resource_id;
+ return;
+ }
+ DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
+ DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+ Status st;
+ std::vector<io::Path> files;
+ {
+ std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
+ if (!xlock.owns_lock()) {
+ LOG(WARNING) << "try remote_files_lock failed. tablet_id=" <<
t->tablet_id();
+ return;
+ }
+ // FIXME(plat1ko): What if user reset resource in storage policy
to another resource?
+ // Maybe we should also list files in previously uploaded
resources.
+ st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()),
&files);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "encounter error when remove unused remote files,
tablet_id="
+ << t->tablet_id() << " : " << st;
+ }
+ if (files.empty()) {
+ return;
+ }
+ // get all cooldowned rowsets
+ std::unordered_set<std::string> cooldowned_rowsets;
+ UniqueId cooldown_meta_id;
+ {
+ std::shared_lock rlock(t->_meta_lock);
+ for (auto& rs_meta : t->_tablet_meta->all_rs_metas()) {
Review Comment:
warning: '_tablet_meta' is a protected member of 'doris::BaseTablet'
[clang-diagnostic-error]
```cpp
for (auto& rs_meta : t->_tablet_meta->all_rs_metas()) {
^
```
**be/src/olap/base_tablet.h:75:** declared protected here
```cpp
TabletMetaSharedPtr _tablet_meta;
^
```
##########
be/src/olap/tablet_manager.cpp:
##########
@@ -1372,4 +1372,161 @@
return bad_tablets;
}
+void TabletManager::remove_unused_remote_files() {
+ auto tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
+ return t->tablet_meta()->cooldown_meta_id().initialized() &&
t->is_used() &&
+ t->tablet_state() == TABLET_RUNNING;
+ });
+ TConfirmUnusedRemoteFilesRequest req;
+ req.__isset.confirm_list = true;
+ // tablet_id -> [fs, unused_remote_files]
+ using unused_remote_files_buffer_t = std::unordered_map<
+ int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>,
std::vector<io::Path>>>;
+ unused_remote_files_buffer_t buffer;
+ int64_t num_files_in_buffer = 0;
+ // assume a filename is 0.1KB, buffer size should not larger than 100MB
+ constexpr int64_t max_files_in_buffer = 1000000;
+
+ auto calc_unused_remote_files = [&req, &buffer,
&num_files_in_buffer](Tablet* t) {
+ auto storage_policy = get_storage_policy(t->storage_policy_id());
+ if (storage_policy == nullptr) {
+ LOG(WARNING) << "could not find storage_policy, storage_policy_id="
+ << t->storage_policy_id();
+ return;
+ }
+ auto resource = get_storage_resource(storage_policy->resource_id);
+ auto dest_fs =
std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+ if (dest_fs == nullptr) {
+ LOG(WARNING) << "could not find resource, resouce_id=" <<
storage_policy->resource_id;
+ return;
+ }
+ DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
+ DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+ Status st;
+ std::vector<io::Path> files;
+ {
+ std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
+ if (!xlock.owns_lock()) {
+ LOG(WARNING) << "try remote_files_lock failed. tablet_id=" <<
t->tablet_id();
+ return;
+ }
+ // FIXME(plat1ko): What if user reset resource in storage policy
to another resource?
+ // Maybe we should also list files in previously uploaded
resources.
+ st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()),
&files);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "encounter error when remove unused remote files,
tablet_id="
+ << t->tablet_id() << " : " << st;
+ }
+ if (files.empty()) {
+ return;
+ }
+ // get all cooldowned rowsets
+ std::unordered_set<std::string> cooldowned_rowsets;
+ UniqueId cooldown_meta_id;
+ {
+ std::shared_lock rlock(t->_meta_lock);
+ for (auto& rs_meta : t->_tablet_meta->all_rs_metas()) {
+ if (!rs_meta->is_local()) {
+
cooldowned_rowsets.insert(rs_meta->rowset_id().to_string());
+ }
+ }
+ cooldown_meta_id = t->_tablet_meta->cooldown_meta_id();
+ }
+ // {replica_id}.meta
+ std::string remote_meta_path = std::to_string(t->replica_id()) +
".meta";
+ // filter out the paths that should be reserved
+ // clang-format off
+ files.erase(std::remove_if(files.begin(), files.end(), [&](io::Path&
path) {
+ const std::string& path_str = path.native();
+ if (StringPiece(path_str).ends_with(".meta")) {
+ return path_str == remote_meta_path;
+ }
+ if (StringPiece(path_str).ends_with(".dat")) {
+ // extract rowset id. filename format:
{rowset_id}_{segment_num}.dat
+ auto end = path_str.rfind('_');
+ if (UNLIKELY(end == std::string::npos)) {
+ return false;
+ }
+ return !!cooldowned_rowsets.count(path_str.substr(0, end));
+ }
+ if (StringPiece(path_str).ends_with(".idx")) {
+ // extract rowset id. filename format:
{rowset_id}_{segment_num}_{index_id}.idx
+ auto end = path_str.find('_');
+ if (UNLIKELY(end == std::string::npos)) {
+ return false;
+ }
+ return !!cooldowned_rowsets.count(path_str.substr(0, end));
+ }
+ return false;
+ }), files.end());
+ // clang-format on
+ if (files.empty()) {
+ return;
+ }
+ files.shrink_to_fit();
+ num_files_in_buffer += files.size();
+ buffer.insert({t->tablet_id(), {std::move(dest_fs),
std::move(files)}});
+ auto& info = req.confirm_list.emplace_back();
+ info.__set_tablet_id(t->tablet_id());
+ info.__set_cooldown_replica_id(t->replica_id());
+ info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
+ };
+
+ auto confirm_and_remove_files = [&buffer, &req, &num_files_in_buffer]() {
+ TConfirmUnusedRemoteFilesResult result;
+ LOG(INFO) << "begin to confirm unused remote files. num_tablets=" <<
buffer.size()
+ << " num_files=" << num_files_in_buffer;
+ auto st =
MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
Review Comment:
warning: use of undeclared identifier 'MasterServerClient'
[clang-diagnostic-error]
```cpp
auto st =
MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
^
```
##########
be/src/olap/tablet_manager.cpp:
##########
@@ -1372,4 +1372,161 @@
return bad_tablets;
}
+void TabletManager::remove_unused_remote_files() {
+ auto tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
+ return t->tablet_meta()->cooldown_meta_id().initialized() &&
t->is_used() &&
+ t->tablet_state() == TABLET_RUNNING;
+ });
+ TConfirmUnusedRemoteFilesRequest req;
+ req.__isset.confirm_list = true;
+ // tablet_id -> [fs, unused_remote_files]
+ using unused_remote_files_buffer_t = std::unordered_map<
+ int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>,
std::vector<io::Path>>>;
+ unused_remote_files_buffer_t buffer;
+ int64_t num_files_in_buffer = 0;
+ // assume a filename is 0.1KB, buffer size should not larger than 100MB
+ constexpr int64_t max_files_in_buffer = 1000000;
+
+ auto calc_unused_remote_files = [&req, &buffer,
&num_files_in_buffer](Tablet* t) {
+ auto storage_policy = get_storage_policy(t->storage_policy_id());
+ if (storage_policy == nullptr) {
+ LOG(WARNING) << "could not find storage_policy, storage_policy_id="
+ << t->storage_policy_id();
+ return;
+ }
+ auto resource = get_storage_resource(storage_policy->resource_id);
+ auto dest_fs =
std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+ if (dest_fs == nullptr) {
+ LOG(WARNING) << "could not find resource, resouce_id=" <<
storage_policy->resource_id;
+ return;
+ }
+ DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
+ DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+ Status st;
+ std::vector<io::Path> files;
+ {
+ std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
+ if (!xlock.owns_lock()) {
+ LOG(WARNING) << "try remote_files_lock failed. tablet_id=" <<
t->tablet_id();
+ return;
+ }
+ // FIXME(plat1ko): What if user reset resource in storage policy
to another resource?
+ // Maybe we should also list files in previously uploaded
resources.
+ st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()),
&files);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "encounter error when remove unused remote files,
tablet_id="
+ << t->tablet_id() << " : " << st;
+ }
+ if (files.empty()) {
+ return;
+ }
+ // get all cooldowned rowsets
+ std::unordered_set<std::string> cooldowned_rowsets;
+ UniqueId cooldown_meta_id;
+ {
+ std::shared_lock rlock(t->_meta_lock);
+ for (auto& rs_meta : t->_tablet_meta->all_rs_metas()) {
+ if (!rs_meta->is_local()) {
+
cooldowned_rowsets.insert(rs_meta->rowset_id().to_string());
+ }
+ }
+ cooldown_meta_id = t->_tablet_meta->cooldown_meta_id();
+ }
+ // {replica_id}.meta
+ std::string remote_meta_path = std::to_string(t->replica_id()) +
".meta";
+ // filter out the paths that should be reserved
+ // clang-format off
+ files.erase(std::remove_if(files.begin(), files.end(), [&](io::Path&
path) {
+ const std::string& path_str = path.native();
+ if (StringPiece(path_str).ends_with(".meta")) {
+ return path_str == remote_meta_path;
+ }
+ if (StringPiece(path_str).ends_with(".dat")) {
+ // extract rowset id. filename format:
{rowset_id}_{segment_num}.dat
+ auto end = path_str.rfind('_');
+ if (UNLIKELY(end == std::string::npos)) {
+ return false;
+ }
+ return !!cooldowned_rowsets.count(path_str.substr(0, end));
+ }
+ if (StringPiece(path_str).ends_with(".idx")) {
+ // extract rowset id. filename format:
{rowset_id}_{segment_num}_{index_id}.idx
+ auto end = path_str.find('_');
+ if (UNLIKELY(end == std::string::npos)) {
+ return false;
+ }
+ return !!cooldowned_rowsets.count(path_str.substr(0, end));
+ }
+ return false;
+ }), files.end());
+ // clang-format on
+ if (files.empty()) {
+ return;
+ }
+ files.shrink_to_fit();
+ num_files_in_buffer += files.size();
+ buffer.insert({t->tablet_id(), {std::move(dest_fs),
std::move(files)}});
+ auto& info = req.confirm_list.emplace_back();
+ info.__set_tablet_id(t->tablet_id());
+ info.__set_cooldown_replica_id(t->replica_id());
+ info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
+ };
+
+ auto confirm_and_remove_files = [&buffer, &req, &num_files_in_buffer]() {
+ TConfirmUnusedRemoteFilesResult result;
+ LOG(INFO) << "begin to confirm unused remote files. num_tablets=" <<
buffer.size()
+ << " num_files=" << num_files_in_buffer;
+ auto st =
MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
+ if (!st.ok()) {
+ LOG(WARNING) << st;
+ return;
+ }
+ for (auto id : result.confirmed_tablets) {
+ if (auto it = buffer.find(id); LIKELY(it != buffer.end())) {
+ auto& fs = it->second.first;
+ auto& files = it->second.second;
+ // delete unused files
+ LOG(INFO) << "delete unused files. root_path=" <<
fs->root_path()
+ << " tablet_id=" << id;
+ io::Path dir("data/" + std::to_string(id));
+ for (auto& file : files) {
+ file = dir / file;
+ LOG(INFO) << "delete unused file: " << file.native();
+ }
+ st = fs->batch_delete(files);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to delete unused files,
tablet_id=" << id << " : "
+ << st;
+ }
+ }
+ }
+ };
+
+ // batch confirm to reduce FE's overhead
+ auto next_confirm_time = std::chrono::steady_clock::now() +
+
std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
+ for (auto& t : tablets) {
+ if (t.use_count() <= 1 // this means tablet has been dropped
+ || t->_cooldown_replica_id != t->replica_id() || t->_state !=
TABLET_RUNNING) {
Review Comment:
warning: '_state' is a protected member of 'doris::BaseTablet'
[clang-diagnostic-error]
```cpp
|| t->_cooldown_replica_id != t->replica_id() || t->_state !=
TABLET_RUNNING) {
^
```
**be/src/olap/base_tablet.h:74:** declared protected here
```cpp
TabletState _state;
^
```
##########
be/src/olap/tablet_manager.cpp:
##########
@@ -1372,4 +1372,161 @@
return bad_tablets;
}
+void TabletManager::remove_unused_remote_files() {
+ auto tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
+ return t->tablet_meta()->cooldown_meta_id().initialized() &&
t->is_used() &&
+ t->tablet_state() == TABLET_RUNNING;
+ });
+ TConfirmUnusedRemoteFilesRequest req;
+ req.__isset.confirm_list = true;
+ // tablet_id -> [fs, unused_remote_files]
+ using unused_remote_files_buffer_t = std::unordered_map<
+ int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>,
std::vector<io::Path>>>;
+ unused_remote_files_buffer_t buffer;
+ int64_t num_files_in_buffer = 0;
+ // assume a filename is 0.1KB, buffer size should not larger than 100MB
+ constexpr int64_t max_files_in_buffer = 1000000;
+
+ auto calc_unused_remote_files = [&req, &buffer,
&num_files_in_buffer](Tablet* t) {
+ auto storage_policy = get_storage_policy(t->storage_policy_id());
+ if (storage_policy == nullptr) {
+ LOG(WARNING) << "could not find storage_policy, storage_policy_id="
+ << t->storage_policy_id();
+ return;
+ }
+ auto resource = get_storage_resource(storage_policy->resource_id);
+ auto dest_fs =
std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+ if (dest_fs == nullptr) {
+ LOG(WARNING) << "could not find resource, resouce_id=" <<
storage_policy->resource_id;
+ return;
+ }
+ DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
+ DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+ Status st;
+ std::vector<io::Path> files;
+ {
+ std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
+ if (!xlock.owns_lock()) {
+ LOG(WARNING) << "try remote_files_lock failed. tablet_id=" <<
t->tablet_id();
+ return;
+ }
+ // FIXME(plat1ko): What if user reset resource in storage policy
to another resource?
+ // Maybe we should also list files in previously uploaded
resources.
+ st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()),
&files);
Review Comment:
warning: use of undeclared identifier 'BetaRowset' [clang-diagnostic-error]
```cpp
st =
dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()), &files);
^
```
##########
be/src/olap/tablet_manager.cpp:
##########
@@ -1372,4 +1372,161 @@
return bad_tablets;
}
+void TabletManager::remove_unused_remote_files() {
+ auto tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
+ return t->tablet_meta()->cooldown_meta_id().initialized() &&
t->is_used() &&
+ t->tablet_state() == TABLET_RUNNING;
+ });
+ TConfirmUnusedRemoteFilesRequest req;
+ req.__isset.confirm_list = true;
+ // tablet_id -> [fs, unused_remote_files]
+ using unused_remote_files_buffer_t = std::unordered_map<
+ int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>,
std::vector<io::Path>>>;
+ unused_remote_files_buffer_t buffer;
+ int64_t num_files_in_buffer = 0;
+ // assume a filename is 0.1KB, buffer size should not larger than 100MB
+ constexpr int64_t max_files_in_buffer = 1000000;
+
+ auto calc_unused_remote_files = [&req, &buffer,
&num_files_in_buffer](Tablet* t) {
+ auto storage_policy = get_storage_policy(t->storage_policy_id());
+ if (storage_policy == nullptr) {
+ LOG(WARNING) << "could not find storage_policy, storage_policy_id="
+ << t->storage_policy_id();
+ return;
+ }
+ auto resource = get_storage_resource(storage_policy->resource_id);
+ auto dest_fs =
std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+ if (dest_fs == nullptr) {
+ LOG(WARNING) << "could not find resource, resouce_id=" <<
storage_policy->resource_id;
+ return;
+ }
+ DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
+ DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+ Status st;
+ std::vector<io::Path> files;
+ {
+ std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
+ if (!xlock.owns_lock()) {
+ LOG(WARNING) << "try remote_files_lock failed. tablet_id=" <<
t->tablet_id();
+ return;
+ }
+ // FIXME(plat1ko): What if user reset resource in storage policy
to another resource?
+ // Maybe we should also list files in previously uploaded
resources.
+ st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()),
&files);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "encounter error when remove unused remote files,
tablet_id="
+ << t->tablet_id() << " : " << st;
+ }
+ if (files.empty()) {
+ return;
+ }
+ // get all cooldowned rowsets
+ std::unordered_set<std::string> cooldowned_rowsets;
+ UniqueId cooldown_meta_id;
+ {
+ std::shared_lock rlock(t->_meta_lock);
+ for (auto& rs_meta : t->_tablet_meta->all_rs_metas()) {
+ if (!rs_meta->is_local()) {
+
cooldowned_rowsets.insert(rs_meta->rowset_id().to_string());
+ }
+ }
+ cooldown_meta_id = t->_tablet_meta->cooldown_meta_id();
+ }
+ // {replica_id}.meta
+ std::string remote_meta_path = std::to_string(t->replica_id()) +
".meta";
+ // filter out the paths that should be reserved
+ // clang-format off
+ files.erase(std::remove_if(files.begin(), files.end(), [&](io::Path&
path) {
+ const std::string& path_str = path.native();
+ if (StringPiece(path_str).ends_with(".meta")) {
+ return path_str == remote_meta_path;
+ }
+ if (StringPiece(path_str).ends_with(".dat")) {
+ // extract rowset id. filename format:
{rowset_id}_{segment_num}.dat
+ auto end = path_str.rfind('_');
+ if (UNLIKELY(end == std::string::npos)) {
+ return false;
+ }
+ return !!cooldowned_rowsets.count(path_str.substr(0, end));
+ }
+ if (StringPiece(path_str).ends_with(".idx")) {
+ // extract rowset id. filename format:
{rowset_id}_{segment_num}_{index_id}.idx
+ auto end = path_str.find('_');
+ if (UNLIKELY(end == std::string::npos)) {
+ return false;
+ }
+ return !!cooldowned_rowsets.count(path_str.substr(0, end));
+ }
+ return false;
+ }), files.end());
+ // clang-format on
+ if (files.empty()) {
+ return;
+ }
+ files.shrink_to_fit();
+ num_files_in_buffer += files.size();
+ buffer.insert({t->tablet_id(), {std::move(dest_fs),
std::move(files)}});
+ auto& info = req.confirm_list.emplace_back();
+ info.__set_tablet_id(t->tablet_id());
+ info.__set_cooldown_replica_id(t->replica_id());
+ info.__set_cooldown_meta_id(cooldown_meta_id.to_thrift());
+ };
+
+ auto confirm_and_remove_files = [&buffer, &req, &num_files_in_buffer]() {
+ TConfirmUnusedRemoteFilesResult result;
+ LOG(INFO) << "begin to confirm unused remote files. num_tablets=" <<
buffer.size()
+ << " num_files=" << num_files_in_buffer;
+ auto st =
MasterServerClient::instance()->confirm_unused_remote_files(req, &result);
+ if (!st.ok()) {
+ LOG(WARNING) << st;
+ return;
+ }
+ for (auto id : result.confirmed_tablets) {
+ if (auto it = buffer.find(id); LIKELY(it != buffer.end())) {
+ auto& fs = it->second.first;
+ auto& files = it->second.second;
+ // delete unused files
+ LOG(INFO) << "delete unused files. root_path=" <<
fs->root_path()
+ << " tablet_id=" << id;
+ io::Path dir("data/" + std::to_string(id));
+ for (auto& file : files) {
+ file = dir / file;
+ LOG(INFO) << "delete unused file: " << file.native();
+ }
+ st = fs->batch_delete(files);
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to delete unused files,
tablet_id=" << id << " : "
+ << st;
+ }
+ }
+ }
+ };
+
+ // batch confirm to reduce FE's overhead
+ auto next_confirm_time = std::chrono::steady_clock::now() +
+
std::chrono::seconds(config::confirm_unused_remote_files_interval_sec);
+ for (auto& t : tablets) {
+ if (t.use_count() <= 1 // this means tablet has been dropped
+ || t->_cooldown_replica_id != t->replica_id() || t->_state !=
TABLET_RUNNING) {
Review Comment:
warning: '_cooldown_replica_id' is a private member of 'doris::Tablet'
[clang-diagnostic-error]
```cpp
|| t->_cooldown_replica_id != t->replica_id() || t->_state !=
TABLET_RUNNING) {
^
```
**be/src/olap/tablet.h:506:** declared private here
```cpp
int64_t _cooldown_replica_id = -1;
^
```
##########
be/src/olap/tablet_manager.cpp:
##########
@@ -1372,4 +1372,161 @@
return bad_tablets;
}
+void TabletManager::remove_unused_remote_files() {
+ auto tablets =
StorageEngine::instance()->tablet_manager()->get_all_tablet([](Tablet* t) {
+ return t->tablet_meta()->cooldown_meta_id().initialized() &&
t->is_used() &&
+ t->tablet_state() == TABLET_RUNNING;
+ });
+ TConfirmUnusedRemoteFilesRequest req;
+ req.__isset.confirm_list = true;
+ // tablet_id -> [fs, unused_remote_files]
+ using unused_remote_files_buffer_t = std::unordered_map<
+ int64_t, std::pair<std::shared_ptr<io::RemoteFileSystem>,
std::vector<io::Path>>>;
+ unused_remote_files_buffer_t buffer;
+ int64_t num_files_in_buffer = 0;
+ // assume a filename is 0.1KB, buffer size should not larger than 100MB
+ constexpr int64_t max_files_in_buffer = 1000000;
+
+ auto calc_unused_remote_files = [&req, &buffer,
&num_files_in_buffer](Tablet* t) {
+ auto storage_policy = get_storage_policy(t->storage_policy_id());
+ if (storage_policy == nullptr) {
+ LOG(WARNING) << "could not find storage_policy, storage_policy_id="
+ << t->storage_policy_id();
+ return;
+ }
+ auto resource = get_storage_resource(storage_policy->resource_id);
+ auto dest_fs =
std::static_pointer_cast<io::RemoteFileSystem>(resource.fs);
+ if (dest_fs == nullptr) {
+ LOG(WARNING) << "could not find resource, resouce_id=" <<
storage_policy->resource_id;
+ return;
+ }
+ DCHECK(atol(dest_fs->id().c_str()) == storage_policy->resource_id);
+ DCHECK(dest_fs->type() != io::FileSystemType::LOCAL);
+
+ Status st;
+ std::vector<io::Path> files;
+ {
+ std::unique_lock xlock(t->_remote_files_lock, std::try_to_lock);
+ if (!xlock.owns_lock()) {
+ LOG(WARNING) << "try remote_files_lock failed. tablet_id=" <<
t->tablet_id();
+ return;
+ }
+ // FIXME(plat1ko): What if user reset resource in storage policy
to another resource?
+ // Maybe we should also list files in previously uploaded
resources.
+ st = dest_fs->list(BetaRowset::remote_tablet_path(t->tablet_id()),
&files);
+ }
+ if (!st.ok()) {
+ LOG(WARNING) << "encounter error when remove unused remote files,
tablet_id="
+ << t->tablet_id() << " : " << st;
+ }
+ if (files.empty()) {
+ return;
+ }
+ // get all cooldowned rowsets
+ std::unordered_set<std::string> cooldowned_rowsets;
+ UniqueId cooldown_meta_id;
+ {
+ std::shared_lock rlock(t->_meta_lock);
+ for (auto& rs_meta : t->_tablet_meta->all_rs_metas()) {
+ if (!rs_meta->is_local()) {
+
cooldowned_rowsets.insert(rs_meta->rowset_id().to_string());
+ }
+ }
+ cooldown_meta_id = t->_tablet_meta->cooldown_meta_id();
Review Comment:
warning: '_tablet_meta' is a protected member of 'doris::BaseTablet'
[clang-diagnostic-error]
```cpp
cooldown_meta_id = t->_tablet_meta->cooldown_meta_id();
^
```
**be/src/olap/base_tablet.h:75:** declared protected here
```cpp
TabletMetaSharedPtr _tablet_meta;
^
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]