This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e2245cbdd3 [improvement](filecache) split file cache into sharding directories (#16767)
e2245cbdd3 is described below

commit e2245cbdd38920941c60950d8143ccc049e5467e
Author: Ashin Gau <[email protected]>
AuthorDate: Thu Feb 16 16:04:29 2023 +0800

    [improvement](filecache) split file cache into sharding directories (#16767)
    
    Save cached file segments under paths like
    `cache_path / hash(filepath).substr(0, 3) / hash(filepath) / offset`
    to avoid having too many directories directly under `cache_path`.
---
 be/src/io/cache/block/block_file_cache.cpp     |  12 +-
 be/src/io/cache/block/block_file_cache.h       |   5 +
 be/src/io/cache/block/block_lru_file_cache.cpp | 176 ++++++++++++++++++-------
 be/src/io/cache/block/block_lru_file_cache.h   |   4 +
 be/test/io/cache/file_block_cache_test.cpp     |   4 +-
 5 files changed, 150 insertions(+), 51 deletions(-)

diff --git a/be/src/io/cache/block/block_file_cache.cpp b/be/src/io/cache/block/block_file_cache.cpp
index 147a0a3339..292903c812 100644
--- a/be/src/io/cache/block/block_file_cache.cpp
+++ b/be/src/io/cache/block/block_file_cache.cpp
@@ -32,6 +32,9 @@ namespace fs = std::filesystem;
 namespace doris {
 namespace io {
 
+const std::string IFileCache::FILE_CACHE_VERSION = "2.0";
+const int IFileCache::KEY_PREFIX_LENGTH = 3;
+
 IFileCache::IFileCache(const std::string& cache_base_path, const FileCacheSettings& cache_settings)
         : _cache_base_path(cache_base_path),
           _max_size(cache_settings.max_size),
@@ -55,12 +58,17 @@ std::string IFileCache::get_path_in_local_cache(const Key& key, size_t offset,
                                                 bool is_persistent) const {
     auto key_str = key.to_string();
     std::string suffix = is_persistent ? "_persistent" : "";
-    return fs::path(_cache_base_path) / key_str / (std::to_string(offset) + suffix);
+    return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) / key_str /
+           (std::to_string(offset) + suffix);
 }
 
 std::string IFileCache::get_path_in_local_cache(const Key& key) const {
     auto key_str = key.to_string();
-    return fs::path(_cache_base_path) / key_str;
+    return fs::path(_cache_base_path) / key_str.substr(0, KEY_PREFIX_LENGTH) / key_str;
+}
+
+std::string IFileCache::get_version_path() const {
+    return fs::path(_cache_base_path) / "version";
 }
 
 IFileCache::QueryFileCacheContextHolderPtr IFileCache::get_query_context_holder(
diff --git a/be/src/io/cache/block/block_file_cache.h b/be/src/io/cache/block/block_file_cache.h
index 466a9938d8..3e3113eb3c 100644
--- a/be/src/io/cache/block/block_file_cache.h
+++ b/be/src/io/cache/block/block_file_cache.h
@@ -43,6 +43,11 @@ class IFileCache {
     friend struct FileBlocksHolder;
 
 public:
+    /// On-disk directory layout version, recorded in the file at get_version_path().
+    static const std::string FILE_CACHE_VERSION;
+    /// Number of leading characters of a key's string form used as its shard directory name.
+    static const int KEY_PREFIX_LENGTH;
+
     struct Key {
         uint128_t key;
         std::string to_string() const;
@@ -73,6 +78,9 @@ public:
 
     std::string get_path_in_local_cache(const Key& key) const;
 
+    /// Path of the file that records the cache's directory layout version.
+    std::string get_version_path() const;
+
     const std::string& get_base_path() const { return _cache_base_path; }
 
     virtual std::vector<std::string> try_get_cache_paths(const Key& key, bool is_persistent) = 0;
diff --git a/be/src/io/cache/block/block_lru_file_cache.cpp b/be/src/io/cache/block/block_lru_file_cache.cpp
index db3418870e..446c662b88 100644
--- a/be/src/io/cache/block/block_lru_file_cache.cpp
+++ b/be/src/io/cache/block/block_lru_file_cache.cpp
@@ -28,6 +28,8 @@
 #include "common/status.h"
 #include "io/cache/block/block_file_cache.h"
 #include "io/cache/block/block_file_cache_settings.h"
+#include "io/fs/local_file_system.h"
+#include "olap/iterators.h"
 #include "util/time.h"
 #include "vec/common/hex.h"
 #include "vec/common/sip_hash.h"
@@ -53,6 +55,7 @@ Status LRUFileCache::initialize() {
                 return Status::IOError("cannot create {}: {}", _cache_base_path,
                                        std::strerror(ec.value()));
             }
+            RETURN_IF_ERROR(write_file_cache_version());
         }
     }
     _is_initialized = true;
@@ -634,63 +637,119 @@ void LRUFileCache::remove(const Key& key, bool is_persistent, size_t offset,
 }
 
 void LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& cache_lock) {
+    /// version 1.0: cache_base_path / key / offset
+    /// version 2.0: cache_base_path / key_prefix / key / offset
+    if (read_file_cache_version() != FILE_CACHE_VERSION) {
+        // move directories format as version 2.0
+        fs::directory_iterator key_it {_cache_base_path};
+        for (; key_it != fs::directory_iterator(); ++key_it) {
+            if (key_it->is_directory()) {
+                std::string cache_key = key_it->path().filename().native();
+                if (cache_key.size() > KEY_PREFIX_LENGTH) {
+                    fs::path key_prefix =
+                            fs::path(_cache_base_path) / cache_key.substr(0, KEY_PREFIX_LENGTH);
+                    if (!fs::exists(key_prefix)) {
+                        std::error_code ec;
+                        fs::create_directories(key_prefix, ec);
+                        if (ec) {
+                            LOG(WARNING) << "Failed to create new version cached directory: "
+                                         << ec.message();
+                            continue;
+                        }
+                    }
+                    std::error_code ec;
+                    std::filesystem::rename(key_it->path(), key_prefix / cache_key, ec);
+                    if (ec) {
+                        LOG(WARNING)
+                                << "Failed to move old version cached directory: " << ec.message();
+                    }
+                }
+            }
+        }
+        if (!write_file_cache_version().ok()) {
+            LOG(WARNING) << "Failed to write version hints for file cache";
+        }
+    }
+
     Key key;
     uint64_t offset = 0;
     size_t size = 0;
     std::vector<std::pair<LRUQueue::Iterator, bool>> queue_entries;
 
-    /// cache_base_path / key / offset
-    fs::directory_iterator key_it {_cache_base_path};
-    for (; key_it != fs::directory_iterator(); ++key_it) {
-        key = Key(vectorized::unhex_uint<uint128_t>(key_it->path().filename().native().c_str()));
-
-        fs::directory_iterator offset_it {key_it->path()};
-        for (; offset_it != fs::directory_iterator(); ++offset_it) {
-            auto offset_with_suffix = offset_it->path().filename().native();
-            auto delim_pos = offset_with_suffix.find('_');
-            bool is_persistent = false;
-            bool parsed = true;
-            try {
-                if (delim_pos == std::string::npos) {
-                    offset = stoull(offset_with_suffix);
-                } else {
-                    offset = stoull(offset_with_suffix.substr(0, delim_pos));
-                    is_persistent = offset_with_suffix.substr(delim_pos + 1) == "persistent";
-                }
-            } catch (...) {
-                parsed = false;
-            }
+    /// version 2.0: cache_base_path / key_prefix / key / offset
+    fs::directory_iterator key_prefix_it {_cache_base_path};
+    for (; key_prefix_it != fs::directory_iterator(); ++key_prefix_it) {
+        if (!key_prefix_it->is_directory()) {
+            // maybe the version hint file
+            continue;
+        }
+        if (key_prefix_it->path().filename().native().size() != KEY_PREFIX_LENGTH) {
+            LOG(WARNING) << "Unknown directory " << key_prefix_it->path().native()
+                         << ", try to remove it";
+            // Use the error_code overload: remove() throws on a non-empty directory.
+            std::error_code ec;
+            fs::remove(key_prefix_it->path(), ec);
+            if (ec) {
+                LOG(WARNING) << "Failed to remove " << key_prefix_it->path().native() << ": "
+                             << ec.message();
+            }
+            continue;
+        }
 
-            if (!parsed) {
-                LOG(WARNING) << "Unexpected file: " << offset_it->path().native();
-                continue; /// Or just remove? Some unexpected file.
-            }
+        fs::directory_iterator key_it {key_prefix_it->path()};
+        for (; key_it != fs::directory_iterator(); ++key_it) {
+            key = Key(
+                    vectorized::unhex_uint<uint128_t>(key_it->path().filename().native().c_str()));
+
+            fs::directory_iterator offset_it {key_it->path()};
+            for (; offset_it != fs::directory_iterator(); ++offset_it) {
+                auto offset_with_suffix = offset_it->path().filename().native();
+                auto delim_pos = offset_with_suffix.find('_');
+                bool is_persistent = false;
+                bool parsed = true;
+                try {
+                    if (delim_pos == std::string::npos) {
+                        offset = stoull(offset_with_suffix);
+                    } else {
+                        offset = stoull(offset_with_suffix.substr(0, delim_pos));
+                        is_persistent = offset_with_suffix.substr(delim_pos + 1) == "persistent";
+                    }
+                } catch (...) {
+                    parsed = false;
+                }
 
-            size = offset_it->file_size();
-            if (size == 0) {
-                std::error_code ec;
-                fs::remove(offset_it->path(), ec);
-                if (ec) {
-                    LOG(WARNING) << ec.message();
+                if (!parsed) {
+                    LOG(WARNING) << "Unexpected file: " << offset_it->path().native();
+                    continue; /// Or just remove? Some unexpected file.
                 }
-                continue;
-            }
 
-            if (try_reserve(key, TUniqueId(), is_persistent, offset, size, cache_lock)) {
-                auto* cell = add_cell(key, is_persistent, offset, size,
-                                      FileBlock::State::DOWNLOADED, cache_lock);
-                if (cell) {
-                    queue_entries.emplace_back(*cell->queue_iterator, is_persistent);
+                size = offset_it->file_size();
+                if (size == 0) {
+                    std::error_code ec;
+                    fs::remove(offset_it->path(), ec);
+                    if (ec) {
+                        LOG(WARNING) << ec.message();
+                    }
+                    continue;
                 }
-            } else {
-                LOG(WARNING) << "Cache capacity changed (max size: " << _max_size << ", available: "
-                             << get_available_cache_size_unlocked(is_persistent, cache_lock)
-                             << "), cached file " << key_it->path().string()
-                             << " does not fit in cache anymore (size: " << size << ")";
-                std::error_code ec;
-                fs::remove(offset_it->path(), ec);
-                if (ec) {
-                    LOG(WARNING) << ec.message();
+
+                if (try_reserve(key, TUniqueId(), is_persistent, offset, size, cache_lock)) {
+                    auto* cell = add_cell(key, is_persistent, offset, size,
+                                          FileBlock::State::DOWNLOADED, cache_lock);
+                    if (cell) {
+                        queue_entries.emplace_back(*cell->queue_iterator, is_persistent);
+                    }
+                } else {
+                    LOG(WARNING) << "Cache capacity changed (max size: " << _max_size
+                                 << ", available: "
+                                 << get_available_cache_size_unlocked(is_persistent, cache_lock)
+                                 << "), cached file " << key_it->path().string()
+                                 << " does not fit in cache anymore (size: " << size << ")";
+                    std::error_code ec;
+                    fs::remove(offset_it->path(), ec);
+                    if (ec) {
+                        LOG(WARNING) << ec.message();
+                    }
                 }
             }
         }
@@ -706,6 +765,61 @@ void LRUFileCache::load_cache_info_into_memory(std::lock_guard<std::mutex>& cach
     }
 }
 
+/// Persist FILE_CACHE_VERSION into the file at get_version_path(), so that later
+/// loads can tell which directory layout is on disk.
+Status LRUFileCache::write_file_cache_version() const {
+    std::string version_path = get_version_path();
+    Slice version(FILE_CACHE_VERSION);
+    FileWriterPtr version_writer;
+    RETURN_IF_ERROR(global_local_filesystem()->create_file(version_path, &version_writer));
+    RETURN_IF_ERROR(version_writer->append(version));
+    return version_writer->close();
+}
+
+/// Return the directory-layout version recorded in the file at get_version_path().
+/// A missing or unreadable file is reported as "1.0". That is safe because the
+/// 1.0 -> 2.0 migration only moves top-level directories whose names are longer
+/// than KEY_PREFIX_LENGTH, so on an already-migrated cache it moves nothing.
+std::string LRUFileCache::read_file_cache_version() const {
+    static const std::string LEGACY_VERSION = "1.0";
+    std::string version_path = get_version_path();
+    const FileSystemSPtr& local_fs = global_local_filesystem();
+    bool exists = false;
+    Status st = local_fs->exists(version_path, &exists);
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to check existence of " << version_path << ": " << st.to_string();
+        return LEGACY_VERSION;
+    }
+    if (!exists) {
+        return LEGACY_VERSION;
+    }
+    size_t file_size = 0;
+    st = local_fs->file_size(version_path, &file_size);
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to get size of " << version_path << ": " << st.to_string();
+        return LEGACY_VERSION;
+    }
+    FileReaderSPtr version_reader;
+    IOContext io_ctx;
+    st = local_fs->open_file(version_path, &version_reader, &io_ctx);
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to open " << version_path << ": " << st.to_string();
+        return LEGACY_VERSION;
+    }
+    // A std::string buffer instead of a variable-length array: VLAs are a compiler
+    // extension, not standard C++, and here the size comes straight from the file.
+    std::string version(file_size, '\0');
+    size_t bytes_read = 0;
+    st = version_reader->read_at(0, Slice(version.data(), file_size), io_ctx, &bytes_read);
+    version_reader->close(); // best effort: nothing was written through this reader
+    if (!st.ok()) {
+        LOG(WARNING) << "Failed to read " << version_path << ": " << st.to_string();
+        return LEGACY_VERSION;
+    }
+    version.resize(bytes_read);
+    return version;
+}
+
 std::vector<std::string> LRUFileCache::try_get_cache_paths(const Key& key, bool is_persistent) {
     std::lock_guard cache_lock(_mutex);
 
diff --git a/be/src/io/cache/block/block_lru_file_cache.h b/be/src/io/cache/block/block_lru_file_cache.h
index f9f0862d7b..ac6c09371e 100644
--- a/be/src/io/cache/block/block_lru_file_cache.h
+++ b/be/src/io/cache/block/block_lru_file_cache.h
@@ -130,6 +130,12 @@ private:
 
     void load_cache_info_into_memory(std::lock_guard<std::mutex>& cache_lock);
 
+    /// Write FILE_CACHE_VERSION into the file at get_version_path().
+    Status write_file_cache_version() const;
+
+    /// Read the layout version from get_version_path(); "1.0" when there is none.
+    std::string read_file_cache_version() const;
+
     FileBlocks split_range_into_cells(const Key& key, const TUniqueId& query_id, bool is_persistent,
                                       size_t offset, size_t size, FileBlock::State state,
                                       std::lock_guard<std::mutex>& cache_lock);
diff --git a/be/test/io/cache/file_block_cache_test.cpp b/be/test/io/cache/file_block_cache_test.cpp
index d24bcc8732..157e20b2f7 100644
--- a/be/test/io/cache/file_block_cache_test.cpp
+++ b/be/test/io/cache/file_block_cache_test.cpp
@@ -54,7 +54,8 @@ std::vector<io::FileBlockSPtr> fromHolder(const io::FileBlocksHolder& holder) {
 std::string getFileBlockPath(const std::string& base_path, const io::IFileCache::Key& key,
                              size_t offset) {
     auto key_str = key.to_string();
-    return fs::path(base_path) / key_str / std::to_string(offset);
+    return fs::path(base_path) / key_str.substr(0, io::IFileCache::KEY_PREFIX_LENGTH) / key_str /
+           std::to_string(offset);
 }
 
 void download(io::FileBlockSPtr file_segment) {
@@ -62,7 +63,8 @@ void download(io::FileBlockSPtr file_segment) {
     size_t size = file_segment->range().size();
 
     auto key_str = key.to_string();
-    auto subdir = fs::path(cache_base_path) / key_str;
+    auto subdir = fs::path(cache_base_path) / key_str.substr(0, io::IFileCache::KEY_PREFIX_LENGTH) /
+                  key_str;
     ASSERT_TRUE(fs::exists(subdir));
 
     std::string data(size, '0');


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to