This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new a5d529f00 refactor: Migrate services cacache to implement Access (#6303)
a5d529f00 is described below

commit a5d529f0096067fbda808fe39d640378a893d256
Author: Xuanwo <[email protected]>
AuthorDate: Tue Jun 17 17:20:05 2025 +0800

    refactor: Migrate services cacache to implement Access (#6303)
    
    * refactor: Migrate services cacache to implement Access
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Format code
    
    Signed-off-by: Xuanwo <[email protected]>
    
    * Fix tests
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
---
 core/src/services/cacache/backend.rs            | 153 ++++++++++++++----------
 core/src/services/cacache/core.rs               |  96 +++++++++++++++
 core/src/services/cacache/{mod.rs => delete.rs} |  27 ++++-
 core/src/services/cacache/mod.rs                |   7 ++
 core/src/services/cacache/writer.rs             |  61 ++++++++++
 5 files changed, 275 insertions(+), 69 deletions(-)

diff --git a/core/src/services/cacache/backend.rs b/core/src/services/cacache/backend.rs
index ab0da250f..f48610e92 100644
--- a/core/src/services/cacache/backend.rs
+++ b/core/src/services/cacache/backend.rs
@@ -16,20 +16,18 @@
 // under the License.
 
 use std::fmt::Debug;
-use std::fmt::Formatter;
-use std::str;
+use std::sync::Arc;
 
-use cacache;
+use chrono::DateTime;
 
-use crate::raw::adapters::kv;
-use crate::raw::Access;
+use crate::raw::*;
 use crate::services::CacacheConfig;
-use crate::Builder;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Scheme;
 use crate::*;
 
+use super::core::CacacheCore;
+use super::delete::CacacheDeleter;
+use super::writer::CacacheWriter;
+
 impl Configurator for CacacheConfig {
     type Builder = CacacheBuilder;
     fn into_builder(self) -> Self::Builder {
@@ -62,73 +60,102 @@ impl Builder for CacacheBuilder {
                 .with_context("service", Scheme::Cacache)
         })?;
 
-        Ok(CacacheBackend::new(Adapter {
-            datadir: datadir_path,
-        }))
+        let core = CacacheCore {
+            path: datadir_path.clone(),
+        };
+
+        let info = AccessorInfo::default();
+        info.set_scheme(Scheme::Cacache);
+        info.set_name(&datadir_path);
+        info.set_root("/");
+        info.set_native_capability(Capability {
+            read: true,
+            write: true,
+            delete: true,
+            stat: true,
+            rename: false,
+            list: false,
+            shared: false,
+            ..Default::default()
+        });
+
+        Ok(CacacheAccessor {
+            core: Arc::new(core),
+            info: Arc::new(info),
+        })
     }
 }
 
 /// Backend for cacache services.
-pub type CacacheBackend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
-    datadir: String,
+#[derive(Debug, Clone)]
+pub struct CacacheAccessor {
+    core: Arc<CacacheCore>,
+    info: Arc<AccessorInfo>,
 }
 
-impl Debug for Adapter {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let mut ds = f.debug_struct("Adapter");
-        ds.field("path", &self.datadir);
-        ds.finish()
-    }
-}
+impl Access for CacacheAccessor {
+    type Reader = Buffer;
+    type Writer = CacacheWriter;
+    type Lister = ();
+    type Deleter = oio::OneShotDeleter<CacacheDeleter>;
 
-impl kv::Adapter for Adapter {
-    type Scanner = ();
-
-    fn info(&self) -> kv::Info {
-        kv::Info::new(
-            Scheme::Cacache,
-            &self.datadir,
-            Capability {
-                read: true,
-                write: true,
-                delete: true,
-                shared: false,
-                ..Default::default()
-            },
-        )
+    fn info(&self) -> Arc<AccessorInfo> {
+        self.info.clone()
     }
 
-    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
-        let result = cacache::read(&self.datadir, path)
-            .await
-            .map_err(parse_error)?;
-        Ok(Some(Buffer::from(result)))
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let metadata = self.core.metadata(path).await?;
+
+        match metadata {
+            Some(meta) => {
+                let mut md = Metadata::new(EntryMode::FILE);
+                md.set_content_length(meta.size as u64);
+                // Convert u128 milliseconds to DateTime<Utc>
+                let millis = meta.time;
+                let secs = (millis / 1000) as i64;
+                let nanos = ((millis % 1000) * 1_000_000) as u32;
+                if let Some(dt) = DateTime::from_timestamp(secs, nanos) {
+                    md.set_last_modified(dt);
+                }
+                Ok(RpStat::new(md))
+            }
+            None => Err(Error::new(ErrorKind::NotFound, "entry not found")),
+        }
     }
 
-    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
-        cacache::write(&self.datadir, path, value.to_vec())
-            .await
-            .map_err(parse_error)?;
-        Ok(())
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+        let data = self.core.get(path).await?;
+
+        match data {
+            Some(bytes) => {
+                let range = args.range();
+                let buffer = if range.is_full() {
+                    Buffer::from(bytes)
+                } else {
+                    let start = range.offset() as usize;
+                    let end = match range.size() {
+                        Some(size) => (range.offset() + size) as usize,
+                        None => bytes.len(),
+                    };
+                    Buffer::from(bytes.slice(start..end.min(bytes.len())))
+                };
+                Ok((RpRead::new(), buffer))
+            }
+            None => Err(Error::new(ErrorKind::NotFound, "entry not found")),
+        }
     }
 
-    async fn delete(&self, path: &str) -> Result<()> {
-        cacache::remove(&self.datadir, path)
-            .await
-            .map_err(parse_error)?;
-
-        Ok(())
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+        Ok((
+            RpWrite::new(),
+            CacacheWriter::new(self.core.clone(), path.to_string()),
+        ))
     }
-}
 
