flaneur2020 commented on code in PR #6297:
URL: https://github.com/apache/opendal/pull/6297#discussion_r2659298987


##########
core/core/src/docs/rfcs/6297_cache_layer.md:
##########
@@ -0,0 +1,435 @@
+- Proposal Name: `cache_layer`
+- Start Date: 2025-06-16
+- RFC PR: [apache/opendal#6297](https://github.com/apache/opendal/pull/6297)
+- Tracking Issue: [apache/opendal#7107](https://github.com/apache/opendal/issues/7107)
+
+# Summary
+
+This RFC proposes the addition of a Cache Layer to OpenDAL, providing transparent read-through and write-through caching capabilities. The Cache Layer allows users to improve performance by caching data from a slower storage service (e.g., S3, HDFS) to a faster one (e.g., Memory, Moka, Redis).
+
+# Motivation
+
+Storage access performance varies greatly across different storage services.
+Remote object stores like S3 or GCS have much higher latency than local storage or in-memory caches.
+In many applications, particularly those with read-heavy workloads or repeated access to the same data, caching can significantly improve performance.
+
+Currently, users who want to implement caching with OpenDAL must manually:
+
+1. Check if data exists in cache service
+2. If cache misses, fetch from original storage and manually populate cache
+3. Handle cache invalidation and consistency manually
+
+By introducing a dedicated Cache Layer, we can:
+
+- Provide a unified, transparent caching solution within OpenDAL
+- Eliminate boilerplate code for common caching patterns
+- Allow flexible configuration of caching policies
+- Enable performance optimization with minimal code changes
+- Leverage existing OpenDAL services as cache storage
+
+# Guide-level explanation
+
+The Cache Layer allows you to wrap any existing service with a caching mechanism.
+When data is accessed through this layer, it will automatically be cached in your specified cache service.
+The cache layer is designed to be straightforward, and delegates cache management policies (like TTL, eviction policy) to the underlying cache service.
+
+## Basic Usage
+
+```rust
+use opendal::{layers::CacheLayer, services::Memory, services::S3, Operator};
+
+#[tokio::main]
+async fn main() -> opendal::Result<()> {
+    // Create a memory operator to use as cache
+    let memory = Operator::new(Memory::default())?.finish();
+
+    // Create the primary storage (e.g., S3); it stays an `OperatorBuilder` until `finish()`
+    let s3 = Operator::new(
+        S3::default()
+            .bucket("my-bucket")
+            .region("us-east-1"),
+    )?;
+
+    // Wrap the primary storage with the cache layer
+    let op = s3.layer(CacheLayer::new(memory)).finish();
+
+    // Use the operator as normal - caching is transparent
+    let data = op.read("path/to/file").await?;
+
+    // Later reads will be served from cache if available
+    let cached_data = op.read("path/to/file").await?;
+
+    Ok(())
+}
+```
+
+## Using Different Cache Services
+
+The Cache Layer can use any OpenDAL service as cache service:
+
+```rust
+// Using Redis as cache
+use opendal::services::Redis;
+
+let redis_cache = Operator::new(
+    Redis::default()
+        .endpoint("redis://localhost:6379")
+    )?
+    .finish();
+
+let op = s3.layer(CacheLayer::new(redis_cache)).finish();
+```
+
+```rust
+// Using Moka (in-memory cache with advanced features)
+use opendal::services::Moka;
+use std::time::Duration;
+
+let moka_cache = Operator::new(
+        Moka::default()
+            .max_capacity(1000)
+            .time_to_live(Duration::from_secs(3600)) // TTL managed by Moka
+    )?
+    .finish();
+
+let op = s3.layer(CacheLayer::new(moka_cache)).finish();
+```
+
+## Multiple Cache Layers
+
+You can stack multiple cache layers for a multi-tier caching strategy:
+
+```rust
+// L1 cache: Fast in-memory cache
+let l1_cache = Operator::new(Memory::default())?.finish();
+
+// L2 cache: Larger but slightly slower cache (e.g., Redis)
+let l2_cache = Operator::new(
+        Redis::default().endpoint("redis://localhost:6379")
+    )?
+    .finish();
+
+// Stack the caches: L1 -> L2 -> S3
+let op = s3
+    .layer(CacheLayer::new(l2_cache))  // L2 cache
+    .layer(CacheLayer::new(l1_cache))  // L1 cache
+    .finish();
+```
+
+## Configuration Options
+
+The Cache Layer provides minimal configuration to keep it simple:
+
+```rust
+let op = s3.layer(
+    CacheLayer::new(memory)
+        .with_options(CacheOptions {
+            // Enable read-through caching (default: true)
+            read: true,
+            // Enable cache promotion during read operations (default: true)
+            read_promotion: true,
+            // Enable write-through caching (default: true)
+            write: true,
+        })
+    )
+    .finish();
+```
+
+- `read` option: When disabled, reads bypass the cache entirely.
+- `read_promotion` option: When disabled, data fetched from the inner service on a miss will not be stored back into the cache.
+- `write` option: When enabled, bytes written to the inner service are also stored into the cache.
+
+# Reference-level explanation
+
+## Architecture
+
+The Cache Layer implements the `Layer` trait and wraps an underlying `Access` implementation with caching capabilities.
+It introduces a `CacheService` trait that defines the interface for cache operations.
+
+### Customizable Cache Storage
+
+```rust
+/// `CacheService` defines the backing storage interface for [`CacheLayer`].
+/// It should behave like a simple object store: get/set bytes by key and
+/// expose lightweight metadata for existence checks.
+pub trait CacheService: Clone + Send + Sync + 'static {
+    /// Identifier of the cache backend, used mainly for logging and debugging.
+    fn scheme(&self) -> &'static str;
+
+    /// Read cached content by `key`. Returns `Ok(None)` on cache miss instead of `NotFound`.
+    fn read(&self, key: &str) -> impl Future<Output = Result<Option<Buffer>>> + MaybeSend;
+
+    /// Write full bytes for `key`, replacing any existing value.
+    fn write(&self, key: &str, value: Vec<u8>) -> impl Future<Output = Result<()>> + MaybeSend;
+
+    /// Fetch metadata for `key`. Should return [`ErrorKind::NotFound`] on miss.
+    fn stat(&self, key: &str) -> impl Future<Output = Result<Metadata>> + MaybeSend;
+
+    /// Check whether `key` exists in the cache.
+    fn exists(&self, key: &str) -> impl Future<Output = Result<bool>> + MaybeSend;
+}
+```
+
+OpenDAL `Operator` implements `CacheService` trait, making any OpenDAL service usable as a cache service.
+
+```rust
+impl CacheService for Operator {
+    fn scheme(&self) -> &'static str {
+        self.info().scheme()
+    }
+
+    async fn read(&self, key: &str) -> Result<Option<Buffer>> {
+        let r = Operator::read(self, key).await;
+        match r {
+            Ok(r) => Ok(Some(r)),
+            Err(err) => match err.kind() {
+                ErrorKind::NotFound => Ok(None),
+                _ => Err(err),
+            },
+        }
+    }
+
+    async fn write(&self, key: &str, value: Vec<u8>) -> Result<()> {
+        Operator::write(self, key, value).await.map(|_| ())
+    }
+
+    async fn stat(&self, key: &str) -> Result<Metadata> {
+        Operator::stat(self, key).await
+    }
+
+    async fn exists(&self, key: &str) -> Result<bool> {
+        Operator::exists(self, key).await
+    }
+}
+```
+
+### CacheLayer && CacheAccessor
+
+The layer wraps the underlying access with `CacheAccessor`, which implements caching logic for each operation.
+
+```rust
+#[derive(Clone, Copy, Debug)]
+pub struct CacheOptions {
+    /// Enable cache lookups before hitting the inner service.
+    pub read: bool,
+    /// Promote data read from the inner service into the cache (read-through fill).
+    ///
+    /// Note: This option only takes effect when [`CacheOptions::read`] is enabled.
+    pub read_promotion: bool,
+    /// Write-through caching for data written to the inner service.
+    pub write: bool,
+}
+
+pub struct CacheAccessor<A, S> {
+    inner: A,
+    cache_service: Arc<S>,
+    cache_options: CacheOptions,
+}
+
+impl<A: Access, S: CacheService> LayeredAccess for CacheAccessor<A, S> {
+    type Inner = A;
+    type Reader = CacheReader<A::Reader, S>;
+    type Writer = CacheWriter<A::Writer, S>;
+    type Lister = A::Lister;
+    type Deleter = A::Deleter;
+
+    fn inner(&self) -> &Self::Inner {
+        &self.inner
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let cache_key = path.to_owned();
+
+        // Try cache first if read caching is enabled
+        if self.cache_options.read {
+            match self.cache_service.read(&cache_key).await {
+                Ok(Some(cached_data)) => {
+                    // Cache hit
+                    return Ok((RpRead::new(), CacheReader::from_buffer(cached_data)));
+                }
+                Ok(None) => { /* Cache miss, continue to underlying service */ }
+                Err(_) => { /* Cache error, continue to underlying service */ }
+            }
+        }
+
+        // Query underlying service
+        let (rp, reader) = self.inner.read(path, args).await?;
+
+        // Create a reader that will cache data as it's read
+        Ok((
+            rp,
+            CacheReader::new(
+                reader,
+                self.cache_service.clone(),
+                cache_key,
+                self.cache_options.read_promotion,
+            ),
+        ))
+    }
+
+    async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        let cache_key = path.to_owned();
+
+        // Always try to write to underlying storage first
+        let (rp, writer) = self.inner.write(path, args).await?;
+
+        // Create a writer that will cache data as it's written
+        Ok((
+            rp,
+            CacheWriter::new(
+                writer,
+                self.cache_service.clone(),
+                cache_key,
+                self.cache_options.write,
+            ),
+        ))
+    }
+
+    async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
+        let cache_key = &path;
+
+        // Check cache first if read caching is enabled
+        if self.cache_options.read {
+            match self.cache_service.stat(cache_key).await {
+                Ok(metadata) => {
+                    // Cache hit - key exists in cache service
+                    return Ok(RpStat::new(metadata));
+                }
+                Err(_) => { /* Cache miss, continue to underlying service */ }
+            }
+        }
+
+        // Fallback to underlying service
+        self.inner.stat(path, args).await
+    }
+
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        self.inner.delete().await

Review Comment:
   It would be nice to invalidate the cache entry when `delete()` is called, imo. That way we'd at least get "best effort" cache consistency.
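
A rough sketch of the idea, not tied to the real OpenDAL traits. `MemStore` and `CachedStore` are hypothetical stand-ins for the inner service and the `CacheAccessor` from the diff above; in the actual layer this would presumably live in a deleter that wraps `A::Deleter` (an assumption, the RFC does not define one yet). The point is only the ordering: delete from the inner service first and propagate its error, then drop the cache entry and ignore a failure there.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a storage backend; not an OpenDAL type.
#[derive(Default)]
pub struct MemStore {
    entries: HashMap<String, Vec<u8>>,
    /// When true, every `delete` fails, to simulate an unavailable cache.
    pub fail_deletes: bool,
}

impl MemStore {
    pub fn write(&mut self, key: &str, value: &[u8]) {
        self.entries.insert(key.to_owned(), value.to_vec());
    }

    pub fn read(&self, key: &str) -> Option<Vec<u8>> {
        self.entries.get(key).cloned()
    }

    pub fn delete(&mut self, key: &str) -> Result<(), String> {
        if self.fail_deletes {
            return Err(format!("delete failed for {key}"));
        }
        self.entries.remove(key);
        Ok(())
    }
}

/// Hypothetical simplification of `CacheAccessor`: an inner store plus a cache.
#[derive(Default)]
pub struct CachedStore {
    pub inner: MemStore,
    pub cache: MemStore,
}

impl CachedStore {
    /// Serve from the cache if present, otherwise fall back to the inner store.
    pub fn read(&self, key: &str) -> Option<Vec<u8>> {
        self.cache.read(key).or_else(|| self.inner.read(key))
    }

    /// Delete from the inner store, then invalidate the cache entry.
    pub fn delete(&mut self, key: &str) -> Result<(), String> {
        // The inner store is the source of truth, so its error is propagated.
        self.inner.delete(key)?;
        // Best effort: a failed invalidation is ignored instead of failing the delete.
        let _ = self.cache.delete(key);
        Ok(())
    }
}
```

If the cache is unreachable during the delete, the stale entry stays around until the cache service evicts it (TTL etc.), which is why this is only "best effort".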


