This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e2245cbdd3 [improvement](filecache) split file cache into sharding
directories (#16767)
e2245cbdd3 is described below
commit e2245cbdd38920941c60950d8143ccc049e5467e
Author: Ashin Gau <[email protected]>
AuthorDate: Thu Feb 16 16:04:29 2023 +0800
[improvement](filecache) split file cache into sharding directories (#16767)
Save each cached file segment under a path of the form `cache_path /
hash(filepath).substr(0, 3) / hash(filepath) / offset`
to avoid creating too many directories directly under `cache_path`.
---
be/src/io/cache/block/block_file_cache.cpp | 12 +-
be/src/io/cache/block/block_file_cache.h | 5 +
be/src/io/cache/block/block_lru_file_cache.cpp | 176 ++++++++++++++++++-------
be/src/io/cache/block/block_lru_file_cache.h | 4 +
be/test/io/cache/file_block_cache_test.cpp | 4 +-
5 files changed, 150 insertions(+), 51 deletions(-)
diff --git a/be/src/io/cache/block/block_file_cache.cpp
b/be/src/io/cache/block/block_file_cache.cpp
index 147a0a3339..292903c812 100644
--- a/be/src/io/cache/block/block_file_cache.cpp
+++ b/be/src/io/cache/block/block_file_cache.cpp
@@ -32,6 +32,9 @@ namespace fs = std::filesystem;
namespace doris {
namespace io {
+const std::string IFileCache::FILE_CACHE_VERSION = "2.0";
+const int IFileCache::KEY_PREFIX_LENGTH = 3;
+
IFileCache::IFileCache(const std::string& cache_base_path, const
FileCacheSettings& cache_settings)
: _cache_base_path(cache_base_path),
_max_size(cache_settings.max_size),
@@ -55,12 +58,17 @@ std::string IFileCache::get_path_in_local_cache(const Key&
key, size_t offset,
bool is_persistent) const {
auto key_str = key.to_string();
std::string suffix = is_persistent ? "_persistent" : "";
- return fs::path(_cache_base_path) / key_str / (std::to_string(offset) +
suffix);
+ return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) /
key_str /
+ (std::to_string(offset) + suffix);
}
std::string IFileCache::get_path_in_local_cache(const Key& key) const {
auto key_str = key.to_string();
- return fs::path(_cache_base_path) / key_str;
+ return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) /
key_str;
+}
+
+std::string IFileCache::get_version_path() const {
+ return fs::path(_cache_base_path) / "version";
}
IFileCache::QueryFileCacheContextHolderPtr
IFileCache::get_query_context_holder(
diff --git a/be/src/io/cache/block/block_file_cache.h
b/be/src/io/cache/block/block_file_cache.h
index 466a9938d8..3e3113eb3c 100644
--- a/be/src/io/cache/block/block_file_cache.h
+++ b/be/src/io/cache/block/block_file_cache.h
@@ -43,6 +43,9 @@ class IFileCache {
friend struct FileBlocksHolder;
public:
+ static const std::string FILE_CACHE_VERSION;
+ static const int KEY_PREFIX_LENGTH;
+
struct Key {
uint128_t key;
std::string to_string() const;
@@ -73,6 +76,8 @@ public:
std::string get_path_in_local_cache(const Key& key) const;
+ std::string get_version_path() const;
+
const std::string& get_base_path() const { return _cache_base_path; }
virtual std::vector<std::string> try_get_cache_paths(const Key& key, bool
is_persistent) = 0;
diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp
b/be/src/io/cache/block/block_lru_file_cache.cpp
index db3418870e..446c662b88 100644
--- a/be/src/io/cache/block/block_lru_file_cache.cpp
+++ b/be/src/io/cache/block/block_lru_file_cache.cpp
@@ -28,6 +28,8 @@
#include "common/status.h"
#include "io/cache/block/block_file_cache.h"
#include "io/cache/block/block_file_cache_settings.h"
+#include "io/fs/local_file_system.h"
+#include "olap/iterators.h"
#include "util/time.h"
#include "vec/common/hex.h"
#include "vec/common/sip_hash.h"
@@ -53,6 +55,7 @@ Status LRUFileCache::initialize() {
return Status::IOError("cannot create {}: {}",
_cache_base_path,
std::strerror(ec.value()));
}
+ RETURN_IF_ERROR(write_file_cache_version());
}
}
_is_initialized = true;
@@ -634,63 +637,113 @@ void LRUFileCache::remove(const Key& key, bool
is_persistent, size_t offset,
}
void LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>&
cache_lock) {
+ /// version 1.0: cache_base_path / key / offset
+ /// version 2.0: cache_base_path / key_prefix / key / offset
+ if (read_file_cache_version() != FILE_CACHE_VERSION) {
+ // move directories format as version 2.0
+ fs::directory_iterator key_it {_cache_base_path};
+ for (; key_it != fs::directory_iterator(); ++key_it) {
+ if (key_it->is_directory()) {
+ std::string cache_key = key_it->path().filename().native();
+ if (cache_key.size() > KEY_PREFIX_LENGTH) {
+ std::string key_prefix =
+ fs::path(_cache_base_path) / cache_key.substr(0,
KEY_PREFIX_LENGTH);
+ if (!fs::exists(key_prefix)) {
+ std::error_code ec;
+ fs::create_directories(key_prefix, ec);
+ if (ec) {
+ LOG(WARNING) << "Failed to create new version
cached directory: "
+ << ec.message();
+ continue;
+ }
+ }
+ std::error_code ec;
+ std::filesystem::rename(key_it->path(), key_prefix /
cache_key, ec);
+ if (ec) {
+ LOG(WARNING)
+ << "Failed to move old version cached
directory: " << ec.message();
+ }
+ }
+ }
+ }
+ if (!write_file_cache_version().ok()) {
+ LOG(WARNING) << "Failed to write version hints for file cache";
+ }
+ }
+
Key key;
uint64_t offset = 0;
size_t size = 0;
std::vector<std::pair<LRUQueue::Iterator, bool>> queue_entries;
- /// cache_base_path / key / offset
- fs::directory_iterator key_it {_cache_base_path};
- for (; key_it != fs::directory_iterator(); ++key_it) {
- key =
Key(vectorized::unhex_uint<uint128_t>(key_it->path().filename().native().c_str()));
-
- fs::directory_iterator offset_it {key_it->path()};
- for (; offset_it != fs::directory_iterator(); ++offset_it) {
- auto offset_with_suffix = offset_it->path().filename().native();
- auto delim_pos = offset_with_suffix.find('_');
- bool is_persistent = false;
- bool parsed = true;
- try {
- if (delim_pos == std::string::npos) {
- offset = stoull(offset_with_suffix);
- } else {
- offset = stoull(offset_with_suffix.substr(0, delim_pos));
- is_persistent = offset_with_suffix.substr(delim_pos + 1)
== "persistent";
- }
- } catch (...) {
- parsed = false;
- }
+ /// version 2.0: cache_base_path / key_prefix / key / offset
+ fs::directory_iterator key_prefix_it {_cache_base_path};
+ for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it) {
+ if (!key_prefix_it->is_directory()) {
+ // probably the version hint file, not a key-prefix directory
+ continue;
+ }
+ if (key_prefix_it->path().filename().native().size() !=
KEY_PREFIX_LENGTH) {
+ LOG(WARNING) << "Unknown directory " <<
key_prefix_it->path().native()
+ << ", try to remove it";
+ std::filesystem::remove(key_prefix_it->path());
+ continue;
+ }
- if (!parsed) {
- LOG(WARNING) << "Unexpected file: " <<
offset_it->path().native();
- continue; /// Or just remove? Some unexpected file.
- }
+ fs::directory_iterator key_it {key_prefix_it->path()};
+ for (; key_it != fs::directory_iterator(); ++key_it) {
+ key = Key(
+
vectorized::unhex_uint<uint128_t>(key_it->path().filename().native().c_str()));
+
+ fs::directory_iterator offset_it {key_it->path()};
+ for (; offset_it != fs::directory_iterator(); ++offset_it) {
+ auto offset_with_suffix =
offset_it->path().filename().native();
+ auto delim_pos = offset_with_suffix.find('_');
+ bool is_persistent = false;
+ bool parsed = true;
+ try {
+ if (delim_pos == std::string::npos) {
+ offset = stoull(offset_with_suffix);
+ } else {
+ offset = stoull(offset_with_suffix.substr(0,
delim_pos));
+ is_persistent = offset_with_suffix.substr(delim_pos +
1) == "persistent";
+ }
+ } catch (...) {
+ parsed = false;
+ }
- size = offset_it->file_size();
- if (size == 0) {
- std::error_code ec;
- fs::remove(offset_it->path(), ec);
- if (ec) {
- LOG(WARNING) << ec.message();
+ if (!parsed) {
+ LOG(WARNING) << "Unexpected file: " <<
offset_it->path().native();
+ continue; /// Or just remove? Some unexpected file.
}
- continue;
- }
- if (try_reserve(key, TUniqueId(), is_persistent, offset, size,
cache_lock)) {
- auto* cell = add_cell(key, is_persistent, offset, size,
- FileBlock::State::DOWNLOADED,
cache_lock);
- if (cell) {
- queue_entries.emplace_back(*cell->queue_iterator,
is_persistent);
+ size = offset_it->file_size();
+ if (size == 0) {
+ std::error_code ec;
+ fs::remove(offset_it->path(), ec);
+ if (ec) {
+ LOG(WARNING) << ec.message();
+ }
+ continue;
}
- } else {
- LOG(WARNING) << "Cache capacity changed (max size: " <<
_max_size << ", available: "
- <<
get_available_cache_size_unlocked(is_persistent, cache_lock)
- << "), cached file " << key_it->path().string()
- << " does not fit in cache anymore (size: " <<
size << ")";
- std::error_code ec;
- fs::remove(offset_it->path(), ec);
- if (ec) {
- LOG(WARNING) << ec.message();
+
+ if (try_reserve(key, TUniqueId(), is_persistent, offset, size,
cache_lock)) {
+ auto* cell = add_cell(key, is_persistent, offset, size,
+ FileBlock::State::DOWNLOADED,
cache_lock);
+ if (cell) {
+ queue_entries.emplace_back(*cell->queue_iterator,
is_persistent);
+ }
+ } else {
+ LOG(WARNING) << "Cache capacity changed (max size: " <<
_max_size
+ << ", available: "
+ <<
get_available_cache_size_unlocked(is_persistent, cache_lock)
+ << "), cached file " <<
key_it->path().string()
+ << " does not fit in cache anymore (size: "
<< size << ")";
+ std::error_code ec;
+ fs::remove(offset_it->path(), ec);
+ if (ec) {
+ LOG(WARNING) << ec.message();
+ }
}
}
}
@@ -706,6 +759,35 @@ void
LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& cach
}
}
+Status LRUFileCache::write_file_cache_version() const {
+ std::string version_path = get_version_path();
+ Slice version(FILE_CACHE_VERSION);
+ FileWriterPtr version_writer;
+ RETURN_IF_ERROR(global_local_filesystem()->create_file(version_path,
&version_writer));
+ RETURN_IF_ERROR(version_writer->append(version));
+ return version_writer->close();
+}
+
+std::string LRUFileCache::read_file_cache_version() const {
+ std::string version_path = get_version_path();
+ const FileSystemSPtr& fs = global_local_filesystem();
+ bool exists = false;
+ fs->exists(version_path, &exists);
+ if (!exists) {
+ return "1.0";
+ }
+ FileReaderSPtr version_reader;
+ size_t file_size = 0;
+ fs->file_size(version_path, &file_size);
+ char version[file_size];
+
+ IOContext io_ctx;
+ fs->open_file(version_path, &version_reader, &io_ctx);
+ version_reader->read_at(0, Slice(version, file_size), io_ctx, &file_size);
+ version_reader->close();
+ return std::string(version, file_size);
+}
+
std::vector<std::string> LRUFileCache::try_get_cache_paths(const Key& key,
bool is_persistent) {
std::lock_guard cache_lock(_mutex);
diff --git a/be/src/io/cache/block/block_lru_file_cache.h
b/be/src/io/cache/block/block_lru_file_cache.h
index f9f0862d7b..ac6c09371e 100644
--- a/be/src/io/cache/block/block_lru_file_cache.h
+++ b/be/src/io/cache/block/block_lru_file_cache.h
@@ -130,6 +130,10 @@ private:
void load_cache_info_into_memory(std::lock_guard<std::mutex>& cache_lock);
+ Status write_file_cache_version() const;
+
+ std::string read_file_cache_version() const;
+
FileBlocks split_range_into_cells(const Key& key, const TUniqueId&
query_id, bool is_persistent,
size_t offset, size_t size,
FileBlock::State state,
std::lock_guard<std::mutex>& cache_lock);
diff --git a/be/test/io/cache/file_block_cache_test.cpp
b/be/test/io/cache/file_block_cache_test.cpp
index d24bcc8732..157e20b2f7 100644
--- a/be/test/io/cache/file_block_cache_test.cpp
+++ b/be/test/io/cache/file_block_cache_test.cpp
@@ -54,7 +54,7 @@ std::vector<io::FileBlockSPtr> fromHolder(const
io::FileBlocksHolder& holder) {
std::string getFileBlockPath(const std::string& base_path, const
io::IFileCache::Key& key,
size_t offset) {
auto key_str = key.to_string();
- return fs::path(base_path) / key_str / std::to_string(offset);
+ return fs::path(base_path) / key_str.substr(0, 3) / key_str /
std::to_string(offset);
}
void download(io::FileBlockSPtr file_segment) {
@@ -62,7 +62,7 @@ void download(io::FileBlockSPtr file_segment) {
size_t size = file_segment->range().size();
auto key_str = key.to_string();
- auto subdir = fs::path(cache_base_path) / key_str;
+ auto subdir = fs::path(cache_base_path) / key_str.substr(0, 3) / key_str;
ASSERT_TRUE(fs::exists(subdir));
std::string data(size, '0');
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]