-fn parse_error(err: cacache::Error) -> Error {
-    let kind = match err {
-        cacache::Error::EntryNotFound(_, _) => ErrorKind::NotFound,
-        _ => ErrorKind::Unexpected,
-    };
-
-    Error::new(kind, "error from cacache").set_source(err)
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        Ok((
+            RpDelete::default(),
+            oio::OneShotDeleter::new(CacacheDeleter::new(self.core.clone())),
+        ))
+    }
 }
diff --git a/core/src/services/cacache/core.rs b/core/src/services/cacache/core.rs
new file mode 100644
index 000000000..a7cfed059
--- /dev/null
+++ b/core/src/services/cacache/core.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+
+use crate::*;
+
+#[derive(Clone)]
+pub struct CacacheCore {
+    pub path: String,
+}
+
+impl Debug for CacacheCore {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("CacacheCore")
+            .field("path", &self.path)
+            .finish()
+    }
+}
+
+impl CacacheCore {
+    pub async fn get(&self, key: &str) -> Result<Option<bytes::Bytes>> {
+        let cache_path = self.path.clone();
+        let cache_key = key.to_string();
+
+        let data = cacache::read(&cache_path, &cache_key).await;
+
+        match data {
+            Ok(bs) => Ok(Some(bytes::Bytes::from(bs))),
+            Err(cacache::Error::EntryNotFound(_, _)) => Ok(None),
+            Err(err) => Err(Error::new(ErrorKind::Unexpected, "cacache get failed")
+                .with_operation("CacacheCore::get")
+                .set_source(err)),
+        }
+    }
+
+    pub async fn set(&self, key: &str, value: bytes::Bytes) -> Result<()> {
+        let cache_path = self.path.clone();
+        let cache_key = key.to_string();
+
+        cacache::write(&cache_path, &cache_key, value.to_vec())
+            .await
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "cacache set failed")
+                    .with_operation("CacacheCore::set")
+                    .set_source(err)
+            })?;
+
+        Ok(())
+    }
+
+    pub async fn delete(&self, key: &str) -> Result<()> {
+        let cache_path = self.path.clone();
+        let cache_key = key.to_string();
+
+        cacache::remove(&cache_path, &cache_key)
+            .await
+            .map_err(|err| {
+                Error::new(ErrorKind::Unexpected, "cacache delete failed")
+                    .with_operation("CacacheCore::delete")
+                    .set_source(err)
+            })?;
+
+        Ok(())
+    }
+
+    pub async fn metadata(&self, key: &str) -> Result<Option<cacache::Metadata>> {
+        let cache_path = self.path.clone();
+        let cache_key = key.to_string();
+
+        let metadata = cacache::metadata(&cache_path, &cache_key).await;
+
+        match metadata {
+            Ok(Some(md)) => Ok(Some(md)),
+            Ok(None) => Ok(None),
+            Err(cacache::Error::EntryNotFound(_, _)) => Ok(None),
+            Err(err) => Err(Error::new(ErrorKind::Unexpected, "cacache metadata failed")
+                .with_operation("CacacheCore::metadata")
+                .set_source(err)),
+        }
+    }
+}
diff --git a/core/src/services/cacache/mod.rs b/core/src/services/cacache/delete.rs
similarity index 66%
copy from core/src/services/cacache/mod.rs
copy to core/src/services/cacache/delete.rs
index 33c73f972..225fbf207 100644
--- a/core/src/services/cacache/mod.rs
+++ b/core/src/services/cacache/delete.rs
@@ -15,10 +15,25 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#[cfg(feature = "services-cacache")]
-mod backend;
-#[cfg(feature = "services-cacache")]
-pub use backend::CacacheBuilder as Cacache;
+use std::sync::Arc;
 
