This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c71514dd406 [enhancement](cloud) clarify codes and make TTL expiration
work after abnormal cache type transition (#40226)
c71514dd406 is described below
commit c71514dd4060b1855fab3ca125534a46b30bb521
Author: zhengyu <[email protected]>
AuthorDate: Thu Sep 12 17:42:34 2024 +0800
[enhancement](cloud) clarify codes and make TTL expiration work after
abnormal cache type transition (#40226)
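Currently a TTL cache block encodes both its expiration time (in the directory path) and its
cache type (in the filename suffix). Keeping the two in sync is error-prone because they cannot
be updated atomically. This change simplifies the scheme by inferring the cache type from the
expiration time alone, so only the expiration time needs to be persisted.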
Signed-off-by: freemandealer <[email protected]>
---
be/src/common/config.cpp | 2 +
be/src/common/config.h | 2 +
be/src/io/cache/block_file_cache.cpp | 49 +++--
be/src/io/cache/block_file_cache.h | 12 +-
be/src/io/cache/file_block.cpp | 40 ++--
be/src/io/cache/file_block.h | 4 +-
be/src/io/cache/file_cache_storage.h | 4 +-
be/src/io/cache/fs_file_cache_storage.cpp | 256 ++++++++++++++---------
be/src/io/cache/fs_file_cache_storage.h | 15 +-
be/src/olap/rowset/segment_v2/segment_writer.cpp | 3 +-
be/test/io/cache/block_file_cache_test.cpp | 246 +++++++++++++++-------
be/test/io/fs/s3_file_writer_test.cpp | 4 +-
12 files changed, 420 insertions(+), 217 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 9a554230ce7..06144dd3142 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -997,6 +997,8 @@ DEFINE_mInt64(file_cache_ttl_valid_check_interval_second,
"0"); // zero for not
// If true, evict the ttl cache using LRU when full.
// Otherwise, only expiration can evict ttl and new data won't add to cache
when full.
DEFINE_Bool(enable_ttl_cache_evict_using_lru, "true");
+// rename ttl filename to new format during read, with some performance cost
+DEFINE_mBool(translate_to_new_ttl_format_during_read, "false");
DEFINE_mInt32(index_cache_entry_stay_time_after_lookup_s, "1800");
DEFINE_mInt32(inverted_index_cache_stale_sweep_time_sec, "600");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 5bca9ac280a..cc26f52abba 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1046,6 +1046,8 @@
DECLARE_mInt64(file_cache_ttl_valid_check_interval_second);
// If true, evict the ttl cache using LRU when full.
// Otherwise, only expiration can evict ttl and new data won't add to cache
when full.
DECLARE_Bool(enable_ttl_cache_evict_using_lru);
+// rename ttl filename to new format during read, with some performance cost
+DECLARE_mBool(translate_to_new_ttl_format_during_read);
// inverted index searcher cache
// cache entry stay time after lookup
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index c253130cf3b..f5c0a7c79bf 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -261,7 +261,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper&
hash, const CacheConte
/// find list [block1, ..., blockN] of blocks which intersect with given
range.
auto it = _files.find(hash);
if (it == _files.end()) {
- if (_lazy_open_done) {
+ if (_async_open_done) {
return {};
}
FileCacheKey key;
@@ -285,11 +285,10 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper&
hash, const CacheConte
if (!st.ok()) {
LOG_WARNING("Failed to change key meta").error(st);
}
- }
- for (auto& [_, cell] : file_blocks) {
+
FileCacheType origin_type = cell.file_block->cache_type();
if (origin_type == FileCacheType::TTL) continue;
- Status st =
cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL);
+ st =
cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
if (st.ok()) {
auto& queue = get_queue(origin_type);
queue.remove(cell.queue_iterator.value(), cache_lock);
@@ -309,6 +308,7 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper&
hash, const CacheConte
_time_to_key.insert(std::make_pair(context.expiration_time, hash));
}
if (auto iter = _key_to_time.find(hash);
+ // TODO(zhengyu): Why the hell the type is NORMAL while context set
expiration_time?
(context.cache_type == FileCacheType::NORMAL || context.cache_type ==
FileCacheType::TTL) &&
iter != _key_to_time.end() && iter->second != context.expiration_time)
{
// remove from _time_to_key
@@ -330,7 +330,8 @@ FileBlocks BlockFileCache::get_impl(const UInt128Wrapper&
hash, const CacheConte
for (auto& [_, cell] : file_blocks) {
auto cache_type = cell.file_block->cache_type();
if (cache_type != FileCacheType::TTL) continue;
- auto st =
cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
+ auto st =
cell.file_block->change_cache_type_between_ttl_and_others(
+ FileCacheType::NORMAL);
if (st.ok()) {
if (config::enable_ttl_cache_evict_using_lru) {
auto& ttl_queue = get_queue(FileCacheType::TTL);
@@ -699,9 +700,9 @@ BlockFileCache::FileBlockCell*
BlockFileCache::add_cell(const UInt128Wrapper& ha
FileBlockCell cell(std::make_shared<FileBlock>(key, size, this, state),
cache_lock);
Status st;
if (context.expiration_time == 0 && context.cache_type ==
FileCacheType::TTL) {
- st = cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
+ st =
cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::NORMAL);
} else if (context.cache_type != FileCacheType::TTL &&
context.expiration_time != 0) {
- st = cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL);
+ st =
cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
}
if (!st.ok()) {
LOG(WARNING) << "Cannot change cache type. expiration_time=" <<
context.expiration_time
@@ -912,8 +913,8 @@ bool BlockFileCache::try_reserve_for_ttl(size_t size,
std::lock_guard<std::mutex
bool BlockFileCache::try_reserve(const UInt128Wrapper& hash, const
CacheContext& context,
size_t offset, size_t size,
std::lock_guard<std::mutex>& cache_lock) {
- if (!_lazy_open_done) {
- return try_reserve_for_lazy_load(size, cache_lock);
+ if (!_async_open_done) {
+ return try_reserve_during_async_load(size, cache_lock);
}
// use this strategy in scenarios where there is insufficient disk
capacity or insufficient number of inodes remaining
@@ -1022,10 +1023,10 @@ bool BlockFileCache::remove_if_ttl_file_unlock(const
UInt128Wrapper& file_key, b
LOG_WARNING("Failed to update expiration time to
0").error(st);
}
}
- }
- for (auto& [_, cell] : _files[file_key]) {
+
if (cell.file_block->cache_type() == FileCacheType::NORMAL)
continue;
- auto st =
cell.file_block->change_cache_type_by_mgr(FileCacheType::NORMAL);
+ auto st =
cell.file_block->change_cache_type_between_ttl_and_others(
+ FileCacheType::NORMAL);
if (st.ok()) {
if (config::enable_ttl_cache_evict_using_lru) {
ttl_queue.remove(cell.queue_iterator.value(),
cache_lock);
@@ -1396,6 +1397,21 @@ std::string
BlockFileCache::dump_structure_unlocked(const UInt128Wrapper& hash,
return result.str();
}
+// For debug and UT: returns the cache-type string of the block at (hash, offset).
+std::string BlockFileCache::dump_single_cache_type(const UInt128Wrapper& hash, size_t offset) {
+    std::lock_guard cache_lock(_mutex);
+    return dump_single_cache_type_unlocked(hash, offset, cache_lock);
+}
+
+// Same as dump_single_cache_type, but the caller must already hold _mutex.
+// Returns "not found" when no block is cached at (hash, offset). find() is used instead of
+// operator[] so that dumping never inserts an empty entry into _files, and the offset lookup is
+// checked before dereferencing.
+std::string BlockFileCache::dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
+                                                            size_t offset,
+                                                            std::lock_guard<std::mutex>&) {
+    auto files_it = _files.find(hash);
+    if (files_it == _files.end()) {
+        return "not found";
+    }
+    auto cell_it = files_it->second.find(offset);
+    if (cell_it == files_it->second.end()) {
+        return "not found";
+    }
+    return cache_type_to_string(cell_it->second.file_block->cache_type());
+}
+
void BlockFileCache::change_cache_type(const UInt128Wrapper& hash, size_t
offset,
FileCacheType new_type,
std::lock_guard<std::mutex>&
cache_lock) {
@@ -1621,11 +1637,10 @@ void BlockFileCache::modify_expiration_time(const
UInt128Wrapper& hash,
if (!st.ok()) {
LOG_WARNING("").error(st);
}
- }
- for (auto& [_, cell] : iter->second) {
+
FileCacheType origin_type = cell.file_block->cache_type();
if (origin_type == FileCacheType::TTL) continue;
- auto st =
cell.file_block->change_cache_type_by_mgr(FileCacheType::TTL);
+ st =
cell.file_block->change_cache_type_between_ttl_and_others(FileCacheType::TTL);
if (st.ok()) {
auto& queue = get_queue(origin_type);
queue.remove(cell.queue_iterator.value(), cache_lock);
@@ -1672,8 +1687,8 @@ BlockFileCache::get_hot_blocks_meta(const UInt128Wrapper&
hash) const {
return blocks_meta;
}
-bool BlockFileCache::try_reserve_for_lazy_load(size_t size,
- std::lock_guard<std::mutex>&
cache_lock) {
+bool BlockFileCache::try_reserve_during_async_load(size_t size,
+
std::lock_guard<std::mutex>& cache_lock) {
size_t removed_size = 0;
size_t normal_queue_size = _normal_queue.get_capacity(cache_lock);
size_t disposable_queue_size = _disposable_queue.get_capacity(cache_lock);
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index cd44e77eaa3..def354b155b 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -104,8 +104,9 @@ public:
std::string reset_capacity(size_t new_capacity);
std::map<size_t, FileBlockSPtr> get_blocks_by_key(const UInt128Wrapper&
hash);
- /// For debug.
+ /// For debug and UT
std::string dump_structure(const UInt128Wrapper& hash);
+ std::string dump_single_cache_type(const UInt128Wrapper& hash, size_t
offset);
[[nodiscard]] size_t get_used_cache_size(FileCacheType type) const;
@@ -130,7 +131,7 @@ public:
[[nodiscard]] std::vector<std::tuple<size_t, size_t, FileCacheType,
uint64_t>>
get_hot_blocks_meta(const UInt128Wrapper& hash) const;
- [[nodiscard]] bool get_lazy_open_success() const { return _lazy_open_done;
}
+ [[nodiscard]] bool get_async_open_success() const { return
_async_open_done; }
BlockFileCache& operator=(const BlockFileCache&) = delete;
BlockFileCache(const BlockFileCache&) = delete;
@@ -338,7 +339,7 @@ private:
const CacheContext& context, size_t offset,
size_t size,
std::lock_guard<std::mutex>& cache_lock);
- bool try_reserve_for_lazy_load(size_t size, std::lock_guard<std::mutex>&
cache_lock);
+ bool try_reserve_during_async_load(size_t size,
std::lock_guard<std::mutex>& cache_lock);
std::vector<FileCacheType> get_other_cache_type(FileCacheType
cur_cache_type);
@@ -358,6 +359,9 @@ private:
std::string dump_structure_unlocked(const UInt128Wrapper& hash,
std::lock_guard<std::mutex>&
cache_lock);
+ std::string dump_single_cache_type_unlocked(const UInt128Wrapper& hash,
size_t offset,
+ std::lock_guard<std::mutex>&
cache_lock);
+
void fill_holes_with_empty_file_blocks(FileBlocks& file_blocks, const
UInt128Wrapper& hash,
const CacheContext& context,
const FileBlock::Range& range,
@@ -413,7 +417,7 @@ private:
std::mutex _close_mtx;
std::condition_variable _close_cv;
std::thread _cache_background_thread;
- std::atomic_bool _lazy_open_done {false};
+ std::atomic_bool _async_open_done {false};
bool _async_clear_file_cache {false};
// disk space or inode is less than the specified value
bool _disk_resource_limit_mode {false};
diff --git a/be/src/io/cache/file_block.cpp b/be/src/io/cache/file_block.cpp
index 6586dcf589b..b015cbd6111 100644
--- a/be/src/io/cache/file_block.cpp
+++ b/be/src/io/cache/file_block.cpp
@@ -161,32 +161,41 @@ Status FileBlock::read(Slice buffer, size_t read_offset) {
return _mgr->_storage->read(_key, read_offset, buffer);
}
-Status FileBlock::change_cache_type_by_mgr(FileCacheType new_type) {
+Status FileBlock::change_cache_type_between_ttl_and_others(FileCacheType
new_type) {
std::lock_guard block_lock(_mutex);
DCHECK(new_type != _key.meta.type);
- if (_download_state == State::DOWNLOADED) {
- KeyMeta new_meta;
- new_meta.expiration_time = _key.meta.expiration_time;
- new_meta.type = new_type;
- auto st = _mgr->_storage->change_key_meta(_key, new_meta);
- TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st);
- if (!st.ok()) return st;
+ bool expr = (new_type == FileCacheType::TTL || _key.meta.type ==
FileCacheType::TTL);
+ if (!expr) {
+ LOG(WARNING) << "none of the cache type is TTL"
+ << ", hash: " << _key.hash.to_string() << ", offset: " <<
_key.offset
+ << ", new type: " <<
BlockFileCache::cache_type_to_string(new_type)
+ << ", old type: " <<
BlockFileCache::cache_type_to_string(_key.meta.type);
}
+ DCHECK(expr);
+
+ // change cache type between TTL to others don't need to rename the
filename suffix
_key.meta.type = new_type;
return Status::OK();
}
-Status FileBlock::change_cache_type_self(FileCacheType new_type) {
+Status FileBlock::change_cache_type_between_normal_and_index(FileCacheType
new_type) {
std::lock_guard cache_lock(_mgr->_mutex);
std::lock_guard block_lock(_mutex);
+ bool expr = (new_type != FileCacheType::TTL && _key.meta.type !=
FileCacheType::TTL);
+ if (!expr) {
+ LOG(WARNING) << "one of the cache type is TTL"
+ << ", hash: " << _key.hash.to_string() << ", offset: " <<
_key.offset
+ << ", new type: " <<
BlockFileCache::cache_type_to_string(new_type)
+ << ", old type: " <<
BlockFileCache::cache_type_to_string(_key.meta.type);
+ }
+ DCHECK(expr);
if (_key.meta.type == FileCacheType::TTL || new_type == _key.meta.type) {
return Status::OK();
}
if (_download_state == State::DOWNLOADED) {
- KeyMeta new_meta;
- new_meta.expiration_time = _key.meta.expiration_time;
- new_meta.type = new_type;
- RETURN_IF_ERROR(_mgr->_storage->change_key_meta(_key, new_meta));
+ Status st;
+ TEST_SYNC_POINT_CALLBACK("FileBlock::change_cache_type", &st);
+ RETURN_IF_ERROR(_mgr->_storage->change_key_meta_type(_key, new_type));
}
_mgr->change_cache_type(_key.hash, _block_range.left, new_type,
cache_lock);
_key.meta.type = new_type;
@@ -196,10 +205,7 @@ Status FileBlock::change_cache_type_self(FileCacheType
new_type) {
Status FileBlock::update_expiration_time(uint64_t expiration_time) {
std::lock_guard block_lock(_mutex);
if (_download_state == State::DOWNLOADED) {
- KeyMeta new_meta;
- new_meta.expiration_time = expiration_time;
- new_meta.type = _key.meta.type;
- auto st = _mgr->_storage->change_key_meta(_key, new_meta);
+ auto st = _mgr->_storage->change_key_meta_expiration(_key,
expiration_time);
if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
return st;
}
diff --git a/be/src/io/cache/file_block.h b/be/src/io/cache/file_block.h
index b4044786dc7..6e49a597b7b 100644
--- a/be/src/io/cache/file_block.h
+++ b/be/src/io/cache/file_block.h
@@ -115,9 +115,9 @@ public:
std::string get_info_for_log() const;
- [[nodiscard]] Status change_cache_type_by_mgr(FileCacheType new_type);
+ [[nodiscard]] Status
change_cache_type_between_ttl_and_others(FileCacheType new_type);
- [[nodiscard]] Status change_cache_type_self(FileCacheType new_type);
+ [[nodiscard]] Status
change_cache_type_between_normal_and_index(FileCacheType new_type);
[[nodiscard]] Status update_expiration_time(uint64_t expiration_time);
diff --git a/be/src/io/cache/file_cache_storage.h
b/be/src/io/cache/file_cache_storage.h
index 64639356f14..4120fe0ca5a 100644
--- a/be/src/io/cache/file_cache_storage.h
+++ b/be/src/io/cache/file_cache_storage.h
@@ -40,7 +40,9 @@ public:
// remove the block
virtual Status remove(const FileCacheKey& key) = 0;
// change the block meta
- virtual Status change_key_meta(const FileCacheKey& key, const KeyMeta&
new_meta) = 0;
+ virtual Status change_key_meta_type(const FileCacheKey& key, const
FileCacheType type) = 0;
+ virtual Status change_key_meta_expiration(const FileCacheKey& key,
+ const uint64_t expiration) = 0;
// use when lazy load cache
virtual void load_blocks_directly_unlocked(BlockFileCache* _mgr, const
FileCacheKey& key,
std::lock_guard<std::mutex>&
cache_lock) {}
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp
b/be/src/io/cache/fs_file_cache_storage.cpp
index 34e62d6fe6f..d2662ba36d0 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -100,10 +100,10 @@ size_t FDCache::file_reader_cache_size() {
Status FSFileCacheStorage::init(BlockFileCache* _mgr) {
_cache_base_path = _mgr->_cache_base_path;
- RETURN_IF_ERROR(rebuild_data_structure());
+ RETURN_IF_ERROR(upgrade_cache_dir_if_necessary());
_cache_background_load_thread = std::thread([this, mgr = _mgr]() {
load_cache_info_into_memory(mgr);
- mgr->_lazy_open_done = true;
+ mgr->_async_open_done = true;
LOG_INFO("FileCache {} lazy load done.", _cache_base_path);
});
return Status::OK();
@@ -159,7 +159,27 @@ Status FSFileCacheStorage::read(const FileCacheKey& key,
size_t value_offset, Sl
std::string file =
get_path_in_local_cache(get_path_in_local_cache(key.hash,
key.meta.expiration_time),
key.offset, key.meta.type);
- RETURN_IF_ERROR(fs->open_file(file, &file_reader));
+ Status s = fs->open_file(file, &file_reader);
+ if (!s.ok()) {
+ if (!s.is<ErrorCode::NOT_FOUND>() || key.meta.type !=
FileCacheType::TTL) {
+ return s;
+ }
+ std::string file_old_format =
get_path_in_local_cache_old_ttl_format(
+ get_path_in_local_cache(key.hash,
key.meta.expiration_time), key.offset,
+ key.meta.type);
+ if (config::translate_to_new_ttl_format_during_read) {
+ // try to rename the file with old ttl format to new and retry
+ VLOG(7) << "try to rename the file with old ttl format to new
and retry"
+ << " oldformat=" << file_old_format << " original=" <<
file;
+ RETURN_IF_ERROR(fs->rename(file_old_format, file));
+ RETURN_IF_ERROR(fs->open_file(file, &file_reader));
+ } else {
+ // try to open the file with old ttl format
+ VLOG(7) << "try to open the file with old ttl format"
+ << " oldformat=" << file_old_format << " original=" <<
file;
+ RETURN_IF_ERROR(fs->open_file(file_old_format, &file_reader));
+ }
+ }
FDCache::instance()->insert_file_reader(fd_key, file_reader);
}
size_t bytes_read = 0;
@@ -173,6 +193,19 @@ Status FSFileCacheStorage::remove(const FileCacheKey& key)
{
std::string file = get_path_in_local_cache(dir, key.offset, key.meta.type);
FDCache::instance()->remove_file_reader(std::make_pair(key.hash,
key.offset));
RETURN_IF_ERROR(fs->delete_file(file));
+ // return OK not means the file is deleted, it may be not exist
+ // So for TTL, we make sure the old format will be removed well
+ if (key.meta.type == FileCacheType::TTL) {
+ bool exists {false};
+ // try to detect the file with old ttl format
+ file = get_path_in_local_cache_old_ttl_format(dir, key.offset,
key.meta.type);
+ RETURN_IF_ERROR(fs->exists(file, &exists));
+ if (exists) {
+ VLOG(7) << "try to remove the file with old ttl format"
+ << " file=" << file;
+ RETURN_IF_ERROR(fs->delete_file(file));
+ }
+ }
std::vector<FileInfo> files;
bool exists {false};
RETURN_IF_ERROR(fs->list(dir, true, &files, &exists));
@@ -183,29 +216,58 @@ Status FSFileCacheStorage::remove(const FileCacheKey&
key) {
return Status::OK();
}
-Status FSFileCacheStorage::change_key_meta(const FileCacheKey& key, const
KeyMeta& new_meta) {
- // TTL change
- if (key.meta.expiration_time != new_meta.expiration_time) {
+// Renames the block file in its expiration-time directory so that the file-name suffix matches
+// `type`. TTL files are named without a type suffix (see get_path_in_local_cache), so a
+// transition to or from TTL must not go through here: it DCHECKs in debug builds and returns an
+// error in release builds instead of renaming the file.
+Status FSFileCacheStorage::change_key_meta_type(const FileCacheKey& key, const FileCacheType type) {
+    if (key.meta.type == type) {
+        return Status::OK();
+    }
+    bool involves_ttl = (key.meta.type == FileCacheType::TTL || type == FileCacheType::TTL);
+    if (involves_ttl) {
+        LOG(WARNING) << "TTL type file does not need to change the suffix"
+                     << " key=" << key.hash.to_string() << " offset=" << key.offset
+                     << " old_type=" << BlockFileCache::cache_type_to_string(key.meta.type)
+                     << " new_type=" << BlockFileCache::cache_type_to_string(type);
+        DCHECK(!involves_ttl);
+        return Status::InternalError(
+                "TTL type file does not need to change the suffix, key={}, offset={}",
+                key.hash.to_string(), key.offset);
+    }
+    // file operation: only the suffix changes, the directory stays the same
+    std::string dir = get_path_in_local_cache(key.hash, key.meta.expiration_time);
+    std::string original_file = get_path_in_local_cache(dir, key.offset, key.meta.type);
+    std::string new_file = get_path_in_local_cache(dir, key.offset, type);
+    RETURN_IF_ERROR(fs->rename(original_file, new_file));
+    return Status::OK();
+}
+
+Status FSFileCacheStorage::change_key_meta_expiration(const FileCacheKey& key,
+ const uint64_t
expiration) {
+ // directory operation
+ if (key.meta.expiration_time != expiration) {
std::string original_dir = get_path_in_local_cache(key.hash,
key.meta.expiration_time);
- std::string new_dir = get_path_in_local_cache(key.hash,
new_meta.expiration_time);
+ std::string new_dir = get_path_in_local_cache(key.hash, expiration);
// It will be concurrent, but we don't care who rename
Status st = fs->rename(original_dir, new_dir);
if (!st.ok() && !st.is<ErrorCode::NOT_FOUND>()) {
return st;
}
- } else if (key.meta.type != new_meta.type) {
- std::string dir = get_path_in_local_cache(key.hash,
key.meta.expiration_time);
- std::string original_file = get_path_in_local_cache(dir, key.offset,
key.meta.type);
- std::string new_file = get_path_in_local_cache(dir, key.offset,
new_meta.type);
- RETURN_IF_ERROR(fs->rename(original_file, new_file));
}
return Status::OK();
}
std::string FSFileCacheStorage::get_path_in_local_cache(const std::string&
dir, size_t offset,
FileCacheType type,
bool is_tmp) {
- return Path(dir) / (std::to_string(offset) +
- (is_tmp ? "_tmp" :
BlockFileCache::cache_type_to_string(type)));
+ if (is_tmp) {
+ return Path(dir) / (std::to_string(offset) + "_tmp");
+ } else if (type == FileCacheType::TTL) {
+ return Path(dir) / std::to_string(offset);
+ } else {
+ return Path(dir) / (std::to_string(offset) +
BlockFileCache::cache_type_to_string(type));
+ }
+}
+
+std::string FSFileCacheStorage::get_path_in_local_cache_old_ttl_format(const
std::string& dir,
+ size_t
offset,
+
FileCacheType type,
+ bool
is_tmp) {
+ DCHECK(type == FileCacheType::TTL);
+ return Path(dir) / (std::to_string(offset) +
BlockFileCache::cache_type_to_string(type));
}
std::string FSFileCacheStorage::get_path_in_local_cache(const UInt128Wrapper&
value,
@@ -227,7 +289,7 @@ std::string
FSFileCacheStorage::get_path_in_local_cache(const UInt128Wrapper& va
}
}
-Status FSFileCacheStorage::rebuild_data_structure() const {
+Status FSFileCacheStorage::upgrade_cache_dir_if_necessary() const {
/// version 1.0: cache_base_path / key / offset
/// version 2.0: cache_base_path / key_prefix / key / offset
std::string version;
@@ -338,6 +400,72 @@ std::string FSFileCacheStorage::get_version_path() const {
return Path(_cache_base_path) / "version";
}
+// Parses a cache file name of the form "<offset>[_<suffix>]" and derives the block's cache type.
+//
+// `file_path` may be a bare file name or a full path; only its last component is parsed.
+// `size` is the file size obtained by the caller; std::filesystem reports a failed size lookup
+// as static_cast<uintmax_t>(-1). Callers must pre-initialise *is_tmp and *cache_type because
+// they are not always written. Returns InternalError when the name cannot be parsed, when the
+// size lookup failed, or when a non-tmp file is empty (after attempting to delete it).
+//
+// NOTE(review): the call sites in load_cache_info_into_memory / load_blocks_directly_unlocked
+// pass `path().filename()`, so the zero-size cleanup below cannot locate the real file; passing
+// the full directory-entry path lets it delete the empty file.
+Status FSFileCacheStorage::parse_filename_suffix_to_cache_type(
+        const std::shared_ptr<LocalFileSystem>& fs, const Path& file_path, long expiration_time,
+        size_t size, size_t* offset, bool* is_tmp, FileCacheType* cache_type) const {
+    std::string offset_with_suffix = file_path.filename().native();
+    auto delim_pos1 = offset_with_suffix.find('_');
+    bool parsed = true;
+
+    try {
+        if (delim_pos1 == std::string::npos) {
+            // same as type "normal"
+            *offset = stoull(offset_with_suffix);
+        } else {
+            *offset = stoull(offset_with_suffix.substr(0, delim_pos1));
+            std::string suffix = offset_with_suffix.substr(delim_pos1 + 1);
+            // not need persistent anymore
+            // if suffix is equals to "tmp", it should be removed too.
+            if (suffix == "tmp") [[unlikely]] {
+                *is_tmp = true;
+            } else {
+                *cache_type = BlockFileCache::string_to_cache_type(suffix);
+            }
+        }
+    } catch (...) {
+        parsed = false;
+    }
+
+    // File in dir with expiration time > 0 should all be TTL type
+    // while expiration time == 0 should all be NORMAL type but
+    // in old days, bug happens, thus break such consistency, e.g.
+    // BEs shut down during cache type transition.
+    // Nowadays, we only use expiration time to decide the type,
+    // i.e. whenever expiration time > 0, it IS TTL, otherwise
+    // it is NORMAL or INDEX depending on its suffix.
+    // From now on, the ttl type encoding in file name is only for
+    // compatibility. It won't be built into the filename, and existing
+    // ones will be ignored.
+    if (expiration_time > 0) {
+        *cache_type = FileCacheType::TTL;
+    } else if (*cache_type == FileCacheType::TTL && expiration_time == 0) {
+        *cache_type = FileCacheType::NORMAL;
+    }
+
+    if (!parsed) {
+        LOG(WARNING) << "parse offset err, path=" << file_path.native();
+        return Status::InternalError("parse offset err, path={}", file_path.native());
+    }
+    TEST_SYNC_POINT_CALLBACK("BlockFileCache::REMOVE_FILE", &offset_with_suffix);
+
+    // The caller's std::error_code is not passed in, so a failed file_size() lookup is detected
+    // through its all-ones sentinel instead.
+    if (size == static_cast<size_t>(-1)) {
+        LOG(WARNING) << "failed to file_size: file_name=" << offset_with_suffix;
+        return Status::InternalError("failed to file_size: file_name={}", offset_with_suffix);
+    }
+
+    if (size == 0 && !(*is_tmp)) {
+        // A bare file name has no directory and would be resolved against the process working
+        // directory rather than the cache directory, so only delete when the directory is known.
+        if (file_path.has_parent_path()) {
+            auto st = fs->delete_file(file_path);
+            if (!st.ok()) {
+                LOG_WARNING("delete file {} error", file_path.native()).error(st);
+            }
+        } else {
+            LOG(WARNING) << "skip deleting empty cache file without directory, file_name="
+                         << offset_with_suffix;
+        }
+        return Status::InternalError("file size is 0, file_name={}", offset_with_suffix);
+    }
+    return Status::OK();
+}
+
void FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr)
const {
int scan_length = 10000;
std::vector<BatchLoadArgs> batch_load_buffer;
@@ -383,50 +511,16 @@ void
FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
}
CacheContext context;
context.query_id = TUniqueId();
- context.expiration_time = std::stoul(expiration_time_str);
+ long expiration_time = std::stoul(expiration_time_str);
+ context.expiration_time = expiration_time;
for (; offset_it != std::filesystem::directory_iterator();
++offset_it) {
- std::string offset_with_suffix =
offset_it->path().filename().native();
- auto delim_pos1 = offset_with_suffix.find('_');
- FileCacheType cache_type = FileCacheType::NORMAL;
- bool parsed = true;
- bool is_tmp = false;
- size_t offset = 0;
- try {
- if (delim_pos1 == std::string::npos) {
- // same as type "normal"
- offset = stoull(offset_with_suffix);
- } else {
- offset = stoull(offset_with_suffix.substr(0,
delim_pos1));
- std::string suffix =
offset_with_suffix.substr(delim_pos1 + 1);
- // not need persistent anymore
- // if suffix is equals to "tmp", it should be removed
too.
- if (suffix == "tmp") [[unlikely]] {
- is_tmp = true;
- } else {
- cache_type =
BlockFileCache::string_to_cache_type(suffix);
- }
- }
- } catch (...) {
- parsed = false;
- }
-
- if (!parsed) {
- LOG(WARNING) << "parse offset err, path=" <<
offset_it->path().native();
- continue;
- }
- TEST_SYNC_POINT_CALLBACK("BlockFileCache::REMOVE_FILE_2",
&offset_with_suffix);
size_t size = offset_it->file_size(ec);
- if (ec) {
- LOG(WARNING) << "failed to file_size: file_name=" <<
offset_with_suffix
- << "error=" << ec.message();
- continue;
- }
-
- if (size == 0 && !is_tmp) {
- auto st = fs->delete_file(offset_it->path());
- if (!st.ok()) {
- LOG_WARNING("delete file {} error",
offset_it->path().native()).error(st);
- }
+ size_t offset = 0;
+ bool is_tmp = false;
+ FileCacheType cache_type = FileCacheType::NORMAL;
+ if (!parse_filename_suffix_to_cache_type(fs,
offset_it->path().filename().native(),
+ expiration_time,
size, &offset, &is_tmp,
+ &cache_type)) {
continue;
}
context.cache_type = cache_type;
@@ -450,6 +544,7 @@ void
FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
};
std::error_code ec;
if constexpr (USE_CACHE_VERSION2) {
+ TEST_SYNC_POINT_CALLBACK("BlockFileCache::BeforeScan");
std::filesystem::directory_iterator key_prefix_it {_cache_base_path,
ec};
if (ec) {
LOG(WARNING) << ec.message();
@@ -457,7 +552,7 @@ void
FSFileCacheStorage::load_cache_info_into_memory(BlockFileCache* _mgr) const
}
for (; key_prefix_it != std::filesystem::directory_iterator();
++key_prefix_it) {
if (!key_prefix_it->is_directory()) {
- // maybe version hits file
+ // skip version file
continue;
}
if (key_prefix_it->path().filename().native().size() !=
KEY_PREFIX_LENGTH) {
@@ -516,46 +611,13 @@ void
FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, cons
return;
}
for (; check_it != std::filesystem::directory_iterator(); ++check_it) {
- uint64_t offset = 0;
- std::string offset_with_suffix = check_it->path().filename().native();
- auto delim_pos1 = offset_with_suffix.find('_');
- FileCacheType cache_type = FileCacheType::NORMAL;
- bool parsed = true;
- bool is_tmp = false;
- try {
- if (delim_pos1 == std::string::npos) {
- // same as type "normal"
- offset = stoull(offset_with_suffix);
- } else {
- offset = stoull(offset_with_suffix.substr(0, delim_pos1));
- std::string suffix = offset_with_suffix.substr(delim_pos1 + 1);
- if (suffix == "tmp") [[unlikely]] {
- is_tmp = true;
- } else {
- cache_type = BlockFileCache::string_to_cache_type(suffix);
- }
- }
- } catch (...) {
- parsed = false;
- }
-
- if (!parsed) [[unlikely]] {
- LOG(WARNING) << "parse offset err, path=" << offset_with_suffix;
- continue;
- }
-
- TEST_SYNC_POINT_CALLBACK("BlockFileCache::REMOVE_FILE_1",
&offset_with_suffix);
- std::error_code ec;
size_t size = check_it->file_size(ec);
- if (ec) {
- LOG(WARNING) << "failed to file_size: error=" << ec.message();
- continue;
- }
- if (size == 0 && !is_tmp) [[unlikely]] {
- auto st = fs->delete_file(check_it->path());
- if (!st.ok()) {
- LOG_WARNING("Failed to delete file {}",
check_it->path().native()).error(st);
- }
+ size_t offset = 0;
+ bool is_tmp = false;
+ FileCacheType cache_type = FileCacheType::NORMAL;
+ if (!parse_filename_suffix_to_cache_type(fs,
check_it->path().filename().native(),
+
context_original.expiration_time, size, &offset,
+ &is_tmp, &cache_type)) {
continue;
}
if (!mgr->_files.contains(key.hash) ||
!mgr->_files[key.hash].contains(offset)) {
diff --git a/be/src/io/cache/fs_file_cache_storage.h
b/be/src/io/cache/fs_file_cache_storage.h
index d3299c6af0e..352b4e21f3f 100644
--- a/be/src/io/cache/fs_file_cache_storage.h
+++ b/be/src/io/cache/fs_file_cache_storage.h
@@ -65,7 +65,8 @@ public:
Status finalize(const FileCacheKey& key) override;
Status read(const FileCacheKey& key, size_t value_offset, Slice buffer)
override;
Status remove(const FileCacheKey& key) override;
- Status change_key_meta(const FileCacheKey& key, const KeyMeta& new_meta)
override;
+ Status change_key_meta_type(const FileCacheKey& key, const FileCacheType
type) override;
+ Status change_key_meta_expiration(const FileCacheKey& key, const uint64_t
expiration) override;
void load_blocks_directly_unlocked(BlockFileCache* _mgr, const
FileCacheKey& key,
std::lock_guard<std::mutex>&
cache_lock) override;
@@ -73,14 +74,24 @@ public:
FileCacheType
type,
bool is_tmp =
false);
+ [[nodiscard]] static std::string
get_path_in_local_cache_old_ttl_format(const std::string& dir,
+
size_t offset,
+
FileCacheType type,
+
bool is_tmp = false);
+
[[nodiscard]] std::string get_path_in_local_cache(const UInt128Wrapper&,
uint64_t
expiration_time) const;
private:
- Status rebuild_data_structure() const;
+ Status upgrade_cache_dir_if_necessary() const;
Status read_file_cache_version(std::string* buffer) const;
+ Status parse_filename_suffix_to_cache_type(const
std::shared_ptr<LocalFileSystem>& fs,
+ const Path& file_path, long
expiration_time,
+ size_t size, size_t* offset,
bool* is_tmp,
+ FileCacheType* cache_type)
const;
+
Status write_file_cache_version() const;
[[nodiscard]] std::string get_version_path() const;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 759913bcaea..84fa6c9e004 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -1032,7 +1032,8 @@ Status SegmentWriter::finalize(uint64_t*
segment_file_size, uint64_t* index_size
auto size = *index_size + *segment_file_size;
auto holder = cache_builder->allocate_cache_holder(index_start, size);
for (auto& segment : holder->file_blocks) {
-
static_cast<void>(segment->change_cache_type_self(io::FileCacheType::INDEX));
+ static_cast<void>(
+
segment->change_cache_type_between_normal_and_index(io::FileCacheType::INDEX));
}
}
return Status::OK();
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 77cf48ac8e7..4c3285a27b3 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -248,7 +248,7 @@ void test_file_cache(io::FileCacheType cache_type) {
ASSERT_TRUE(mgr.initialize().ok());
for (int i = 0; i < 100; i++) {
- if (mgr.get_lazy_open_success()) {
+ if (mgr.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -586,7 +586,7 @@ void test_file_cache(io::FileCacheType cache_type) {
io::BlockFileCache cache2(cache_base_path, settings);
ASSERT_TRUE(cache2.initialize().ok());
for (int i = 0; i < 100; i++) {
- if (cache2.get_lazy_open_success()) {
+ if (cache2.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -624,7 +624,7 @@ void test_file_cache(io::FileCacheType cache_type) {
io::BlockFileCache cache2(cache_path2, settings2);
ASSERT_TRUE(cache2.initialize().ok());
for (int i = 0; i < 100; i++) {
- if (cache2.get_lazy_open_success()) {
+ if (cache2.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -677,7 +677,7 @@ TEST_F(BlockFileCacheTest, resize) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -711,12 +711,12 @@ TEST_F(BlockFileCacheTest, max_ttl_size) {
ASSERT_TRUE(cache.initialize());
int i = 0;
for (; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
- ASSERT_TRUE(cache.get_lazy_open_success());
+ ASSERT_TRUE(cache.get_async_open_success());
int64_t offset = 0;
for (; offset < 100000000; offset += 100000) {
auto holder = cache.get_or_set(key1, offset, 100000, context);
@@ -759,7 +759,7 @@ TEST_F(BlockFileCacheTest, query_limit_heap_use_after_free)
{
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -844,7 +844,7 @@ TEST_F(BlockFileCacheTest, query_limit_dcheck) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -961,7 +961,7 @@ TEST_F(BlockFileCacheTest, reset_range) {
EXPECT_EQ(cache.capacity(), 15);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1011,7 +1011,7 @@ TEST_F(BlockFileCacheTest, change_cache_type) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1030,7 +1030,8 @@ TEST_F(BlockFileCacheTest, change_cache_type) {
std::string data(size, '0');
Slice result(data.data(), size);
ASSERT_TRUE(blocks[0]->append(result).ok());
-
ASSERT_TRUE(blocks[0]->change_cache_type_self(io::FileCacheType::INDEX));
+ ASSERT_TRUE(
+
blocks[0]->change_cache_type_between_normal_and_index(io::FileCacheType::INDEX));
ASSERT_TRUE(blocks[0]->finalize().ok());
auto key_str = key.to_string();
auto subdir = fs::path(cache_base_path) / key_str.substr(0, 3) /
@@ -1061,7 +1062,7 @@ TEST_F(BlockFileCacheTest, fd_cache_remove) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1143,7 +1144,7 @@ TEST_F(BlockFileCacheTest, fd_cache_evict) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1289,7 +1290,7 @@ void
test_file_cache_run_in_resource_limit(io::FileCacheType cache_type) {
cache._index_queue.hot_data_interval = 0;
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1421,7 +1422,7 @@ TEST_F(BlockFileCacheTest, fix_tmp_file) {
}
}
-TEST_F(BlockFileCacheTest, test_lazy_load) {
+TEST_F(BlockFileCacheTest, test_async_load) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -1471,7 +1472,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load) {
ASSERT_TRUE(blocks[0]->finalize());
flag1 = true;
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1482,7 +1483,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load) {
}
}
-TEST_F(BlockFileCacheTest, test_lazy_load_with_limit) {
+TEST_F(BlockFileCacheTest, test_async_load_with_limit) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -1533,7 +1534,7 @@ TEST_F(BlockFileCacheTest, test_lazy_load_with_limit) {
ASSERT_TRUE(blocks[0]->finalize());
flag1 = true;
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1576,7 +1577,7 @@ TEST_F(BlockFileCacheTest, ttl_normal) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1671,7 +1672,7 @@ TEST_F(BlockFileCacheTest, ttl_modify) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1746,7 +1747,7 @@ TEST_F(BlockFileCacheTest, ttl_change_to_normal) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1810,7 +1811,7 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -1873,12 +1874,12 @@ TEST_F(BlockFileCacheTest, ttl_reverse) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
- ASSERT_TRUE(cache.get_lazy_open_success());
+ ASSERT_TRUE(cache.get_async_open_success());
for (size_t offset = 0; offset < 30; offset += 6) {
auto holder = cache.get_or_set(key2, offset, 6, context);
auto blocks = fromHolder(holder);
@@ -1925,7 +1926,7 @@ TEST_F(BlockFileCacheTest, io_error) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2081,7 +2082,7 @@ TEST_F(BlockFileCacheTest,
remove_directly_when_normal_change_to_ttl) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2157,7 +2158,7 @@ TEST_F(BlockFileCacheTest, recyle_cache_async) {
sp->enable_processing();
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2228,7 +2229,7 @@ TEST_F(BlockFileCacheTest, recyle_cache_async_ttl) {
sp->enable_processing();
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2291,7 +2292,7 @@ TEST_F(BlockFileCacheTest, remove_directly) {
context.expiration_time = UnixSeconds() + 3600;
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2368,7 +2369,7 @@ TEST_F(BlockFileCacheTest, test_factory_1) {
auto cache = FileCacheFactory::instance()->get_by_path(key1);
int i = 0;
while (i++ < 1000) {
- if (cache->get_lazy_open_success()) {
+ if (cache->get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2434,7 +2435,7 @@ TEST_F(BlockFileCacheTest, test_factory_2) {
auto cache = FileCacheFactory::instance()->get_by_path(key);
int i = 0;
while (i++ < 1000) {
- if (cache->get_lazy_open_success()) {
+ if (cache->get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2469,7 +2470,7 @@ TEST_F(BlockFileCacheTest, test_factory_3) {
auto cache = FileCacheFactory::instance()->get_by_path(key);
int i = 0;
while (i++ < 1000) {
- if (cache->get_lazy_open_success()) {
+ if (cache->get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2545,7 +2546,7 @@ TEST_F(BlockFileCacheTest, test_disposable) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2593,7 +2594,7 @@ TEST_F(BlockFileCacheTest, test_query_limit) {
auto cache = FileCacheFactory::instance()->get_by_path(key);
int i = 0;
while (i++ < 1000) {
- if (cache->get_lazy_open_success()) {
+ if (cache->get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2656,7 +2657,7 @@ TEST_F(BlockFileCacheTest, append_many_time) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2676,7 +2677,8 @@ TEST_F(BlockFileCacheTest, append_many_time) {
auto holder = cache.get_or_set(key, 0, 5, context);
auto blocks = fromHolder(holder);
assert_range(1, blocks[0], io::FileBlock::Range(0, 4),
io::FileBlock::State::DOWNLOADED);
-
ASSERT_TRUE(blocks[0]->change_cache_type_self(FileCacheType::INDEX).ok());
+ ASSERT_TRUE(
+
blocks[0]->change_cache_type_between_normal_and_index(FileCacheType::INDEX).ok());
if (auto storage =
dynamic_cast<FSFileCacheStorage*>(cache._storage.get());
storage != nullptr) {
auto dir =
storage->get_path_in_local_cache(blocks[0]->get_hash_value(),
@@ -2684,7 +2686,8 @@ TEST_F(BlockFileCacheTest, append_many_time) {
EXPECT_TRUE(fs::exists(storage->get_path_in_local_cache(dir,
blocks[0]->offset(),
blocks[0]->cache_type())));
}
-
ASSERT_TRUE(blocks[0]->change_cache_type_self(FileCacheType::INDEX).ok());
+ ASSERT_TRUE(
+
blocks[0]->change_cache_type_between_normal_and_index(FileCacheType::INDEX).ok());
auto sp = SyncPoint::get_instance();
sp->enable_processing();
SyncPoint::CallbackGuard guard1;
@@ -2697,15 +2700,9 @@ TEST_F(BlockFileCacheTest, append_many_time) {
},
&guard1);
{
-
ASSERT_FALSE(blocks[0]->change_cache_type_self(FileCacheType::NORMAL).ok());
- EXPECT_EQ(blocks[0]->cache_type(), FileCacheType::INDEX);
- std::string buffer;
- buffer.resize(5);
- EXPECT_TRUE(blocks[0]->read(Slice(buffer.data(), 5), 0).ok());
- EXPECT_EQ(buffer, std::string(5, '0'));
- }
- {
-
EXPECT_FALSE(blocks[0]->change_cache_type_by_mgr(FileCacheType::NORMAL).ok());
+ ASSERT_FALSE(blocks[0]
+
->change_cache_type_between_normal_and_index(FileCacheType::NORMAL)
+ .ok());
EXPECT_EQ(blocks[0]->cache_type(), FileCacheType::INDEX);
std::string buffer;
buffer.resize(5);
@@ -2776,7 +2773,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2787,7 +2784,7 @@ TEST_F(BlockFileCacheTest, query_file_cache) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -2844,7 +2841,7 @@ TEST_F(BlockFileCacheTest, query_file_cache_reserve) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -3085,7 +3082,7 @@ TEST_F(BlockFileCacheTest,
cached_remote_file_reader_error_handle) {
ASSERT_TRUE(FileCacheFactory::instance()->create_file_cache(cache_base_path,
settings).ok());
auto cache = FileCacheFactory::instance()->_caches[0].get();
for (int i = 0; i < 100; i++) {
- if (cache->get_lazy_open_success()) {
+ if (cache->get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -3356,7 +3353,7 @@ TEST_F(BlockFileCacheTest, test_hot_data) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -3418,7 +3415,7 @@ TEST_F(BlockFileCacheTest, test_hot_data) {
EXPECT_EQ(cache.get_hot_blocks_meta(key2).size(), 1);
}
-TEST_F(BlockFileCacheTest, test_lazy_load_with_error_file_1) {
+TEST_F(BlockFileCacheTest, test_async_load_with_error_file_1) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -3439,7 +3436,7 @@ TEST_F(BlockFileCacheTest,
test_lazy_load_with_error_file_1) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -3466,7 +3463,7 @@ TEST_F(BlockFileCacheTest,
test_lazy_load_with_error_file_1) {
ASSERT_TRUE(writer->append(Slice("111", 3)).ok());
ASSERT_TRUE(writer->close().ok());
});
- sp->set_call_back("BlockFileCache::REMOVE_FILE_2", [&](auto&& args) {
+ sp->set_call_back("BlockFileCache::REMOVE_FILE", [&](auto&& args) {
if (*try_any_cast<std::string*>(args[0]) == "30086_idx") {
static_cast<void>(global_local_filesystem()->delete_file(dir /
"30086_idx"));
}
@@ -3491,7 +3488,7 @@ TEST_F(BlockFileCacheTest,
test_lazy_load_with_error_file_1) {
}
}
-TEST_F(BlockFileCacheTest, test_lazy_load_with_error_file_2) {
+TEST_F(BlockFileCacheTest, test_async_load_with_error_file_2) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
}
@@ -3538,7 +3535,7 @@ TEST_F(BlockFileCacheTest,
test_lazy_load_with_error_file_2) {
while (!flag1) {
}
});
- sp->set_call_back("BlockFileCache::REMOVE_FILE_1", [&](auto&& args) {
+ sp->set_call_back("BlockFileCache::REMOVE_FILE", [&](auto&& args) {
if (*try_any_cast<std::string*>(args[0]) == "30086_idx") {
static_cast<void>(global_local_filesystem()->delete_file(dir /
"30086_idx"));
}
@@ -3562,7 +3559,7 @@ TEST_F(BlockFileCacheTest,
test_lazy_load_with_error_file_2) {
ASSERT_TRUE(blocks[0]->finalize());
flag1 = true;
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -3588,7 +3585,7 @@ TEST_F(BlockFileCacheTest,
test_check_disk_reource_limit_1) {
config::file_cache_exit_disk_resource_limit_mode_percent = 50;
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -3618,7 +3615,7 @@ TEST_F(BlockFileCacheTest,
test_check_disk_reource_limit_2) {
config::file_cache_exit_disk_resource_limit_mode_percent = 1;
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -3649,7 +3646,7 @@ TEST_F(BlockFileCacheTest,
test_check_disk_reource_limit_3) {
config::file_cache_exit_disk_resource_limit_mode_percent = 98;
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -3749,7 +3746,7 @@ TEST_F(BlockFileCacheTest,
remove_if_cached_when_isnt_releasable) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -3898,7 +3895,7 @@ TEST_F(BlockFileCacheTest, remove_from_other_queue_1) {
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -3969,7 +3966,7 @@ TEST_F(BlockFileCacheTest, remove_from_other_queue_2) {
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -4088,7 +4085,7 @@ TEST_F(BlockFileCacheTest, recyle_unvalid_ttl_async) {
sp->enable_processing();
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -4143,7 +4140,7 @@ TEST_F(BlockFileCacheTest,
ttl_reserve_wo_evict_using_lru) {
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -4206,7 +4203,7 @@ TEST_F(BlockFileCacheTest,
ttl_reserve_with_evict_using_lru) {
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -4275,7 +4272,7 @@ TEST_F(BlockFileCacheTest,
ttl_reserve_with_evict_using_lru_meet_max_ttl_cache_r
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -4350,7 +4347,7 @@ TEST_F(BlockFileCacheTest, reset_capacity) {
sp->enable_processing();
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -4419,7 +4416,7 @@ TEST_F(BlockFileCacheTest, change_cache_type1) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -4444,7 +4441,7 @@ TEST_F(BlockFileCacheTest, change_cache_type1) {
ASSERT_EQ(segments.size(), 1);
assert_range(1, segments[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::DOWNLOADED);
- EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL);
+ EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL);
EXPECT_EQ(segments[0]->expiration_time(), 0);
}
sp->clear_call_back("FileBlock::change_cache_type");
@@ -4493,7 +4490,7 @@ TEST_F(BlockFileCacheTest, change_cache_type2) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -4518,7 +4515,7 @@ TEST_F(BlockFileCacheTest, change_cache_type2) {
ASSERT_EQ(segments.size(), 1);
assert_range(1, segments[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::DOWNLOADED);
- EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::NORMAL);
+ EXPECT_EQ(segments[0]->cache_type(), io::FileCacheType::TTL);
EXPECT_EQ(segments[0]->expiration_time(), context.expiration_time);
}
sp->clear_call_back("FileBlock::change_cache_type");
@@ -4552,6 +4549,7 @@ TEST_F(BlockFileCacheTest, change_cache_type2) {
}
}
+/*
TEST_F(BlockFileCacheTest, load_cache1) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -4577,7 +4575,7 @@ TEST_F(BlockFileCacheTest, load_cache1) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -4624,7 +4622,7 @@ TEST_F(BlockFileCacheTest, load_cache2) {
io::BlockFileCache cache(cache_base_path, settings);
ASSERT_TRUE(cache.initialize());
for (int i = 0; i < 100; i++) {
- if (cache.get_lazy_open_success()) {
+ if (cache.get_async_open_success()) {
break;
};
std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -4644,5 +4642,105 @@ TEST_F(BlockFileCacheTest, load_cache2) {
key1.to_string() + "_0/" +
std::to_string(offset));
}
}
+*/
+
+TEST_F(BlockFileCacheTest, test_load) {
+ // test both path formats when loading file cache into memory
+ // old file path format, [hash]_[expiration]/[offset]_ttl
+ // new file path format, [hash]_[expiration]/[offset]
+ const int64_t expiration = 1987654321;
+ if (fs::exists(cache_base_path)) {
+ fs::remove_all(cache_base_path);
+ }
+ fs::create_directories(cache_base_path);
+ auto sp = SyncPoint::get_instance();
+ Defer defer {[sp] { sp->clear_all_call_backs(); }};
+ io::FileCacheSettings settings;
+ settings.index_queue_size = 30;
+ settings.index_queue_elements = 5;
+ settings.capacity = 30;
+ settings.max_file_block_size = 30;
+ settings.max_query_cache_size = 30;
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::TTL;
+ context.expiration_time = expiration;
+ auto key = io::BlockFileCache::hash("key1");
+ io::BlockFileCache cache(cache_base_path, settings);
+ std::string dir = cache_base_path + key.to_string().substr(0, 3) + "/" +
key.to_string() + "_" +
+ std::to_string(expiration);
+ std::cout << dir << std::endl;
+ auto st = global_local_filesystem()->create_directory(dir, false);
+ if (!st.ok()) {
+ std::cout << dir << " create failed";
+ ASSERT_TRUE(false);
+ }
+ sp->set_call_back("BlockFileCache::BeforeScan", [&](auto&&) {
+ FileWriterPtr writer;
+ ASSERT_TRUE(global_local_filesystem()->create_file(dir / "10086_ttl",
&writer).ok());
+ ASSERT_TRUE(writer->append(Slice("111", 3)).ok());
+ ASSERT_TRUE(writer->close().ok());
+
+ // no suffix, but it is not NORMAL, instead it is TTL because the
+ // dirname contains non-zero expiration time
+ ASSERT_TRUE(global_local_filesystem()->create_file(dir / "20086",
&writer).ok());
+ ASSERT_TRUE(writer->append(Slice("222", 3)).ok());
+ ASSERT_TRUE(writer->close().ok());
+
+ ASSERT_TRUE(global_local_filesystem()->create_file(dir / "30086_idx",
&writer).ok());
+ ASSERT_TRUE(writer->append(Slice("333", 3)).ok());
+ ASSERT_TRUE(writer->close().ok());
+ });
+ sp->enable_processing();
+ ASSERT_TRUE(cache.initialize());
+ for (int i = 0; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ };
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+
+ {
+ auto type = cache.dump_single_cache_type(key, 10086);
+ ASSERT_TRUE(type == "_ttl");
+ auto holder = cache.get_or_set(key, 10086, 3, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(10086, 10086 + 3 - 1),
+ io::FileBlock::State::DOWNLOADED);
+ ASSERT_TRUE(blocks[0]->cache_type() == io::FileCacheType::TTL);
+ // OK, looks like old format is correctly loaded, let's read it
+ std::string buffer;
+ buffer.resize(3);
+ ASSERT_TRUE(blocks[0]->read(Slice(buffer.data(), buffer.size()),
0).ok());
+ ASSERT_EQ(buffer, "111");
+ // OK, read successfully, let's try removing it
+ std::mutex m1, m2;
+ std::lock_guard cache_lock(m1);
+ std::lock_guard block_lock(m2);
+ cache.remove(blocks[0], cache_lock, block_lock);
+ ASSERT_FALSE(fs::exists(dir / "10086_ttl"));
+ }
+ {
+ auto type = cache.dump_single_cache_type(key, 20086);
+ ASSERT_TRUE(type == "_ttl");
+ auto holder = cache.get_or_set(key, 20086, 3, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(20086, 20086 + 3 - 1),
+ io::FileBlock::State::DOWNLOADED);
+ ASSERT_TRUE(blocks[0]->cache_type() == io::FileCacheType::TTL);
+ // OK, looks like old format is correctly loaded, let's read it
+ std::string buffer;
+ buffer.resize(3);
+ ASSERT_TRUE(blocks[0]->read(Slice(buffer.data(), buffer.size()),
0).ok());
+ ASSERT_EQ(buffer, "222");
+ // OK, read successfully, let's try removing it
+ std::mutex m1, m2;
+ std::lock_guard cache_lock(m1);
+ std::lock_guard block_lock(m2);
+ cache.remove(blocks[0], cache_lock, block_lock);
+ ASSERT_FALSE(fs::exists(dir / "20086"));
+ }
+}
} // namespace doris::io
diff --git a/be/test/io/fs/s3_file_writer_test.cpp
b/be/test/io/fs/s3_file_writer_test.cpp
index ab76fb54347..7021346a704 100644
--- a/be/test/io/fs/s3_file_writer_test.cpp
+++ b/be/test/io/fs/s3_file_writer_test.cpp
@@ -625,7 +625,7 @@ TEST_F(S3FileWriterTest, multi_part_open_error) {
// auto cache = std::make_unique<io::BlockFileCache>(cache_base_path,
settings);
// ASSERT_TRUE(cache->initialize());
// while (true) {
-// if (cache->get_lazy_open_success()) {
+// if (cache->get_async_open_success()) {
// break;
// };
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
@@ -710,7 +710,7 @@ TEST_F(S3FileWriterTest, multi_part_open_error) {
// auto cache = std::make_unique<io::BlockFileCache>(cache_base_path,
settings);
// ASSERT_TRUE(cache->initialize());
// while (true) {
-// if (cache->get_lazy_open_success()) {
+// if (cache->get_async_open_success()) {
// break;
// };
// std::this_thread::sleep_for(std::chrono::milliseconds(1));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]