This is an automated email from the ASF dual-hosted git repository.
pengxiangyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0ec218108d [enhancement](remote) support local cache GC at the granularity of cache files (#14920)
0ec218108d is described below
commit 0ec218108d7000175f24f75a32d3740e841ff617
Author: luozenglin <[email protected]>
AuthorDate: Fri Dec 9 17:35:23 2022 +0800
[enhancement](remote) support local cache GC at the granularity of cache files (#14920)
* [enhancement](remote) support local cache GC at the granularity of cache files
* update
* update
* update
---
be/src/io/cache/dummy_file_cache.cpp | 59 +++++++++++------------
be/src/io/cache/dummy_file_cache.h | 21 ++++++--
be/src/io/cache/file_cache.cpp | 29 +++++++++++
be/src/io/cache/file_cache.h | 25 +++++++---
be/src/io/cache/file_cache_manager.cpp | 49 ++++++++++++++-----
be/src/io/cache/file_cache_manager.h | 4 +-
be/src/io/cache/sub_file_cache.cpp | 88 ++++++++++++++++++++--------------
be/src/io/cache/sub_file_cache.h | 21 +++++++-
be/src/io/cache/whole_file_cache.cpp | 33 ++++---------
be/src/io/cache/whole_file_cache.h | 12 ++++-
10 files changed, 224 insertions(+), 117 deletions(-)
diff --git a/be/src/io/cache/dummy_file_cache.cpp b/be/src/io/cache/dummy_file_cache.cpp
index 7e94bb388e..166c8b55e4 100644
--- a/be/src/io/cache/dummy_file_cache.cpp
+++ b/be/src/io/cache/dummy_file_cache.cpp
@@ -28,21 +28,15 @@ namespace io {
DummyFileCache::DummyFileCache(const Path& cache_dir, int64_t alive_time_sec)
: _cache_dir(cache_dir), _alive_time_sec(alive_time_sec) {}
-DummyFileCache::~DummyFileCache() {}
-
-void DummyFileCache::_update_last_mtime(const Path& done_file) {
- Path cache_done_file = _cache_dir / done_file;
- time_t m_time;
- if (FileUtils::mtime(cache_done_file.native(), &m_time).ok() && m_time >
_last_match_time) {
- _last_match_time = m_time;
- }
-}
+DummyFileCache::~DummyFileCache() = default;
void DummyFileCache::_add_file_cache(const Path& data_file) {
Path cache_file = _cache_dir / data_file;
size_t file_size = 0;
- if (io::global_local_filesystem()->file_size(cache_file, &file_size).ok())
{
- _file_sizes[cache_file] = file_size;
+ time_t m_time = 0;
+ if (io::global_local_filesystem()->file_size(cache_file, &file_size).ok()
&&
+ FileUtils::mtime(cache_file.native(), &m_time).ok()) {
+ _gc_lru_queue.push({cache_file, m_time});
_cache_file_size += file_size;
} else {
_unfinished_files.push_back(cache_file);
@@ -72,7 +66,6 @@ void DummyFileCache::_load() {
Path cache_filename = StringReplace(iter->native(),
CACHE_DONE_FILE_SUFFIX, "", true);
if (cache_names.find(cache_filename) != cache_names.end()) {
cache_names.erase(cache_filename);
- _update_last_mtime(*iter);
_add_file_cache(cache_filename);
} else {
// not data file, but with DONE file
@@ -110,35 +103,39 @@ Status DummyFileCache::load_and_clean() {
}
Status DummyFileCache::clean_timeout_cache() {
- if (time(nullptr) - _last_match_time > _alive_time_sec) {
- return _clean_cache_internal();
+ while (!_gc_lru_queue.empty() &&
+ time(nullptr) - _gc_lru_queue.top().last_match_time >
_alive_time_sec) {
+ size_t cleaned_size = 0;
+ RETURN_IF_ERROR(_clean_cache_internal(_gc_lru_queue.top().file,
&cleaned_size));
+ _cache_file_size -= cleaned_size;
+ _gc_lru_queue.pop();
}
return Status::OK();
}
Status DummyFileCache::clean_all_cache() {
- return _clean_cache_internal();
+ while (!_gc_lru_queue.empty()) {
+ RETURN_IF_ERROR(_clean_cache_internal(_gc_lru_queue.top().file,
nullptr));
+ _gc_lru_queue.pop();
+ }
+ _cache_file_size = 0;
+ return Status::OK();
}
-Status DummyFileCache::_clean_cache_internal() {
- for (const auto& iter : _file_sizes) {
- const auto cache_file_path = iter.first;
- Path done_file_path = cache_file_path.native() +
CACHE_DONE_FILE_SUFFIX;
- LOG(INFO) << "Delete unused done_cache_path: " <<
done_file_path.native()
- << ", cache_file_path: " << cache_file_path.native();
- if (!io::global_local_filesystem()->delete_file(done_file_path).ok()) {
- LOG(ERROR) << "delete_file failed: " << done_file_path.native();
- continue;
- }
- if (!io::global_local_filesystem()->delete_file(cache_file_path).ok())
{
- LOG(ERROR) << "delete_file failed: " << cache_file_path.native();
- continue;
- }
+Status DummyFileCache::clean_one_cache(size_t* cleaned_size) {
+ if (!_gc_lru_queue.empty()) {
+ const auto& cache = _gc_lru_queue.top();
+ RETURN_IF_ERROR(_clean_cache_internal(cache.file, cleaned_size));
+ _cache_file_size -= *cleaned_size;
+ _gc_lru_queue.pop();
}
- _file_sizes.clear();
- _cache_file_size = 0;
return Status::OK();
}
+Status DummyFileCache::_clean_cache_internal(const Path& cache_file_path,
size_t* cleaned_size) {
+ Path done_file_path = cache_file_path.native() + CACHE_DONE_FILE_SUFFIX;
+ return _remove_file(cache_file_path, done_file_path, cleaned_size);
+}
+
} // namespace io
} // namespace doris
diff --git a/be/src/io/cache/dummy_file_cache.h b/be/src/io/cache/dummy_file_cache.h
index e59d867a54..4d604cdb6a 100644
--- a/be/src/io/cache/dummy_file_cache.h
+++ b/be/src/io/cache/dummy_file_cache.h
@@ -19,6 +19,7 @@
#include <future>
#include <memory>
+#include <queue>
#include "common/status.h"
#include "io/cache/file_cache.h"
@@ -55,22 +56,36 @@ public:
Status clean_all_cache() override;
+ Status clean_one_cache(size_t* cleaned_size) override;
+
Status load_and_clean();
bool is_dummy_file_cache() override { return true; }
+ int64_t get_oldest_match_time() const override {
+ return _gc_lru_queue.empty() ? 0 : _gc_lru_queue.top().last_match_time;
+ };
+
+ bool is_gc_finish() const override { return _gc_lru_queue.empty(); }
+
private:
Status _clean_unfinished_cache();
- void _update_last_mtime(const Path& done_file);
void _add_file_cache(const Path& data_file);
void _load();
- Status _clean_cache_internal();
+ Status _clean_cache_internal(const Path&, size_t*);
private:
+ struct DummyFileInfo {
+ Path file;
+ int64_t last_match_time;
+ };
+ using DummyGcQueue = std::priority_queue<DummyFileInfo,
std::vector<DummyFileInfo>,
+
SubFileLRUComparator<DummyFileInfo>>;
+ DummyGcQueue _gc_lru_queue;
+
Path _cache_dir;
int64_t _alive_time_sec;
- std::map<Path, int64_t> _file_sizes;
std::list<Path> _unfinished_files;
};
diff --git a/be/src/io/cache/file_cache.cpp b/be/src/io/cache/file_cache.cpp
index 72f96c3776..b016ee2aa5 100644
--- a/be/src/io/cache/file_cache.cpp
+++ b/be/src/io/cache/file_cache.cpp
@@ -86,5 +86,34 @@ Status FileCache::download_cache_to_local(const Path& cache_file, const Path& ca
return Status::OK();
}
+Status FileCache::_remove_file(const Path& cache_file, const Path&
cache_done_file,
+ size_t* cleaned_size) {
+ bool done_file_exist = false;
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ io::global_local_filesystem()->exists(cache_done_file,
&done_file_exist),
+ "Check local done file exist failed.");
+ if (done_file_exist) {
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ io::global_local_filesystem()->delete_file(cache_done_file),
+ fmt::format("Delete local done file failed: {}",
cache_done_file.native()));
+ }
+ bool cache_file_exist = false;
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ io::global_local_filesystem()->exists(cache_file,
&cache_file_exist),
+ "Check local cache file exist failed.");
+ if (cache_file_exist) {
+ if (cleaned_size) {
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ io::global_local_filesystem()->file_size(cache_file,
cleaned_size),
+ fmt::format("get local cache file size failed: {}",
cache_file.native()));
+ }
+ RETURN_NOT_OK_STATUS_WITH_WARN(
+ io::global_local_filesystem()->delete_file(cache_file),
+ fmt::format("Delete local cache file failed: {}",
cache_file.native()));
+ }
+ LOG(INFO) << "Delete local cache file successfully: " <<
cache_file.native();
+ return Status::OK();
+}
+
} // namespace io
} // namespace doris
diff --git a/be/src/io/cache/file_cache.h b/be/src/io/cache/file_cache.h
index 8e0cb0a679..0d7afb5754 100644
--- a/be/src/io/cache/file_cache.h
+++ b/be/src/io/cache/file_cache.h
@@ -18,7 +18,9 @@
#pragma once
#include <memory>
+#include <queue>
#include <shared_mutex>
+#include <string>
#include "common/status.h"
#include "io/fs/file_reader.h"
@@ -31,8 +33,8 @@ const std::string CACHE_DONE_FILE_SUFFIX = "_DONE";
class FileCache : public FileReader {
public:
- FileCache() : _last_match_time(time(nullptr)), _cache_file_size(0) {}
- virtual ~FileCache() = default;
+ FileCache() : _cache_file_size(0) {}
+ ~FileCache() override = default;
DISALLOW_COPY_AND_ASSIGN(FileCache);
@@ -46,17 +48,28 @@ public:
virtual Status clean_all_cache() = 0;
+ virtual Status clean_one_cache(size_t* cleaned_size) = 0;
+
+ virtual bool is_gc_finish() const = 0;
+
virtual bool is_dummy_file_cache() { return false; }
Status download_cache_to_local(const Path& cache_file, const Path&
cache_done_file,
io::FileReaderSPtr remote_file_reader,
size_t req_size,
size_t offset = 0);
- void update_last_match_time() { _last_match_time = time(nullptr); }
- int64_t get_last_match_time() const { return _last_match_time; }
+ virtual int64_t get_oldest_match_time() const = 0;
protected:
- int64_t _last_match_time;
+ Status _remove_file(const Path& cache_file, const Path& cache_done_file,
size_t* cleaned_size);
+
+ template <typename T>
+ struct SubFileLRUComparator {
+ bool operator()(const T& lhs, const T& rhs) const {
+ return lhs.last_match_time > rhs.last_match_time;
+ };
+ };
+
size_t _cache_file_size;
};
@@ -64,7 +77,7 @@ using FileCachePtr = std::shared_ptr<FileCache>;
struct FileCacheLRUComparator {
bool operator()(const FileCachePtr& lhs, const FileCachePtr& rhs) const {
- return lhs->get_last_match_time() > rhs->get_last_match_time();
+ return lhs->get_oldest_match_time() > rhs->get_oldest_match_time();
}
};
diff --git a/be/src/io/cache/file_cache_manager.cpp b/be/src/io/cache/file_cache_manager.cpp
index 840c39f454..6d34434190 100644
--- a/be/src/io/cache/file_cache_manager.cpp
+++ b/be/src/io/cache/file_cache_manager.cpp
@@ -44,13 +44,31 @@ bool GCContextPerDisk::try_add_file_cache(FileCachePtr cache, int64_t file_size)
return false;
}
-void GCContextPerDisk::get_gc_file_caches(std::list<FileCachePtr>& result) {
- while (!_lru_queue.empty() && _used_size > _conf_max_size) {
+FileCachePtr GCContextPerDisk::top() {
+ if (!_lru_queue.empty() && _used_size > _conf_max_size) {
+ return _lru_queue.top();
+ }
+ return nullptr;
+}
+
+void GCContextPerDisk::pop() {
+ if (!_lru_queue.empty()) {
+ _lru_queue.pop();
+ }
+}
+
+Status GCContextPerDisk::gc_top() {
+ if (!_lru_queue.empty() && _used_size > _conf_max_size) {
auto file_cache = _lru_queue.top();
- _used_size -= file_cache->cache_file_size();
- result.push_back(file_cache);
+ size_t cleaned_size = 0;
+ RETURN_IF_ERROR(file_cache->clean_one_cache(&cleaned_size));
+ _used_size -= cleaned_size;
_lru_queue.pop();
+ if (!file_cache->is_gc_finish()) {
+ _lru_queue.push(file_cache);
+ }
}
+ return Status::OK();
}
void FileCacheManager::add_file_cache(const std::string& cache_path,
FileCachePtr file_cache) {
@@ -175,16 +193,21 @@ void FileCacheManager::gc_file_caches() {
// policy2: GC file cache by disk size
if (gc_conf_size > 0) {
for (size_t i = 0; i < contexts.size(); ++i) {
- std::list<FileCachePtr> gc_file_list;
- contexts[i].get_gc_file_caches(gc_file_list);
- for (auto item : gc_file_list) {
- std::shared_lock<std::shared_mutex> rdlock(_cache_map_lock);
- // for dummy file cache, check already used or not again
- if (item->is_dummy_file_cache() &&
- _file_cache_map.find(item->cache_dir().native()) !=
_file_cache_map.end()) {
- continue;
+ auto& context = contexts[i];
+ FileCachePtr file_cache;
+ while ((file_cache = context.top()) != nullptr) {
+ {
+ std::shared_lock<std::shared_mutex>
rdlock(_cache_map_lock);
+ // for dummy file cache, check already used or not again
+ if (file_cache->is_dummy_file_cache() &&
+ _file_cache_map.find(file_cache->cache_dir().native())
!=
+ _file_cache_map.end()) {
+ context.pop();
+ continue;
+ }
}
- item->clean_all_cache();
+ WARN_IF_ERROR(context.gc_top(),
+ fmt::format("gc {} error",
file_cache->cache_dir().native()));
}
}
}
diff --git a/be/src/io/cache/file_cache_manager.h b/be/src/io/cache/file_cache_manager.h
index c25d1cfb5c..9332b324fc 100644
--- a/be/src/io/cache/file_cache_manager.h
+++ b/be/src/io/cache/file_cache_manager.h
@@ -33,7 +33,9 @@ public:
GCContextPerDisk() : _conf_max_size(0), _used_size(0) {}
void init(const std::string& path, int64_t max_size);
bool try_add_file_cache(FileCachePtr cache, int64_t file_size);
- void get_gc_file_caches(std::list<FileCachePtr>&);
+ FileCachePtr top();
+ Status gc_top();
+ void pop();
private:
std::string _disk_path;
diff --git a/be/src/io/cache/sub_file_cache.cpp b/be/src/io/cache/sub_file_cache.cpp
index 292f442ddd..34b509bc9e 100644
--- a/be/src/io/cache/sub_file_cache.cpp
+++ b/be/src/io/cache/sub_file_cache.cpp
@@ -17,6 +17,12 @@
#include "io/cache/sub_file_cache.h"
+#include <glog/logging.h>
+
+#include <algorithm>
+#include <utility>
+#include <vector>
+
#include "common/config.h"
#include "io/fs/local_file_system.h"
#include "olap/iterators.h"
@@ -115,14 +121,17 @@ Status SubFileCache::read_at(size_t offset, Slice result, const IOContext& io_ct
_last_match_times[*iter] = time(nullptr);
}
}
- update_last_match_time();
return Status::OK();
}
+std::pair<Path, Path> SubFileCache::_cache_path(size_t offset) {
+ return {_cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX, offset),
+ _cache_dir /
+ fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset,
CACHE_DONE_FILE_SUFFIX)};
+}
+
Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
- Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX,
offset);
- Path cache_done_file = _cache_dir / fmt::format("{}_{}{}",
SUB_FILE_CACHE_PREFIX, offset,
- CACHE_DONE_FILE_SUFFIX);
+ auto [cache_file, cache_done_file] = _cache_path(offset);
bool done_file_exist = false;
RETURN_NOT_OK_STATUS_WITH_WARN(
io::global_local_filesystem()->exists(cache_done_file,
&done_file_exist),
@@ -134,8 +143,9 @@ Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
ThreadPoolToken* thread_token =
ExecEnv::GetInstance()->get_serial_download_cache_thread_token();
if (thread_token != nullptr) {
- auto st = thread_token->submit_func([this, &download_st,
cache_done_file, cache_file,
- offset, req_size] {
+ auto st = thread_token->submit_func([this, &download_st,
+ cache_done_file =
cache_done_file,
+ cache_file = cache_file,
offset, req_size] {
auto func = [this, cache_done_file, cache_file, offset,
req_size] {
bool done_file_exist = false;
// Judge again whether cache_done_file exists, it is
possible that the cache
@@ -180,7 +190,6 @@ Status SubFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
RETURN_IF_ERROR(io::global_local_filesystem()->open_file(cache_file,
&cache_reader));
_cache_file_readers.emplace(offset, cache_reader);
_last_match_times.emplace(offset, time(nullptr));
- update_last_match_time();
LOG(INFO) << "Create cache file from remote file successfully: "
<< _remote_file_reader->path().native() << "(" << offset << ", "
<< req_size
<< ") -> " << cache_file.native();
@@ -199,6 +208,8 @@ Status SubFileCache::_get_need_cache_offsets(size_t offset, size_t req_size,
}
Status SubFileCache::clean_timeout_cache() {
+ SubGcQueue gc_queue;
+ _gc_lru_queue.swap(gc_queue);
std::vector<size_t> timeout_keys;
{
std::shared_lock<std::shared_mutex> rlock(_cache_map_lock);
@@ -206,6 +217,9 @@ Status SubFileCache::clean_timeout_cache() {
iter != _last_match_times.cend(); ++iter) {
if (time(nullptr) - iter->second > _alive_time_sec) {
timeout_keys.emplace_back(iter->first);
+ } else {
+ auto [cache_file, done_file] = _cache_path(iter->first);
+ _gc_lru_queue.push({iter->first, iter->second});
}
}
}
@@ -213,9 +227,10 @@ Status SubFileCache::clean_timeout_cache() {
std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
for (std::vector<size_t>::const_iterator iter = timeout_keys.cbegin();
iter != timeout_keys.cend(); ++iter) {
- RETURN_IF_ERROR(_clean_cache_internal(*iter));
+ size_t cleaned_size = 0;
+ RETURN_IF_ERROR(_clean_cache_internal(*iter, &cleaned_size));
+ _cache_file_size -= cleaned_size;
}
- _cache_file_size = _calc_cache_file_size();
}
return Status::OK();
}
@@ -224,40 +239,41 @@ Status SubFileCache::clean_all_cache() {
std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
for (std::map<size_t, int64_t>::const_iterator iter =
_last_match_times.cbegin();
iter != _last_match_times.cend(); ++iter) {
- RETURN_IF_ERROR(_clean_cache_internal(iter->first));
+ RETURN_IF_ERROR(_clean_cache_internal(iter->first, nullptr));
+ }
+ _cache_file_size = 0;
+ return Status::OK();
+}
+
+Status SubFileCache::clean_one_cache(size_t* cleaned_size) {
+ if (!_gc_lru_queue.empty()) {
+ const auto& cache = _gc_lru_queue.top();
+ {
+ std::unique_lock<std::shared_mutex> wrlock(_cache_map_lock);
+ if (auto it = _last_match_times.find(cache.offset);
+ it != _last_match_times.end() && it->second ==
cache.last_match_time) {
+ RETURN_IF_ERROR(_clean_cache_internal(cache.offset,
cleaned_size));
+ _cache_file_size -= *cleaned_size;
+ _gc_lru_queue.pop();
+ }
+ }
+ decltype(_last_match_times.begin()) it;
+ while (!_gc_lru_queue.empty() &&
+ (it = _last_match_times.find(_gc_lru_queue.top().offset)) !=
+ _last_match_times.end() &&
+ it->second != _gc_lru_queue.top().last_match_time) {
+ _gc_lru_queue.pop();
+ }
}
- _cache_file_size = _calc_cache_file_size();
return Status::OK();
}
-Status SubFileCache::_clean_cache_internal(size_t offset) {
+Status SubFileCache::_clean_cache_internal(size_t offset, size_t*
cleaned_size) {
if (_cache_file_readers.find(offset) != _cache_file_readers.end()) {
_cache_file_readers.erase(offset);
}
- _cache_file_size = 0;
- Path cache_file = _cache_dir / fmt::format("{}_{}", SUB_FILE_CACHE_PREFIX,
offset);
- Path done_file = _cache_dir /
- fmt::format("{}_{}{}", SUB_FILE_CACHE_PREFIX, offset,
CACHE_DONE_FILE_SUFFIX);
- bool done_file_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(done_file, &done_file_exist),
- "Check local done file exist failed.");
- if (done_file_exist) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->delete_file(done_file),
- fmt::format("Delete local done file failed: {}",
done_file.native()));
- }
- bool cache_file_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(cache_file,
&cache_file_exist),
- "Check local cache file exist failed.");
- if (cache_file_exist) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->delete_file(cache_file),
- fmt::format("Delete local cache file failed: {}",
cache_file.native()));
- }
- LOG(INFO) << "Delete local cache file successfully: " <<
cache_file.native();
- return Status::OK();
+ auto [cache_file, done_file] = _cache_path(offset);
+ return _remove_file(cache_file, done_file, cleaned_size);
}
size_t SubFileCache::_calc_cache_file_size() {
diff --git a/be/src/io/cache/sub_file_cache.h b/be/src/io/cache/sub_file_cache.h
index 0567a2c4bd..5e26134f11 100644
--- a/be/src/io/cache/sub_file_cache.h
+++ b/be/src/io/cache/sub_file_cache.h
@@ -52,17 +52,36 @@ public:
Status clean_all_cache() override;
+ Status clean_one_cache(size_t* cleaned_size) override;
+
+ int64_t get_oldest_match_time() const override {
+ return _gc_lru_queue.empty() ? 0 : _gc_lru_queue.top().last_match_time;
+ };
+
+ bool is_gc_finish() const override { return _gc_lru_queue.empty(); }
+
private:
Status _generate_cache_reader(size_t offset, size_t req_size);
- Status _clean_cache_internal(size_t offset);
+ Status _clean_cache_internal(size_t offset, size_t* cleaned_size);
Status _get_need_cache_offsets(size_t offset, size_t req_size,
std::vector<size_t>* cache_offsets);
size_t _calc_cache_file_size();
+ std::pair<Path, Path> _cache_path(size_t offset);
+
private:
+ struct SubFileInfo {
+ size_t offset;
+ int64_t last_match_time;
+ };
+ using SubGcQueue = std::priority_queue<SubFileInfo,
std::vector<SubFileInfo>,
+ SubFileLRUComparator<SubFileInfo>>;
+ // used by gc thread, and currently has no lock protection
+ SubGcQueue _gc_lru_queue;
+
Path _cache_dir;
int64_t _alive_time_sec;
io::FileReaderSPtr _remote_file_reader;
diff --git a/be/src/io/cache/whole_file_cache.cpp b/be/src/io/cache/whole_file_cache.cpp
index e5c5dc7541..613215c93c 100644
--- a/be/src/io/cache/whole_file_cache.cpp
+++ b/be/src/io/cache/whole_file_cache.cpp
@@ -51,7 +51,6 @@ Status WholeFileCache::read_at(size_t offset, Slice result, const IOContext& io_
<< ", bytes read: " << bytes_read << " vs required size: "
<< result.size;
return Status::OLAPInternalError(OLAP_ERR_OS_ERROR);
}
- update_last_match_time();
return Status::OK();
}
@@ -135,43 +134,27 @@ Status WholeFileCache::_generate_cache_reader(size_t offset, size_t req_size) {
Status WholeFileCache::clean_timeout_cache() {
if (time(nullptr) - _last_match_time > _alive_time_sec) {
- _clean_cache_internal();
+ _clean_cache_internal(nullptr);
}
return Status::OK();
}
Status WholeFileCache::clean_all_cache() {
- _clean_cache_internal();
- return Status::OK();
+ return _clean_cache_internal(nullptr);
}
-Status WholeFileCache::_clean_cache_internal() {
+Status WholeFileCache::clean_one_cache(size_t* cleaned_size) {
+ return _clean_cache_internal(cleaned_size);
+}
+
+Status WholeFileCache::_clean_cache_internal(size_t* cleaned_size) {
std::unique_lock<std::shared_mutex> wrlock(_cache_lock);
_cache_file_reader.reset();
_cache_file_size = 0;
Path cache_file = _cache_dir / WHOLE_FILE_CACHE_NAME;
Path done_file =
_cache_dir / fmt::format("{}{}", WHOLE_FILE_CACHE_NAME,
CACHE_DONE_FILE_SUFFIX);
- bool done_file_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(done_file, &done_file_exist),
- "Check local done file exist failed.");
- if (done_file_exist) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->delete_file(done_file),
- fmt::format("Delete local done file failed: {}",
done_file.native()));
- }
- bool cache_file_exist = false;
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->exists(cache_file,
&cache_file_exist),
- "Check local cache file exist failed.");
- if (cache_file_exist) {
- RETURN_NOT_OK_STATUS_WITH_WARN(
- io::global_local_filesystem()->delete_file(cache_file),
- fmt::format("Delete local cache file failed: {}",
cache_file.native()));
- }
- LOG(INFO) << "Delete local cache file successfully: " <<
cache_file.native();
- return Status::OK();
+ return _remove_file(cache_file, done_file, cleaned_size);
}
} // namespace io
diff --git a/be/src/io/cache/whole_file_cache.h b/be/src/io/cache/whole_file_cache.h
index e4a7628ff6..b1e27791b0 100644
--- a/be/src/io/cache/whole_file_cache.h
+++ b/be/src/io/cache/whole_file_cache.h
@@ -52,16 +52,26 @@ public:
Status clean_all_cache() override;
+ Status clean_one_cache(size_t* cleaned_size) override;
+
+ int64_t get_oldest_match_time() const override { return _last_match_time;
};
+
+ bool is_gc_finish() const override { return _cache_file_reader == nullptr;
}
+
private:
Status _generate_cache_reader(size_t offset, size_t req_size);
- Status _clean_cache_internal();
+ Status _clean_cache_internal(size_t* cleaned_size);
+
+ void update_last_match_time() { _last_match_time = time(nullptr); }
private:
Path _cache_dir;
int64_t _alive_time_sec;
io::FileReaderSPtr _remote_file_reader;
+ int64_t _last_match_time;
+
std::shared_mutex _cache_lock;
io::FileReaderSPtr _cache_file_reader;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]