This is an automated email from the ASF dual-hosted git repository.
gavinchou pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 112636b8409 [feature](file cache) Support in-memory filecache for
no-disk/slow-disk system (#38811)
112636b8409 is described below
commit 112636b8409483b6eec01c283c9aad594f402b3f
Author: zhengyu <[email protected]>
AuthorDate: Tue Sep 24 16:26:18 2024 +0800
[feature](file cache) Support in-memory filecache for no-disk/slow-disk
system (#38811)
User can set file_cache_path=[{..., "storage":"memory"}] to use RAM as
file cache storage.
---
be/src/common/config.cpp | 10 +
be/src/common/config.h | 10 +
be/src/io/cache/block_file_cache.cpp | 39 +-
be/src/io/cache/block_file_cache.h | 1 +
be/src/io/cache/block_file_cache_factory.cpp | 67 ++-
be/src/io/cache/file_cache_common.cpp | 16 +-
be/src/io/cache/file_cache_common.h | 7 +-
be/src/io/cache/file_cache_storage.h | 19 +
be/src/io/cache/fs_file_cache_storage.cpp | 21 +
be/src/io/cache/fs_file_cache_storage.h | 16 +-
be/src/io/cache/mem_file_cache_storage.cpp | 131 +++++
be/src/io/cache/mem_file_cache_storage.h | 55 ++
be/src/olap/options.cpp | 20 +-
be/src/olap/options.h | 7 +-
be/test/io/cache/block_file_cache_test.cpp | 750 +++++++++++++++++++++++++++
15 files changed, 1100 insertions(+), 69 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e1d9c7298ac..8a6ddc398a1 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -995,6 +995,16 @@ DEFINE_Bool(enable_file_cache, "false");
// format:
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
// format:
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
// format: {"path": "/path/to/file_cache", "total_size":53687091200,
"normal_percent":85, "disposable_percent":10, "index_percent":5}
+// format: [{"path": "xxx", "total_size":53687091200, "storage": "memory"}]
+// Note1: storage is "disk" by default
+// Note2: when the storage is "memory", the path is ignored. So you can set
xxx to anything you like
+// and doris will just reset the path to "memory" internally.
+// In the rare case where your storage is disk and the directory, by accident, is named
+// "memory" for some reason, you should write the path as:
+// {"path": "memory", "total_size":53687091200, "storage": "disk"}
+// or use the default storage value:
+// {"path": "memory", "total_size":53687091200}
+// Both will use the directory "memory" on the disk instead of the real RAM.
DEFINE_String(file_cache_path, "");
DEFINE_Int64(file_cache_each_block_size, "1048576"); // 1MB
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 262b5d66971..d69e8a94b9e 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1046,6 +1046,16 @@ DECLARE_Bool(enable_file_cache);
// format:
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240}]
// format:
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240},{"path":"/path/to/file_cache2","total_size":21474836480,"query_limit":10737418240}]
// format:
[{"path":"/path/to/file_cache","total_size":21474836480,"query_limit":10737418240,"normal_percent":85,
"disposable_percent":10, "index_percent":5}]
+// format: [{"path": "xxx", "total_size":53687091200, "storage": "memory"}]
+// Note1: storage is "disk" by default
+// Note2: when the storage is "memory", the path is ignored. So you can set
xxx to anything you like
+// and doris will just reset the path to "memory" internally.
+// In the rare case where your storage is disk and the directory, by accident, is named
+// "memory" for some reason, you should write the path as:
+// {"path": "memory", "total_size":53687091200, "storage": "disk"}
+// or use the default storage value:
+// {"path": "memory", "total_size":53687091200}
+// Both will use the directory "memory" on the disk instead of the real RAM.
DECLARE_String(file_cache_path);
DECLARE_Int64(file_cache_each_block_size);
DECLARE_Bool(clear_file_cache);
diff --git a/be/src/io/cache/block_file_cache.cpp
b/be/src/io/cache/block_file_cache.cpp
index f5c0a7c79bf..2ff374442d1 100644
--- a/be/src/io/cache/block_file_cache.cpp
+++ b/be/src/io/cache/block_file_cache.cpp
@@ -38,6 +38,7 @@
#include "io/cache/file_block.h"
#include "io/cache/file_cache_common.h"
#include "io/cache/fs_file_cache_storage.h"
+#include "io/cache/mem_file_cache_storage.h"
#include "util/time.h"
#include "vec/common/sip_hash.h"
#include "vec/common/uint128.h"
@@ -91,14 +92,14 @@ BlockFileCache::BlockFileCache(const std::string&
cache_base_path,
_ttl_queue = LRUQueue(std::numeric_limits<int>::max(),
std::numeric_limits<int>::max(),
std::numeric_limits<int>::max());
- LOG(INFO) << fmt::format(
- "file cache path={}, disposable queue size={} elements={}, index
queue size={} "
- "elements={}, query queue "
- "size={} elements={}",
- cache_base_path, cache_settings.disposable_queue_size,
- cache_settings.disposable_queue_elements,
cache_settings.index_queue_size,
- cache_settings.index_queue_elements,
cache_settings.query_queue_size,
- cache_settings.query_queue_elements);
+ if (cache_settings.storage == "memory") {
+ _storage = std::make_unique<MemFileCacheStorage>();
+ _cache_base_path = "memory";
+ } else {
+ _storage = std::make_unique<FSFileCacheStorage>();
+ }
+
+ LOG(INFO) << "file cache path= " << _cache_base_path << " " <<
cache_settings.to_string();
}
UInt128Wrapper BlockFileCache::hash(const std::string& path) {
@@ -206,7 +207,6 @@ Status BlockFileCache::initialize() {
Status BlockFileCache::initialize_unlocked(std::lock_guard<std::mutex>&
cache_lock) {
DCHECK(!_is_initialized);
_is_initialized = true;
- _storage = std::make_unique<FSFileCacheStorage>();
RETURN_IF_ERROR(_storage->init(this));
_cache_background_thread =
std::thread(&BlockFileCache::run_background_operation, this);
@@ -1512,6 +1512,9 @@ std::string BlockFileCache::reset_capacity(size_t
new_capacity) {
}
void BlockFileCache::check_disk_resource_limit() {
+ if (_storage->get_type() != FileCacheStorageType::DISK) {
+ return;
+ }
if (_capacity > _cur_cache_size) {
_disk_resource_limit_mode = false;
}
@@ -1739,20 +1742,12 @@ std::string BlockFileCache::clear_file_cache_directly()
{
std::lock_guard cache_lock(_mutex);
LOG_INFO("start clear_file_cache_directly").tag("path", _cache_base_path);
- auto st = global_local_filesystem()->delete_directory(_cache_base_path);
- if (!st.ok()) {
- ss << " failed to clear_file_cache_directly, path=" << _cache_base_path
- << " delete dir failed: " << st;
- LOG(WARNING) << ss.str();
- return ss.str();
- }
- st = global_local_filesystem()->create_directory(_cache_base_path);
- if (!st.ok()) {
- ss << " failed to clear_file_cache_directly, path=" << _cache_base_path
- << " create dir failed: " << st;
- LOG(WARNING) << ss.str();
- return ss.str();
+ std::string clear_msg;
+ auto s = _storage->clear(clear_msg);
+ if (!s.ok()) {
+ return clear_msg;
}
+
int64_t num_files = _files.size();
int64_t cache_size = _cur_cache_size;
int64_t index_queue_size = _index_queue.get_elements_num(cache_lock);
diff --git a/be/src/io/cache/block_file_cache.h
b/be/src/io/cache/block_file_cache.h
index def354b155b..c0a2bce76b1 100644
--- a/be/src/io/cache/block_file_cache.h
+++ b/be/src/io/cache/block_file_cache.h
@@ -40,6 +40,7 @@ class FSFileCacheStorage;
// The current strategies are lru and ttl.
class BlockFileCache {
friend class FSFileCacheStorage;
+ friend class MemFileCacheStorage;
friend class FileBlock;
friend struct FileBlocksHolder;
diff --git a/be/src/io/cache/block_file_cache_factory.cpp
b/be/src/io/cache/block_file_cache_factory.cpp
index 2c15d440be1..ac16bbefa58 100644
--- a/be/src/io/cache/block_file_cache_factory.cpp
+++ b/be/src/io/cache/block_file_cache_factory.cpp
@@ -63,32 +63,45 @@ size_t FileCacheFactory::try_release(const std::string&
base_path) {
Status FileCacheFactory::create_file_cache(const std::string& cache_base_path,
FileCacheSettings
file_cache_settings) {
- const auto& fs = global_local_filesystem();
- bool exists = false;
- RETURN_IF_ERROR(fs->exists(cache_base_path, &exists));
- if (!exists) {
- auto st = fs->create_directory(cache_base_path);
- LOG(INFO) << "path " << cache_base_path << " does not exist, create "
<< st.msg();
- RETURN_IF_ERROR(st);
- } else if (config::clear_file_cache) {
- RETURN_IF_ERROR(fs->delete_directory(cache_base_path));
- RETURN_IF_ERROR(fs->create_directory(cache_base_path));
- }
+ if (file_cache_settings.storage == "memory") {
+ if (cache_base_path != "memory") {
+ LOG(WARNING) << "memory storage must use memory path";
+ return Status::InvalidArgument("memory storage must use memory
path");
+ }
+ } else {
+ const auto& fs = global_local_filesystem();
+ bool exists = false;
+ RETURN_IF_ERROR(fs->exists(cache_base_path, &exists));
+ if (!exists) {
+ auto st = fs->create_directory(cache_base_path);
+ LOG(INFO) << "path " << cache_base_path << " does not exist,
create " << st.msg();
+ RETURN_IF_ERROR(st);
+ } else if (config::clear_file_cache) {
+ RETURN_IF_ERROR(fs->delete_directory(cache_base_path));
+ RETURN_IF_ERROR(fs->create_directory(cache_base_path));
+ }
- struct statfs stat;
- if (statfs(cache_base_path.c_str(), &stat) < 0) {
- LOG_ERROR("").tag("file cache path", cache_base_path).tag("error",
strerror(errno));
- return Status::IOError("{} statfs error {}", cache_base_path,
strerror(errno));
- }
- size_t disk_capacity = static_cast<size_t>(
- static_cast<size_t>(stat.f_blocks) *
static_cast<size_t>(stat.f_bsize) *
-
(static_cast<double>(config::file_cache_enter_disk_resource_limit_mode_percent)
/ 100));
- if (file_cache_settings.capacity == 0 || disk_capacity <
file_cache_settings.capacity) {
- LOG_INFO("The cache {} config size {} is larger than {}% disk size {}
or zero, recalc it.",
- cache_base_path, file_cache_settings.capacity,
- config::file_cache_enter_disk_resource_limit_mode_percent,
disk_capacity);
- file_cache_settings =
- get_file_cache_settings(disk_capacity,
file_cache_settings.max_query_cache_size);
+ struct statfs stat;
+ if (statfs(cache_base_path.c_str(), &stat) < 0) {
+ LOG_ERROR("").tag("file cache path", cache_base_path).tag("error",
strerror(errno));
+ return Status::IOError("{} statfs error {}", cache_base_path,
strerror(errno));
+ }
+ size_t disk_capacity = static_cast<size_t>(
+ static_cast<size_t>(stat.f_blocks) *
static_cast<size_t>(stat.f_bsize) *
+
(static_cast<double>(config::file_cache_enter_disk_resource_limit_mode_percent)
/
+ 100));
+ if (file_cache_settings.capacity == 0 || disk_capacity <
file_cache_settings.capacity) {
+ LOG_INFO(
+ "The cache {} config size {} is larger than {}% disk size
{} or zero, recalc "
+ "it.",
+ cache_base_path, file_cache_settings.capacity,
+ config::file_cache_enter_disk_resource_limit_mode_percent,
disk_capacity);
+ file_cache_settings = get_file_cache_settings(disk_capacity,
+
file_cache_settings.max_query_cache_size);
+ }
+ LOG(INFO) << "[FileCache] path: " << cache_base_path
+ << " total_size: " << file_cache_settings.capacity
+ << " disk_total_size: " << disk_capacity;
}
auto cache = std::make_unique<BlockFileCache>(cache_base_path,
file_cache_settings);
RETURN_IF_ERROR(cache->initialize());
@@ -98,9 +111,7 @@ Status FileCacheFactory::create_file_cache(const
std::string& cache_base_path,
_caches.push_back(std::move(cache));
_capacity += file_cache_settings.capacity;
}
- LOG(INFO) << "[FileCache] path: " << cache_base_path
- << " total_size: " << file_cache_settings.capacity
- << " disk_total_size: " << disk_capacity;
+
return Status::OK();
}
diff --git a/be/src/io/cache/file_cache_common.cpp
b/be/src/io/cache/file_cache_common.cpp
index 61e873e04c6..c569ace0011 100644
--- a/be/src/io/cache/file_cache_common.cpp
+++ b/be/src/io/cache/file_cache_common.cpp
@@ -26,9 +26,22 @@
namespace doris::io {
+// Renders every field of the settings as "name: value" pairs joined by ", ".
+// Used, for example, when BlockFileCache logs its configuration on construction.
+std::string FileCacheSettings::to_string() const {
+    std::stringstream out;
+    out << "capacity: " << capacity;
+    out << ", max_file_block_size: " << max_file_block_size;
+    out << ", max_query_cache_size: " << max_query_cache_size;
+    out << ", disposable_queue_size: " << disposable_queue_size;
+    out << ", disposable_queue_elements: " << disposable_queue_elements;
+    out << ", index_queue_size: " << index_queue_size;
+    out << ", index_queue_elements: " << index_queue_elements;
+    out << ", query_queue_size: " << query_queue_size;
+    out << ", query_queue_elements: " << query_queue_elements;
+    out << ", storage: " << storage;
+    return out.str();
+}
+
FileCacheSettings get_file_cache_settings(size_t capacity, size_t
max_query_cache_size,
size_t normal_percent, size_t
disposable_percent,
- size_t index_percent) {
+ size_t index_percent, const
std::string& storage) {
io::FileCacheSettings settings;
if (capacity == 0) return settings;
settings.capacity = capacity;
@@ -50,6 +63,7 @@ FileCacheSettings get_file_cache_settings(size_t capacity,
size_t max_query_cach
settings.query_queue_elements =
std::max(settings.query_queue_size / settings.max_file_block_size,
REMOTE_FS_OBJECTS_CACHE_DEFAULT_ELEMENTS);
+ settings.storage = storage;
return settings;
}
diff --git a/be/src/io/cache/file_cache_common.h
b/be/src/io/cache/file_cache_common.h
index e660a38e03f..21309831a82 100644
--- a/be/src/io/cache/file_cache_common.h
+++ b/be/src/io/cache/file_cache_common.h
@@ -95,12 +95,17 @@ struct FileCacheSettings {
size_t query_queue_elements {0};
size_t max_file_block_size {0};
size_t max_query_cache_size {0};
+ std::string storage;
+
+ // to string
+ std::string to_string() const;
};
FileCacheSettings get_file_cache_settings(size_t capacity, size_t
max_query_cache_size,
size_t normal_percent =
DEFAULT_NORMAL_PERCENT,
size_t disposable_percent =
DEFAULT_DISPOSABLE_PERCENT,
- size_t index_percent =
DEFAULT_INDEX_PERCENT);
+ size_t index_percent =
DEFAULT_INDEX_PERCENT,
+ const std::string& storage = "disk");
struct CacheContext {
CacheContext(const IOContext* io_context) {
diff --git a/be/src/io/cache/file_cache_storage.h
b/be/src/io/cache/file_cache_storage.h
index 4120fe0ca5a..642c4711cf6 100644
--- a/be/src/io/cache/file_cache_storage.h
+++ b/be/src/io/cache/file_cache_storage.h
@@ -24,6 +24,22 @@ namespace doris::io {
class BlockFileCache;
+// Key of the per-block maps: (hash of the cached file, offset of the block).
+using FileWriterMapKey = std::pair<UInt128Wrapper, size_t>;
+
+// Backing medium of a FileCacheStorage implementation.
+enum FileCacheStorageType { DISK = 0, MEMORY = 1 };
+
+// Hashes a FileWriterMapKey over the raw bytes of its 16-byte hash value
+// followed by its 8-byte offset.
+struct FileWriterMapKeyHash {
+    std::size_t operator()(const FileWriterMapKey& w) const {
+        // The byte layout below is hard-wired to 16 + 8 bytes; fail the build
+        // rather than over-read if either member ever has a different size.
+        static_assert(sizeof(decltype(w.first.value_)) == 16,
+                      "FileWriterMapKeyHash expects a 16-byte hash value");
+        static_assert(sizeof(FileWriterMapKey::second_type) == 8,
+                      "FileWriterMapKeyHash expects an 8-byte offset");
+        char buf[24];
+        memcpy(buf, &w.first.value_, 16);
+        memcpy(buf + 16, &w.second, 8);
+        return std::hash<std::string_view> {}(std::string_view(buf, sizeof(buf)));
+    }
+};
+
// The interface is for organizing datas in disk
class FileCacheStorage {
public:
@@ -46,6 +62,9 @@ public:
// use when lazy load cache
virtual void load_blocks_directly_unlocked(BlockFileCache* _mgr, const
FileCacheKey& key,
std::lock_guard<std::mutex>&
cache_lock) {}
+ // force clear all current data in the cache
+ virtual Status clear(std::string& msg) = 0;
+ virtual FileCacheStorageType get_type() = 0;
};
} // namespace doris::io
diff --git a/be/src/io/cache/fs_file_cache_storage.cpp
b/be/src/io/cache/fs_file_cache_storage.cpp
index d2662ba36d0..ecdf04c8830 100644
--- a/be/src/io/cache/fs_file_cache_storage.cpp
+++ b/be/src/io/cache/fs_file_cache_storage.cpp
@@ -638,6 +638,27 @@ void
FSFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* mgr, cons
}
}
+// Wipes the on-disk cache by deleting _cache_base_path and recreating it empty.
+// If either step fails, the failure is logged at WARNING and copied into `msg`.
+// An InternalError carrying the same text is returned.
+Status FSFileCacheStorage::clear(std::string& msg) {
+    const auto& local_fs = global_local_filesystem();
+    auto report = [&](const char* step, const Status& cause) {
+        std::stringstream ss;
+        ss << "failed to clear_file_cache_directly, path=" << _cache_base_path << " " << step
+           << " failed: " << cause;
+        LOG(WARNING) << ss.str();
+        msg = ss.str();
+        return Status::InternalError(ss.str());
+    };
+    if (Status st = local_fs->delete_directory(_cache_base_path); !st.ok()) {
+        return report("delete dir", st);
+    }
+    if (Status st = local_fs->create_directory(_cache_base_path); !st.ok()) {
+        return report("create dir", st);
+    }
+    return Status::OK();
+}
+
FSFileCacheStorage::~FSFileCacheStorage() {
if (_cache_background_load_thread.joinable()) {
_cache_background_load_thread.join();
diff --git a/be/src/io/cache/fs_file_cache_storage.h
b/be/src/io/cache/fs_file_cache_storage.h
index 352b4e21f3f..23e98f422ac 100644
--- a/be/src/io/cache/fs_file_cache_storage.h
+++ b/be/src/io/cache/fs_file_cache_storage.h
@@ -69,6 +69,7 @@ public:
Status change_key_meta_expiration(const FileCacheKey& key, const uint64_t
expiration) override;
void load_blocks_directly_unlocked(BlockFileCache* _mgr, const
FileCacheKey& key,
std::lock_guard<std::mutex>&
cache_lock) override;
+ Status clear(std::string& msg) override;
[[nodiscard]] static std::string get_path_in_local_cache(const
std::string& dir, size_t offset,
FileCacheType
type,
@@ -82,6 +83,8 @@ public:
[[nodiscard]] std::string get_path_in_local_cache(const UInt128Wrapper&,
uint64_t
expiration_time) const;
+ FileCacheStorageType get_type() override { return DISK; }
+
private:
Status upgrade_cache_dir_if_necessary() const;
@@ -98,19 +101,6 @@ private:
void load_cache_info_into_memory(BlockFileCache* _mgr) const;
- using FileWriterMapKey = std::pair<UInt128Wrapper, size_t>;
- struct FileWriterMapKeyHash {
- std::size_t operator()(const FileWriterMapKey& w) const {
- char* v1 = (char*)&w.first.value_;
- char* v2 = (char*)&w.second;
- char buf[24];
- memcpy(buf, v1, 16);
- memcpy(buf + 16, v2, 8);
- std::string_view str(buf, 24);
- return std::hash<std::string_view> {}(str);
- }
- };
-
std::string _cache_base_path;
std::thread _cache_background_load_thread;
const std::shared_ptr<LocalFileSystem>& fs = global_local_filesystem();
diff --git a/be/src/io/cache/mem_file_cache_storage.cpp
b/be/src/io/cache/mem_file_cache_storage.cpp
new file mode 100644
index 00000000000..bffa75ae305
--- /dev/null
+++ b/be/src/io/cache/mem_file_cache_storage.cpp
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "io/cache/mem_file_cache_storage.h"
+
+#include <filesystem>
+#include <mutex>
+#include <system_error>
+
+#include "common/logging.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/file_block.h"
+#include "io/cache/file_cache_common.h"
+#include "runtime/exec_env.h"
+#include "vec/common/hex.h"
+
+namespace doris::io {
+
+// Nothing to release by hand: the block map frees its buffers when destroyed.
+MemFileCacheStorage::~MemFileCacheStorage() = default;
+
+// RAM storage has nothing persisted to reload. The manager's asynchronous open
+// is therefore marked as finished immediately.
+Status MemFileCacheStorage::init(BlockFileCache* _mgr) {
+    LOG_INFO("init in-memory file cache storage");
+    _mgr->_async_open_done = true;
+    return Status::OK();
+}
+
+// Stores a private copy of `value` as the content of the block identified by
+// (key.hash, key.offset). Despite the name this is a put, not an append: a
+// block is written once. An existing entry is reported as IOError (and trips a
+// DCHECK in debug builds) and left unchanged.
+Status MemFileCacheStorage::append(const FileCacheKey& key, const Slice& value) {
+    // Allocate and fill the buffer before taking the lock, so the copy does not
+    // serialize concurrent read/remove/append calls on other blocks.
+    // TODO(zhengyu): allocate in mempool
+    // TODO(zhengyu): zero copy!
+    MemBlock mem_block {std::shared_ptr<char[]>(new char[value.size], std::default_delete<char[]>())};
+    memcpy(mem_block.addr.get(), value.data, value.size);
+
+    auto map_key = std::make_pair(key.hash, key.offset);
+    std::lock_guard<std::mutex> lock(_cache_map_mtx);
+    // Single hash lookup. try_emplace does not touch an existing entry and does
+    // not move from mem_block when the key is already present.
+    const bool inserted = _cache_map.try_emplace(map_key, std::move(mem_block)).second;
+    if (!inserted) {
+        LOG_WARNING("key already exists in in-memory cache map")
+                .tag("hash", key.hash.to_string())
+                .tag("offset", key.offset);
+        DCHECK(false);
+        return Status::IOError("key already exists in in-memory cache map");
+    }
+    return Status::OK();
+}
+
+// Nothing is persisted for RAM storage, so finalizing is a no-op. The download
+// state set in FileBlock::finalize is what informs readers that the block is complete.
+Status MemFileCacheStorage::finalize(const FileCacheKey& key) {
+ return Status::OK();
+}
+
+// Copies buffer.size bytes into `buffer`. The bytes come from the block
+// identified by (key.hash, key.offset), starting value_offset bytes into it.
+// Returns IOError if the block is not cached.
+// NOTE(review): MemBlock does not record the block length. The caller must
+// ensure value_offset + buffer.size does not exceed the size given to append().
+Status MemFileCacheStorage::read(const FileCacheKey& key, size_t value_offset, Slice buffer) {
+    std::lock_guard<std::mutex> lock(_cache_map_mtx);
+    auto map_key = std::make_pair(key.hash, key.offset);
+    auto iter = _cache_map.find(map_key);
+    if (iter == _cache_map.end()) {
+        LOG_WARNING("key not found in cache map")
+                .tag("hash", key.hash.to_string())
+                .tag("offset", key.offset);
+        return Status::IOError("key not found in in-memory cache map when read");
+    }
+    // Borrow the entry instead of copying the shared_ptr (no atomic refcount
+    // round-trip). remove() and clear() take the same lock, so the buffer stays
+    // alive for the duration of the copy.
+    const MemBlock& mem_block = iter->second;
+    DCHECK(mem_block.addr != nullptr);
+    // Reads of a sub-range must start value_offset bytes into the block, not at
+    // its first byte.
+    memcpy(buffer.data, mem_block.addr.get() + value_offset, buffer.size);
+    return Status::OK();
+}
+
+// Drops the block identified by (key.hash, key.offset) from the in-memory map.
+// Returns IOError if no such block is cached.
+Status MemFileCacheStorage::remove(const FileCacheKey& key) {
+    std::lock_guard<std::mutex> lock(_cache_map_mtx);
+    const size_t erased = _cache_map.erase(std::make_pair(key.hash, key.offset));
+    if (erased == 0) {
+        LOG_WARNING("key not found in cache map")
+                .tag("hash", key.hash.to_string())
+                .tag("offset", key.offset);
+        return Status::IOError("key not found in in-memory cache map when remove");
+    }
+    return Status::OK();
+}
+
+// RAM storage persists no per-block metadata, so changing a block's type needs
+// no work here.
+Status MemFileCacheStorage::change_key_meta_type(const FileCacheKey& key, const FileCacheType type) {
+    return Status::OK();
+}
+
+// Same as above: there is no persisted expiration to rewrite for RAM storage.
+Status MemFileCacheStorage::change_key_meta_expiration(const FileCacheKey& key,
+                                                       const uint64_t expiration) {
+    return Status::OK();
+}
+
+// Lazy loading has nothing to do, because no block of this storage is ever persisted.
+void MemFileCacheStorage::load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key,
+                                                        std::lock_guard<std::mutex>& cache_lock) {}
+
+// Drops every cached block. `msg` is left untouched because clearing RAM cannot fail.
+Status MemFileCacheStorage::clear(std::string& msg) {
+    // Detach all blocks while holding the lock, but free them only after the
+    // lock is released. This keeps concurrent read/append/remove calls from
+    // blocking while a large cache is deallocated.
+    decltype(_cache_map) detached;
+    {
+        std::lock_guard<std::mutex> lock(_cache_map_mtx);
+        detached.swap(_cache_map);
+    }
+    return Status::OK();
+}
+
+} // namespace doris::io
diff --git a/be/src/io/cache/mem_file_cache_storage.h
b/be/src/io/cache/mem_file_cache_storage.h
new file mode 100644
index 00000000000..20fdd8ce9f6
--- /dev/null
+++ b/be/src/io/cache/mem_file_cache_storage.h
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <shared_mutex>
+#include <thread>
+
+#include "io/cache/file_cache_common.h"
+#include "io/cache/file_cache_storage.h"
+
+namespace doris::io {
+
+// Contents of one cached block held in RAM. The buffer is reference-counted, so
+// it stays valid while any copy of the MemBlock still refers to it.
+struct MemBlock {
+    std::shared_ptr<char[]> addr;
+};
+
+// FileCacheStorage that keeps block contents in process memory instead of
+// files. Blocks are keyed by (file hash, block offset) and access to the map is
+// guarded by _cache_map_mtx. Nothing is persisted, so nothing is reloaded on init.
+class MemFileCacheStorage : public FileCacheStorage {
+public:
+    MemFileCacheStorage() = default;
+    ~MemFileCacheStorage() override;
+
+    Status init(BlockFileCache* _mgr) override;
+    Status append(const FileCacheKey& key, const Slice& value) override;
+    Status finalize(const FileCacheKey& key) override;
+    Status read(const FileCacheKey& key, size_t value_offset, Slice buffer) override;
+    Status remove(const FileCacheKey& key) override;
+    Status change_key_meta_type(const FileCacheKey& key, const FileCacheType type) override;
+    Status change_key_meta_expiration(const FileCacheKey& key, const uint64_t expiration) override;
+    void load_blocks_directly_unlocked(BlockFileCache* _mgr, const FileCacheKey& key,
+                                       std::lock_guard<std::mutex>& cache_lock) override;
+    Status clear(std::string& msg) override;
+
+    FileCacheStorageType get_type() override { return MEMORY; }
+
+private:
+    // (file hash, block offset) -> block contents
+    std::unordered_map<FileWriterMapKey, MemBlock, FileWriterMapKeyHash> _cache_map;
+    std::mutex _cache_map_mtx;
+};
+
+} // namespace doris::io
diff --git a/be/src/olap/options.cpp b/be/src/olap/options.cpp
index cd53e6c0b1f..9c500c10993 100644
--- a/be/src/olap/options.cpp
+++ b/be/src/olap/options.cpp
@@ -56,6 +56,9 @@ static std::string CACHE_QUERY_LIMIT_SIZE = "query_limit";
static std::string CACHE_NORMAL_PERCENT = "normal_percent";
static std::string CACHE_DISPOSABLE_PERCENT = "disposable_percent";
static std::string CACHE_INDEX_PERCENT = "index_percent";
+static std::string CACHE_STORAGE = "storage";
+static std::string CACHE_STORAGE_DISK = "disk";
+static std::string CACHE_STORAGE_MEMORY = "memory";
// TODO: should be a general util method
// static std::string to_upper(const std::string& str) {
@@ -204,6 +207,7 @@ void parse_conf_broken_store_paths(const string&
config_path, std::set<std::stri
* {"path": "storage1", "total_size":53687091200,"query_limit":
"10737418240"},
* {"path": "storage2", "total_size":53687091200},
* {"path": "storage3", "total_size":53687091200, "normal_percent":85,
"disposable_percent":10, "index_percent":5}
+ * {"path": "xxx", "total_size":53687091200, "storage": "memory"}
* ]
*/
Status parse_conf_cache_paths(const std::string& config_path,
std::vector<CachePath>& paths) {
@@ -215,6 +219,18 @@ Status parse_conf_cache_paths(const std::string&
config_path, std::vector<CacheP
auto map = config.GetObject();
DCHECK(map.HasMember(CACHE_PATH.c_str()));
std::string path =
map.FindMember(CACHE_PATH.c_str())->value.GetString();
+ std::string storage = CACHE_STORAGE_DISK; // disk storage by default
+ if (map.HasMember(CACHE_STORAGE.c_str())) {
+ storage = map.FindMember(CACHE_STORAGE.c_str())->value.GetString();
+ if (storage != CACHE_STORAGE_DISK && storage !=
CACHE_STORAGE_MEMORY) [[unlikely]] {
+ return Status::InvalidArgument("invalid file cache storage
type: " + storage);
+ }
+ if (storage == CACHE_STORAGE_MEMORY) {
+ // set path to "memory" for memory storage
+ // so that we can track it by path (use _path_to_cache map)
+ path = CACHE_STORAGE_MEMORY;
+ }
+ }
int64_t total_size = 0, query_limit_bytes = 0;
if (map.HasMember(CACHE_TOTAL_SIZE.c_str())) {
auto& value = map.FindMember(CACHE_TOTAL_SIZE.c_str())->value;
@@ -268,7 +284,7 @@ Status parse_conf_cache_paths(const std::string&
config_path, std::vector<CacheP
}
paths.emplace_back(std::move(path), total_size, query_limit_bytes,
normal_percent,
- disposable_percent, index_percent);
+ disposable_percent, index_percent, storage);
}
if (paths.empty()) {
return Status::InvalidArgument("fail to parse storage_root_path
config. value={}",
@@ -279,7 +295,7 @@ Status parse_conf_cache_paths(const std::string&
config_path, std::vector<CacheP
+// Builds io::FileCacheSettings from this entry's sizes, queue percentages and
+// storage kind ("disk" or "memory").
io::FileCacheSettings CachePath::init_settings() const {
return io::get_file_cache_settings(total_bytes, query_limit_bytes,
normal_percent,
- disposable_percent, index_percent);
+ disposable_percent, index_percent,
storage);
}
} // end namespace doris
diff --git a/be/src/olap/options.h b/be/src/olap/options.h
index 3e53faee075..7fec49468f8 100644
--- a/be/src/olap/options.h
+++ b/be/src/olap/options.h
@@ -53,13 +53,15 @@ struct CachePath {
io::FileCacheSettings init_settings() const;
+    // `storage` is the cache backend kind, "disk" or "memory" as validated by
+    // parse_conf_cache_paths. Like `path`, it is taken by value and moved into place.
CachePath(std::string path, int64_t total_bytes, int64_t query_limit_bytes,
- size_t normal_percent, size_t disposable_percent, size_t
index_percent)
+              size_t normal_percent, size_t disposable_percent, size_t index_percent,
+              std::string storage)
: path(std::move(path)),
total_bytes(total_bytes),
query_limit_bytes(query_limit_bytes),
normal_percent(normal_percent),
disposable_percent(disposable_percent),
- index_percent(index_percent) {}
+              index_percent(index_percent),
+              storage(std::move(storage)) {}
std::string path;
int64_t total_bytes = 0;
@@ -67,6 +69,7 @@ struct CachePath {
size_t normal_percent = io::DEFAULT_NORMAL_PERCENT;
size_t disposable_percent = io::DEFAULT_DISPOSABLE_PERCENT;
size_t index_percent = io::DEFAULT_INDEX_PERCENT;
+ std::string storage = "disk";
};
Status parse_conf_cache_paths(const std::string& config_path,
std::vector<CachePath>& path);
diff --git a/be/test/io/cache/block_file_cache_test.cpp
b/be/test/io/cache/block_file_cache_test.cpp
index 4c3285a27b3..f77dc439e95 100644
--- a/be/test/io/cache/block_file_cache_test.cpp
+++ b/be/test/io/cache/block_file_cache_test.cpp
@@ -110,6 +110,17 @@ void download(io::FileBlockSPtr file_block, size_t size =
0) {
ASSERT_TRUE(fs::exists(subdir));
}
+// Writes `size` bytes of '0' into file_block (its whole range when size == 0)
+// and finalizes it. This is the memory-storage counterpart of download(), so it
+// does not check for a cache directory on disk.
+void download_into_memory(io::FileBlockSPtr file_block, size_t size = 0) {
+    const size_t len = (size != 0) ? size : file_block->range().size();
+    std::string payload(len, '0');
+    EXPECT_TRUE(file_block->append(Slice(payload.data(), len)).ok());
+    EXPECT_TRUE(file_block->finalize().ok());
+}
+
void complete(const io::FileBlocksHolder& holder) {
for (const auto& file_block : holder.file_blocks) {
ASSERT_TRUE(file_block->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
@@ -117,6 +128,13 @@ void complete(const io::FileBlocksHolder& holder) {
}
}
+// For every block in holder, claims the downloader role (asserting that this
+// caller obtains it) and then fills the block via download_into_memory().
+void complete_into_memory(const io::FileBlocksHolder& holder) {
+    for (const auto& block : holder.file_blocks) {
+        const auto downloader = block->get_or_set_downloader();
+        ASSERT_TRUE(downloader == io::FileBlock::get_caller_id());
+        download_into_memory(block);
+    }
+}
+
class BlockFileCacheTest : public testing::Test {
public:
static void SetUpTestSuite() {
@@ -639,6 +657,397 @@ void test_file_cache(io::FileCacheType cache_type) {
}
}
+void test_file_cache_memory_storage(io::FileCacheType cache_type) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+
+ TUniqueId other_query_id;
+ other_query_id.hi = 2;
+ other_query_id.lo = 2;
+
+ io::FileCacheSettings settings;
+ switch (cache_type) {
+ case io::FileCacheType::INDEX:
+ settings.index_queue_elements = 5;
+ settings.index_queue_size = 30;
+ break;
+ case io::FileCacheType::NORMAL:
+ settings.query_queue_size = 30;
+ settings.query_queue_elements = 5;
+ break;
+ case io::FileCacheType::DISPOSABLE:
+ settings.disposable_queue_size = 30;
+ settings.disposable_queue_elements = 5;
+ break;
+ default:
+ break;
+ }
+ settings.capacity = 30;
+ settings.max_file_block_size = 30;
+ settings.max_query_cache_size = 30;
+ settings.storage = "memory";
+ io::CacheContext context, other_context;
+ context.cache_type = other_context.cache_type = cache_type;
+ context.query_id = query_id;
+ other_context.query_id = other_query_id;
+ auto key = io::BlockFileCache::hash("key1");
+ {
+ io::BlockFileCache mgr(cache_base_path, settings);
+ ASSERT_TRUE(mgr.initialize().ok());
+
+ for (int i = 0; i < 100; i++) {
+ if (mgr.get_async_open_success()) {
+ break;
+ };
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ {
+ auto holder = mgr.get_or_set(key, 0, 10, context); /// Add range
[0, 9]
+ auto blocks = fromHolder(holder);
+ /// Range was not present in mgr. It should be added in mgr as one
while file block.
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(0, 9),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ assert_range(2, blocks[0], io::FileBlock::Range(0, 9),
+ io::FileBlock::State::DOWNLOADING);
+ download_into_memory(blocks[0]);
+ assert_range(3, blocks[0], io::FileBlock::Range(0, 9),
+ io::FileBlock::State::DOWNLOADED);
+ }
+ /// Current mgr: [__________]
+ /// ^ ^
+ /// 0 9
+ ASSERT_EQ(mgr.get_file_blocks_num(cache_type), 1);
+ ASSERT_EQ(mgr.get_used_cache_size(cache_type), 10);
+ {
+ /// Want range [5, 14], but [0, 9] already in mgr, so only [10,
14] will be put in mgr.
+ auto holder = mgr.get_or_set(key, 5, 10, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 2);
+
+ assert_range(4, blocks[0], io::FileBlock::Range(0, 9),
+ io::FileBlock::State::DOWNLOADED);
+ assert_range(5, blocks[1], io::FileBlock::Range(10, 14),
io::FileBlock::State::EMPTY);
+
+ ASSERT_TRUE(blocks[1]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks[1]);
+ assert_range(6, blocks[1], io::FileBlock::Range(10, 14),
+ io::FileBlock::State::DOWNLOADED);
+ }
+
+ /// Current mgr: [__________][_____]
+ /// ^ ^^ ^
+ /// 0 910 14
+ ASSERT_EQ(mgr.get_file_blocks_num(cache_type), 2);
+ ASSERT_EQ(mgr.get_used_cache_size(cache_type), 15);
+
+ {
+ auto holder = mgr.get_or_set(key, 9, 1, context); /// Get [9, 9]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(7, blocks[0], io::FileBlock::Range(0, 9),
+ io::FileBlock::State::DOWNLOADED);
+ }
+ {
+ auto holder = mgr.get_or_set(key, 9, 2, context); /// Get [9, 10]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 2);
+ assert_range(8, blocks[0], io::FileBlock::Range(0, 9),
+ io::FileBlock::State::DOWNLOADED);
+ assert_range(9, blocks[1], io::FileBlock::Range(10, 14),
+ io::FileBlock::State::DOWNLOADED);
+ }
+ {
+ auto holder = mgr.get_or_set(key, 10, 1, context); /// Get [10, 10]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(10, blocks[0], io::FileBlock::Range(10, 14),
+ io::FileBlock::State::DOWNLOADED);
+ }
+ complete_into_memory(mgr.get_or_set(key, 17, 4, context)); /// Get
[17, 20]
+ complete_into_memory(mgr.get_or_set(key, 24, 3, context)); /// Get
[24, 26]
+
+ /// Current mgr: [__________][_____] [____] [___]
+ /// ^ ^^ ^ ^ ^ ^ ^
+ /// 0 910 14 17 20 24 26
+ ///
+ ASSERT_EQ(mgr.get_file_blocks_num(cache_type), 4);
+ ASSERT_EQ(mgr.get_used_cache_size(cache_type), 22);
+ {
+ auto holder = mgr.get_or_set(key, 0, 31, context); /// Get [0, 25]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 7);
+ assert_range(11, blocks[0], io::FileBlock::Range(0, 9),
+ io::FileBlock::State::DOWNLOADED);
+ assert_range(12, blocks[1], io::FileBlock::Range(10, 14),
+ io::FileBlock::State::DOWNLOADED);
+ /// Missing [15, 16] should be added in mgr.
+ assert_range(13, blocks[2], io::FileBlock::Range(15, 16),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[2]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks[2]);
+
+ assert_range(14, blocks[3], io::FileBlock::Range(17, 20),
+ io::FileBlock::State::DOWNLOADED);
+
+ assert_range(15, blocks[4], io::FileBlock::Range(21, 23),
io::FileBlock::State::EMPTY);
+
+ assert_range(16, blocks[5], io::FileBlock::Range(24, 26),
+ io::FileBlock::State::DOWNLOADED);
+
+ assert_range(16, blocks[6], io::FileBlock::Range(27, 30),
+ io::FileBlock::State::SKIP_CACHE);
+ /// Current mgr: [__________][_____][ ][____________]
+ /// ^ ^ ^
+ /// 0 20 26
+ ///
+
+ /// Range [27, 30] must be evicted in previous getOrSet [0, 25].
+ /// Let's not invalidate pointers to returned blocks from range
[0, 25] and
+ /// as max elements size is reached, next attempt to put something
in mgr should fail.
+ /// This will also check that [27, 27] was indeed evicted.
+
+ auto holder1 = mgr.get_or_set(key, 27, 4, context);
+ auto blocks_1 = fromHolder(holder1); /// Get [27, 30]
+ ASSERT_EQ(blocks_1.size(), 1);
+ assert_range(17, blocks_1[0], io::FileBlock::Range(27, 30),
+ io::FileBlock::State::SKIP_CACHE);
+ }
+ /// Current mgr: [__________][_____][_][____] [___]
+ /// ^ ^^ ^ ^ ^ ^ ^
+ /// 0 910 14 17 20 24 26
+ ///
+ {
+ auto holder = mgr.get_or_set(key, 12, 10, context); /// Get [12,
21]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 4);
+
+ assert_range(18, blocks[0], io::FileBlock::Range(10, 14),
+ io::FileBlock::State::DOWNLOADED);
+ assert_range(19, blocks[1], io::FileBlock::Range(15, 16),
+ io::FileBlock::State::DOWNLOADED);
+ assert_range(20, blocks[2], io::FileBlock::Range(17, 20),
+ io::FileBlock::State::DOWNLOADED);
+ assert_range(21, blocks[3], io::FileBlock::Range(21, 21),
io::FileBlock::State::EMPTY);
+
+ ASSERT_TRUE(blocks[3]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks[3]);
+ ASSERT_TRUE(blocks[3]->state() ==
io::FileBlock::State::DOWNLOADED);
+ }
+ /// Current mgr: [__________][_____][_][____][_] [___]
+ /// ^ ^^ ^ ^ ^ ^ ^
+ /// 0 910 14 17 20 24 26
+
+ ASSERT_EQ(mgr.get_file_blocks_num(cache_type), 6);
+ {
+ auto holder = mgr.get_or_set(key, 23, 5, context); /// Get [23, 28]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 3);
+
+ assert_range(22, blocks[0], io::FileBlock::Range(23, 23),
io::FileBlock::State::EMPTY);
+ assert_range(23, blocks[1], io::FileBlock::Range(24, 26),
+ io::FileBlock::State::DOWNLOADED);
+ assert_range(24, blocks[2], io::FileBlock::Range(27, 27),
io::FileBlock::State::EMPTY);
+
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ ASSERT_TRUE(blocks[2]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks[0]);
+ download_into_memory(blocks[2]);
+ }
+ /// Current mgr: [__________][_____][_][____][_] [_][___][_]
+ /// ^ ^^ ^ ^ ^ ^ ^
+ /// 0 910 14 17 20 24 26
+
+ ASSERT_EQ(mgr.get_file_blocks_num(cache_type), 8);
+ {
+ auto holder5 = mgr.get_or_set(key, 2, 3, context); /// Get [2, 4]
+ auto s5 = fromHolder(holder5);
+ ASSERT_EQ(s5.size(), 1);
+ assert_range(25, s5[0], io::FileBlock::Range(0, 9),
io::FileBlock::State::DOWNLOADED);
+
+ auto holder1 = mgr.get_or_set(key, 30, 2, context); /// Get [30,
31]
+ auto s1 = fromHolder(holder1);
+ ASSERT_EQ(s1.size(), 1);
+ assert_range(26, s1[0], io::FileBlock::Range(30, 31),
io::FileBlock::State::EMPTY);
+
+ ASSERT_TRUE(s1[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(s1[0]);
+
+ /// Current mgr: [__________][_____][_][____][_] [_][___][_]
[__]
+ /// ^ ^^ ^ ^ ^ ^ ^ ^
^ ^
+ /// 0 910 14 17 20 24 26
27 30 31
+
+ auto holder2 = mgr.get_or_set(key, 23, 1, context); /// Get [23,
23]
+ auto s2 = fromHolder(holder2);
+ ASSERT_EQ(s2.size(), 1);
+
+ auto holder3 = mgr.get_or_set(key, 24, 3, context); /// Get [24,
26]
+ auto s3 = fromHolder(holder3);
+ ASSERT_EQ(s3.size(), 1);
+
+ auto holder4 = mgr.get_or_set(key, 27, 1, context); /// Get [27,
27]
+ auto s4 = fromHolder(holder4);
+ ASSERT_EQ(s4.size(), 1);
+
+ /// All mgr is now unreleasable because pointers are still hold
+ auto holder6 = mgr.get_or_set(key, 0, 40, context);
+ auto f = fromHolder(holder6);
+ ASSERT_EQ(f.size(), 12);
+
+ assert_range(29, f[9], io::FileBlock::Range(28, 29),
io::FileBlock::State::SKIP_CACHE);
+ assert_range(30, f[11], io::FileBlock::Range(32, 39),
io::FileBlock::State::SKIP_CACHE);
+ }
+ {
+ auto holder = mgr.get_or_set(key, 2, 3, context); /// Get [2, 4]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(31, blocks[0], io::FileBlock::Range(0, 9),
+ io::FileBlock::State::DOWNLOADED);
+ }
+ // Current cache: [__________][_____][_][____][_] [_][___][_]
[__]
+ // ^ ^^ ^ ^ ^ ^ ^ ^ ^
^
+ // 0 910 14 17 20 24 26 27
30 31
+
+ {
+ auto holder = mgr.get_or_set(key, 25, 5, context); /// Get [25, 29]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 3);
+
+ assert_range(32, blocks[0], io::FileBlock::Range(24, 26),
+ io::FileBlock::State::DOWNLOADED);
+ assert_range(33, blocks[1], io::FileBlock::Range(27, 27),
+ io::FileBlock::State::DOWNLOADED);
+
+ assert_range(34, blocks[2], io::FileBlock::Range(28, 29),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[2]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ ASSERT_TRUE(blocks[2]->state() ==
io::FileBlock::State::DOWNLOADING);
+
+ bool lets_start_download = false;
+ std::mutex mutex;
+ std::condition_variable cv;
+
+ std::thread other_1([&] {
+ auto holder_2 =
+ mgr.get_or_set(key, 25, 5, other_context); /// Get
[25, 29] once again.
+ auto blocks_2 = fromHolder(holder_2);
+ ASSERT_EQ(blocks.size(), 3);
+
+ assert_range(35, blocks_2[0], io::FileBlock::Range(24, 26),
+ io::FileBlock::State::DOWNLOADED);
+ assert_range(36, blocks_2[1], io::FileBlock::Range(27, 27),
+ io::FileBlock::State::DOWNLOADED);
+ assert_range(37, blocks_2[2], io::FileBlock::Range(28, 29),
+ io::FileBlock::State::DOWNLOADING);
+
+ ASSERT_TRUE(blocks[2]->get_or_set_downloader() !=
io::FileBlock::get_caller_id());
+ ASSERT_TRUE(blocks[2]->state() ==
io::FileBlock::State::DOWNLOADING);
+
+ {
+ std::lock_guard lock(mutex);
+ lets_start_download = true;
+ }
+ cv.notify_one();
+
+ while (blocks_2[2]->wait() ==
io::FileBlock::State::DOWNLOADING) {
+ }
+ ASSERT_TRUE(blocks_2[2]->state() ==
io::FileBlock::State::DOWNLOADED);
+ });
+
+ {
+ std::unique_lock lock(mutex);
+ cv.wait(lock, [&] { return lets_start_download; });
+ }
+
+ download_into_memory(blocks[2]);
+ ASSERT_TRUE(blocks[2]->state() ==
io::FileBlock::State::DOWNLOADED);
+
+ other_1.join();
+ }
+ ASSERT_EQ(mgr.get_file_blocks_num(cache_type), 9);
+ // Current cache: [__________][_____][_][____][_] [_][___][_]
[__]
+ // ^ ^^ ^ ^ ^ ^ ^ ^ ^
^
+ // 0 910 14 17 20 24 26 27
30 31
+
+ {
+ /// Now let's check the similar case but getting ERROR state after
block->wait(), when
+ /// state is changed not manually via block->complete(state) but
from destructor of holder
+ /// and notify_all() is also called from destructor of holder.
+
+ std::optional<io::FileBlocksHolder> holder;
+ holder.emplace(mgr.get_or_set(key, 3, 23, context)); /// Get [3,
25]
+
+ auto blocks = fromHolder(*holder);
+ ASSERT_EQ(blocks.size(), 8);
+
+ assert_range(38, blocks[0], io::FileBlock::Range(0, 9),
+ io::FileBlock::State::DOWNLOADED);
+
+ assert_range(39, blocks[5], io::FileBlock::Range(22, 22),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[5]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ ASSERT_TRUE(blocks[5]->state() ==
io::FileBlock::State::DOWNLOADING);
+
+ bool lets_start_download = false;
+ std::mutex mutex;
+ std::condition_variable cv;
+
+ std::thread other_1([&] {
+ auto holder_2 =
+ mgr.get_or_set(key, 3, 23, other_context); /// Get [3,
25] once again
+ auto blocks_2 = fromHolder(*holder);
+ ASSERT_EQ(blocks_2.size(), 8);
+
+ assert_range(41, blocks_2[0], io::FileBlock::Range(0, 9),
+ io::FileBlock::State::DOWNLOADED);
+ assert_range(42, blocks_2[5], io::FileBlock::Range(22, 22),
+ io::FileBlock::State::DOWNLOADING);
+
+ ASSERT_TRUE(blocks_2[5]->get_downloader() !=
io::FileBlock::get_caller_id());
+ ASSERT_TRUE(blocks_2[5]->state() ==
io::FileBlock::State::DOWNLOADING);
+
+ {
+ std::lock_guard lock(mutex);
+ lets_start_download = true;
+ }
+ cv.notify_one();
+
+ while (blocks_2[5]->wait() ==
io::FileBlock::State::DOWNLOADING) {
+ }
+ ASSERT_TRUE(blocks_2[5]->state() ==
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks_2[5]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks_2[5]);
+ });
+
+ {
+ std::unique_lock lock(mutex);
+ cv.wait(lock, [&] { return lets_start_download; });
+ }
+ holder.reset();
+ other_1.join();
+ ASSERT_TRUE(blocks[5]->state() ==
io::FileBlock::State::DOWNLOADED);
+ }
+ }
+ // Current cache: [__________][_][____][_] [_][___][_] [__]
+ // ^ ^ ^ ^ ^ ^ ^ ^ ^
+ // 0 9 17 20 24 26 27 30 31
+ {
+ /// Test LRUCache::restore().
+ // storage will restore nothing
+
+ io::BlockFileCache cache2(cache_base_path, settings);
+ ASSERT_TRUE(cache2.initialize().ok());
+ for (int i = 0; i < 100; i++) {
+ if (cache2.get_async_open_success()) {
+ break;
+ };
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ auto holder1 = cache2.get_or_set(key, 2, 28, context); /// Get [2, 29]
+
+ auto blocks1 = fromHolder(holder1);
+ ASSERT_EQ(blocks1.size(), 1);
+ }
+}
+
TEST_F(BlockFileCacheTest, normal) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -657,6 +1066,11 @@ TEST_F(BlockFileCacheTest, normal) {
}
}
+TEST_F(BlockFileCacheTest, normal_memory_storage) {
+ test_file_cache_memory_storage(io::FileCacheType::INDEX);
+ test_file_cache_memory_storage(io::FileCacheType::NORMAL);
+}
+
TEST_F(BlockFileCacheTest, resize) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -740,6 +1154,53 @@ TEST_F(BlockFileCacheTest, max_ttl_size) {
}
}
+TEST_F(BlockFileCacheTest, max_ttl_size_memory_storage) {
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 100000000;
+ settings.query_queue_elements = 100000;
+ settings.capacity = 100000000;
+ settings.max_file_block_size = 100000;
+ settings.max_query_cache_size = 30;
+ settings.storage = "memory";
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::TTL;
+ context.query_id = query_id;
+ int64_t cur_time = UnixSeconds();
+ context.expiration_time = cur_time + 120;
+ auto key1 = io::BlockFileCache::hash("key5");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ int i = 0;
+ for (; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ int64_t offset = 0;
+ for (; offset < 100000000; offset += 100000) {
+ auto holder = cache.get_or_set(key1, offset, 100000, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ if (offset < 90000000) {
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks[0]);
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::DOWNLOADED);
+ } else {
+ assert_range(1, blocks[0], io::FileBlock::Range(offset, offset +
99999),
+ io::FileBlock::State::SKIP_CACHE);
+ }
+ blocks.clear();
+ }
+}
+
TEST_F(BlockFileCacheTest, query_limit_heap_use_after_free) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -1043,6 +1504,47 @@ TEST_F(BlockFileCacheTest, change_cache_type) {
}
}
+TEST_F(BlockFileCacheTest, change_cache_type_memory_storage) {
+ doris::config::enable_file_cache_query_limit = true;
+ io::FileCacheSettings settings;
+ settings.index_queue_elements = 5;
+ settings.index_queue_size = 15;
+ settings.disposable_queue_size = 0;
+ settings.disposable_queue_elements = 0;
+ settings.query_queue_size = 15;
+ settings.query_queue_elements = 5;
+ settings.max_file_block_size = 10;
+ settings.max_query_cache_size = 15;
+ settings.capacity = 30;
+ settings.storage = "memory";
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ for (int i = 0; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ };
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::NORMAL;
+ auto key = io::BlockFileCache::hash("key1");
+ {
+ auto holder = cache.get_or_set(key, 0, 9, context); /// Add range [0,
8]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(0, 8),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ assert_range(2, blocks[0], io::FileBlock::Range(0, 8),
io::FileBlock::State::DOWNLOADING);
+ size_t size = blocks[0]->range().size();
+ std::string data(size, '0');
+ Slice result(data.data(), size);
+ ASSERT_TRUE(blocks[0]->append(result).ok());
+ ASSERT_TRUE(
+
blocks[0]->change_cache_type_between_normal_and_index(io::FileCacheType::INDEX));
+ ASSERT_TRUE(blocks[0]->finalize().ok());
+ }
+}
+
TEST_F(BlockFileCacheTest, fd_cache_remove) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -1723,6 +2225,83 @@ TEST_F(BlockFileCacheTest, ttl_modify) {
}
}
+TEST_F(BlockFileCacheTest, ttl_modify_memory_storage) {
+ test_file_cache_memory_storage(io::FileCacheType::NORMAL);
+ auto sp = SyncPoint::get_instance();
+ SyncPoint::CallbackGuard guard1;
+ sp->set_call_back(
+ "BlockFileCache::set_sleep_time",
+ [](auto&& args) { *try_any_cast<int64_t*>(args[0]) = 1; },
&guard1);
+ sp->enable_processing();
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 30;
+ settings.query_queue_elements = 5;
+ settings.capacity = 30;
+ settings.max_file_block_size = 30;
+ settings.max_query_cache_size = 30;
+ settings.storage = "memory";
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::TTL;
+ context.query_id = query_id;
+ int64_t cur_time = UnixSeconds();
+ context.expiration_time = cur_time + 120;
+ int64_t modify_time = cur_time + 5;
+ auto key1 = io::BlockFileCache::hash("key5");
+ auto key2 = io::BlockFileCache::hash("key6");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ for (int i = 0; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ };
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ {
+ auto holder = cache.get_or_set(key1, 50, 10, context); /// Add range
[50, 59]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks[0]);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::DOWNLOADED);
+ EXPECT_EQ(blocks[0]->cache_type(), io::FileCacheType::TTL);
+ }
+ {
+ auto holder = cache.get_or_set(key2, 50, 10, context); /// Add range
[50, 59]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks[0]);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::DOWNLOADED);
+ EXPECT_EQ(blocks[0]->cache_type(), io::FileCacheType::TTL);
+ }
+ cache.modify_expiration_time(key2, 0);
+ {
+ context.cache_type = io::FileCacheType::INDEX;
+ context.expiration_time = 0;
+ auto holder = cache.get_or_set(key2, 50, 10, context); /// Add range
[50, 59]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::DOWNLOADED);
+ EXPECT_EQ(blocks[0]->cache_type(), io::FileCacheType::NORMAL);
+ EXPECT_EQ(blocks[0]->expiration_time(), 0);
+ std::string buffer(10, '1');
+ EXPECT_TRUE(blocks[0]->read(Slice(buffer.data(), 10), 0).ok());
+ EXPECT_EQ(buffer, std::string(10, '0'));
+ }
+ {
+ cache.modify_expiration_time(key2, modify_time);
+ context.expiration_time = modify_time;
+ auto holder = cache.get_or_set(key2, 50, 10, context); /// Add range
[50, 59]
+ auto blocks = fromHolder(holder);
+ EXPECT_EQ(blocks[0]->expiration_time(), modify_time);
+ }
+}
+
TEST_F(BlockFileCacheTest, ttl_change_to_normal) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -1786,6 +2365,57 @@ TEST_F(BlockFileCacheTest, ttl_change_to_normal) {
}
}
+TEST_F(BlockFileCacheTest, ttl_change_to_normal_memory_storage) {
+ test_file_cache_memory_storage(io::FileCacheType::NORMAL);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 30;
+ settings.query_queue_elements = 5;
+ settings.capacity = 30;
+ settings.max_file_block_size = 30;
+ settings.max_query_cache_size = 30;
+ settings.storage = "memory";
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::TTL;
+ context.query_id = query_id;
+ int64_t cur_time = UnixSeconds();
+ context.expiration_time = cur_time + 180;
+ auto key2 = io::BlockFileCache::hash("key2");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ for (int i = 0; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ };
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ {
+ auto holder = cache.get_or_set(key2, 50, 10, context); /// Add range
[50, 59]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks[0]);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::DOWNLOADED);
+ EXPECT_EQ(blocks[0]->cache_type(), io::FileCacheType::TTL);
+ }
+ {
+ context.cache_type = io::FileCacheType::NORMAL;
+ context.expiration_time = 0;
+ auto holder = cache.get_or_set(key2, 50, 10, context); /// Add range
[50, 59]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::DOWNLOADED);
+ EXPECT_EQ(blocks[0]->cache_type(), io::FileCacheType::NORMAL);
+ EXPECT_EQ(blocks[0]->expiration_time(), 0);
+ std::string buffer(10, '1');
+ EXPECT_TRUE(blocks[0]->read(Slice(buffer.data(), 10), 0).ok());
+ EXPECT_EQ(buffer, std::string(10, '0'));
+ }
+}
+
TEST_F(BlockFileCacheTest, ttl_change_expiration_time) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -1850,6 +2480,58 @@ TEST_F(BlockFileCacheTest, ttl_change_expiration_time) {
}
}
+TEST_F(BlockFileCacheTest, ttl_change_expiration_time_memory_storage) {
+ test_file_cache_memory_storage(io::FileCacheType::NORMAL);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 30;
+ settings.query_queue_elements = 5;
+ settings.capacity = 30;
+ settings.max_file_block_size = 30;
+ settings.max_query_cache_size = 30;
+ settings.storage = "memory";
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::TTL;
+ context.query_id = query_id;
+ int64_t cur_time = UnixSeconds();
+ context.expiration_time = cur_time + 180;
+ int64_t change_time = cur_time + 120;
+ auto key2 = io::BlockFileCache::hash("key2");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ for (int i = 0; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ };
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ {
+ auto holder = cache.get_or_set(key2, 50, 10, context); /// Add range
[50, 59]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::EMPTY);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks[0]);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::DOWNLOADED);
+ EXPECT_EQ(blocks[0]->cache_type(), io::FileCacheType::TTL);
+ }
+ {
+ context.cache_type = io::FileCacheType::TTL;
+ context.expiration_time = change_time;
+ auto holder = cache.get_or_set(key2, 50, 10, context); /// Add range
[50, 59]
+ auto blocks = fromHolder(holder);
+ ASSERT_EQ(blocks.size(), 1);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 59),
io::FileBlock::State::DOWNLOADED);
+ EXPECT_EQ(blocks[0]->cache_type(), io::FileCacheType::TTL);
+ EXPECT_EQ(blocks[0]->expiration_time(), change_time);
+ std::string buffer(10, '1');
+ EXPECT_TRUE(blocks[0]->read(Slice(buffer.data(), 10), 0).ok());
+ EXPECT_EQ(buffer, std::string(10, '0'));
+ }
+}
+
TEST_F(BlockFileCacheTest, ttl_reverse) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -1903,6 +2585,52 @@ TEST_F(BlockFileCacheTest, ttl_reverse) {
}
}
+TEST_F(BlockFileCacheTest, ttl_reverse_memory_storage) {
+ test_file_cache_memory_storage(io::FileCacheType::NORMAL);
+ TUniqueId query_id;
+ query_id.hi = 1;
+ query_id.lo = 1;
+ io::FileCacheSettings settings;
+ settings.query_queue_size = 36;
+ settings.query_queue_elements = 5;
+ settings.capacity = 36;
+ settings.max_file_block_size = 7;
+ settings.max_query_cache_size = 30;
+ settings.storage = "memory";
+ io::CacheContext context;
+ context.cache_type = io::FileCacheType::TTL;
+ context.query_id = query_id;
+ int64_t cur_time = UnixSeconds();
+ context.expiration_time = cur_time + 180;
+ auto key2 = io::BlockFileCache::hash("key2");
+ io::BlockFileCache cache(cache_base_path, settings);
+ ASSERT_TRUE(cache.initialize());
+ for (int i = 0; i < 100; i++) {
+ if (cache.get_async_open_success()) {
+ break;
+ };
+ std::this_thread::sleep_for(std::chrono::milliseconds(1));
+ }
+ ASSERT_TRUE(cache.get_async_open_success());
+ for (size_t offset = 0; offset < 30; offset += 6) {
+ auto holder = cache.get_or_set(key2, offset, 6, context);
+ auto blocks = fromHolder(holder);
+ ASSERT_TRUE(blocks[0]->get_or_set_downloader() ==
io::FileBlock::get_caller_id());
+ download_into_memory(blocks[0]);
+ }
+ {
+ auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range
[50, 57]
+ auto blocks = fromHolder(holder);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 56),
io::FileBlock::State::SKIP_CACHE);
+ }
+ {
+ context.cache_type = io::FileCacheType::NORMAL;
+ auto holder = cache.get_or_set(key2, 50, 7, context); /// Add range
[50, 57]
+ auto blocks = fromHolder(holder);
+ assert_range(1, blocks[0], io::FileBlock::Range(50, 56),
io::FileBlock::State::SKIP_CACHE);
+ }
+}
+
TEST_F(BlockFileCacheTest, io_error) {
if (fs::exists(cache_base_path)) {
fs::remove_all(cache_base_path);
@@ -4743,4 +5471,26 @@ TEST_F(BlockFileCacheTest, test_load) {
}
}
+TEST_F(BlockFileCacheTest, file_cache_path_storage_parse) {
+ {
+ std::string file_cache_path =
+ "[{\"path\": \"xxx\", \"total_size\":102400, \"storage\":
\"memory\"}]";
+ std::vector<doris::CachePath> cache_paths;
+ ASSERT_TRUE(parse_conf_cache_paths(file_cache_path, cache_paths).ok());
+ ASSERT_EQ(cache_paths.size(), 1);
+ ASSERT_TRUE(cache_paths[0].path == "memory");
+ ASSERT_TRUE(cache_paths[0].total_bytes == 102400);
+ ASSERT_TRUE(cache_paths[0].storage == "memory");
+ }
+ {
+ std::string file_cache_path = "[{\"path\": \"memory\",
\"total_size\":102400}]";
+ std::vector<doris::CachePath> cache_paths;
+ ASSERT_TRUE(parse_conf_cache_paths(file_cache_path, cache_paths).ok());
+ ASSERT_EQ(cache_paths.size(), 1);
+ ASSERT_TRUE(cache_paths[0].path == "memory");
+ ASSERT_TRUE(cache_paths[0].total_bytes == 102400);
+ ASSERT_TRUE(cache_paths[0].storage == "disk");
+ }
+}
+
} // namespace doris::io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]