-mod config;
-pub use config::CacacheConfig;
+use crate::raw::*;
+use crate::*;
+
+use super::core::CacacheCore;
+
+pub struct CacacheDeleter {
+    core: Arc<CacacheCore>,
+}
+
+impl CacacheDeleter {
+    pub fn new(core: Arc<CacacheCore>) -> Self {
+        Self { core }
+    }
+}
+
+impl oio::OneShotDelete for CacacheDeleter {
+    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+        self.core.delete(&path).await
+    }
+}
diff --git a/core/src/services/cacache/mod.rs b/core/src/services/cacache/mod.rs
index 33c73f972..79d0faa60 100644
--- a/core/src/services/cacache/mod.rs
+++ b/core/src/services/cacache/mod.rs
@@ -17,6 +17,13 @@
 
 #[cfg(feature = "services-cacache")]
 mod backend;
+#[cfg(feature = "services-cacache")]
+mod core;
+#[cfg(feature = "services-cacache")]
+mod delete;
+#[cfg(feature = "services-cacache")]
+mod writer;
+
 #[cfg(feature = "services-cacache")]
 pub use backend::CacacheBuilder as Cacache;
 
diff --git a/core/src/services/cacache/writer.rs b/core/src/services/cacache/writer.rs
new file mode 100644
index 000000000..e82e74aae
--- /dev/null
+++ b/core/src/services/cacache/writer.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::CacacheCore;
+
+pub struct CacacheWriter {
+    core: Arc<CacacheCore>,
+    path: String,
+    buffer: oio::QueueBuf,
+}
+
+impl CacacheWriter {
+    pub fn new(core: Arc<CacacheCore>, path: String) -> Self {
+        Self {
+            core,
+            path,
+            buffer: oio::QueueBuf::new(),
+        }
+    }
+}
+
+impl oio::Write for CacacheWriter {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        self.buffer.push(bs);
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<Metadata> {
+        let buf = self.buffer.clone().collect();
+        let length = buf.len() as u64;
+
+        self.core.set(&self.path, buf.to_bytes()).await?;
+
+        let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+        Ok(meta)
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buffer.clear();
+        Ok(())
+    }
+}

Reply via email to