Xuanwo commented on code in PR #5906: URL: https://github.com/apache/opendal/pull/5906#discussion_r2020225301
########## core/src/layers/cache.rs: ########## @@ -0,0 +1,206 @@ +use std::sync::Arc; + +use crate::raw::*; +use crate::*; +use bytes::Bytes; +use foyer::{DirectFsDeviceOptions, Engine, HybridCache, HybridCacheBuilder}; +use serde::{Deserialize, Serialize}; + +pub struct CacheLayer { + cache: Arc<HybridCache<CacheKey, CacheValue>>, +} + +impl CacheLayer { + pub async fn new( + disk_cache_dir: &str, + disk_capacity_mb: usize, + memory_capacity_mb: usize, + ) -> Result<Self> { + const MB: usize = 1 << 20; + + let cache = HybridCacheBuilder::new() + .with_name("opendal") + .memory(memory_capacity_mb * MB) + .storage(Engine::Large) + .with_device_options( + DirectFsDeviceOptions::new(disk_cache_dir).with_capacity(disk_capacity_mb * MB), + ) + .build() + .await + .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))?; + + Ok(Self { + cache: Arc::new(cache), + }) + } +} + +impl<A: Access> Layer<A> for CacheLayer { + type LayeredAccess = CacheAccessor<A>; + + fn layer(&self, inner: A) -> Self::LayeredAccess { + CacheAccessor { + inner, + cache: Arc::clone(&self.cache), + } + } +} + +#[derive(Debug, Clone, Hash, Eq, PartialEq, Serialize, Deserialize)] +struct CacheKey { + path: String, + args: OpRead, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct CacheValue { + rp: RpRead, + // TODO: store Buffer or Bytes? + bytes: Bytes, +} + +#[derive(Debug)] +pub struct CacheAccessor<A: Access> { + inner: A, + // TODO: if cache should be used for other operations (such as stat or list) + // maybe we should create different caches for each operation? + // So the keys and values does not mix + // Although, we could use an enum as the cachekey and as the cache value, but + // this way we wouldn't be making invalid states unrepresentable, as given + // a Read Cache key, there might be a List cached value associated. + cache: Arc<HybridCache<CacheKey, CacheValue>>, +} + +impl<A: Access> LayeredAccess for CacheAccessor<A> { + type Inner = A; + + fn inner(&self) -> &Self::Inner { + &self.inner + } + + // TODO: add a comment here that we use `oio::Reader` (i.e Box<dyn ReadDyn>) because + // if there is a cache hit, we return Box<Buffer> + // but if there isn't, we return a Box<CacheWrapper<..>> so when the reader + // is read, the value is inserted into the cache. + // This allow to lazy reading and inserting in cache not to be done in the + // `CacheAccessor::read` method, but when calling `reader.read()` on the reader + // output of `CacheAccessor` + type Reader = oio::Reader; + + type Writer = A::Writer; + + // TODO: lister cache? + type Lister = A::Lister; + + type Deleter = A::Deleter; + + type BlockingReader = A::BlockingReader; + + type BlockingWriter = A::BlockingWriter; + + type BlockingLister = A::BlockingLister; + + type BlockingDeleter = A::BlockingDeleter; + + // TODO: stat cache? + + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let cache_key = CacheKey { + path: path.to_string(), + args: args.clone(), + }; + if let Some(entry) = self + .cache + .get(&cache_key) + .await + // TODO: handle this error + .map_err(|e| Error::new(ErrorKind::Unexpected, e.to_string()))? + { Review Comment: Better not. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@opendal.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org