This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new a5d529f00 refactor: Migrate services cacache to implement Access (#6303)
a5d529f00 is described below
commit a5d529f0096067fbda808fe39d640378a893d256
Author: Xuanwo <[email protected]>
AuthorDate: Tue Jun 17 17:20:05 2025 +0800
refactor: Migrate services cacache to implement Access (#6303)
* refactor: Migrate services cacache to implement Access
Signed-off-by: Xuanwo <[email protected]>
* Format code
Signed-off-by: Xuanwo <[email protected]>
* Fix tests
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
---
core/src/services/cacache/backend.rs | 153 ++++++++++++++----------
core/src/services/cacache/core.rs | 96 +++++++++++++++
core/src/services/cacache/{mod.rs => delete.rs} | 27 ++++-
core/src/services/cacache/mod.rs | 7 ++
core/src/services/cacache/writer.rs | 61 ++++++++++
5 files changed, 275 insertions(+), 69 deletions(-)
diff --git a/core/src/services/cacache/backend.rs b/core/src/services/cacache/backend.rs
index ab0da250f..f48610e92 100644
--- a/core/src/services/cacache/backend.rs
+++ b/core/src/services/cacache/backend.rs
@@ -16,20 +16,18 @@
// under the License.
use std::fmt::Debug;
-use std::fmt::Formatter;
-use std::str;
+use std::sync::Arc;
-use cacache;
+use chrono::DateTime;
-use crate::raw::adapters::kv;
-use crate::raw::Access;
+use crate::raw::*;
use crate::services::CacacheConfig;
-use crate::Builder;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Scheme;
use crate::*;
+use super::core::CacacheCore;
+use super::delete::CacacheDeleter;
+use super::writer::CacacheWriter;
+
impl Configurator for CacacheConfig {
type Builder = CacacheBuilder;
fn into_builder(self) -> Self::Builder {
@@ -62,73 +60,102 @@ impl Builder for CacacheBuilder {
.with_context("service", Scheme::Cacache)
})?;
- Ok(CacacheBackend::new(Adapter {
- datadir: datadir_path,
- }))
+ let core = CacacheCore {
+ path: datadir_path.clone(),
+ };
+
+ let info = AccessorInfo::default();
+ info.set_scheme(Scheme::Cacache);
+ info.set_name(&datadir_path);
+ info.set_root("/");
+ info.set_native_capability(Capability {
+ read: true,
+ write: true,
+ delete: true,
+ stat: true,
+ rename: false,
+ list: false,
+ shared: false,
+ ..Default::default()
+ });
+
+ Ok(CacacheAccessor {
+ core: Arc::new(core),
+ info: Arc::new(info),
+ })
}
}
/// Backend for cacache services.
-pub type CacacheBackend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
- datadir: String,
+#[derive(Debug, Clone)]
+pub struct CacacheAccessor {
+ core: Arc<CacacheCore>,
+ info: Arc<AccessorInfo>,
}
-impl Debug for Adapter {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let mut ds = f.debug_struct("Adapter");
- ds.field("path", &self.datadir);
- ds.finish()
- }
-}
+impl Access for CacacheAccessor {
+ type Reader = Buffer;
+ type Writer = CacacheWriter;
+ type Lister = ();
+ type Deleter = oio::OneShotDeleter<CacacheDeleter>;
-impl kv::Adapter for Adapter {
- type Scanner = ();
-
- fn info(&self) -> kv::Info {
- kv::Info::new(
- Scheme::Cacache,
- &self.datadir,
- Capability {
- read: true,
- write: true,
- delete: true,
- shared: false,
- ..Default::default()
- },
- )
+ fn info(&self) -> Arc<AccessorInfo> {
+ self.info.clone()
}
- async fn get(&self, path: &str) -> Result<Option<Buffer>> {
- let result = cacache::read(&self.datadir, path)
- .await
- .map_err(parse_error)?;
- Ok(Some(Buffer::from(result)))
+ async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+ let metadata = self.core.metadata(path).await?;
+
+ match metadata {
+ Some(meta) => {
+ let mut md = Metadata::new(EntryMode::FILE);
+ md.set_content_length(meta.size as u64);
+ // Convert u128 milliseconds to DateTime<Utc>
+ let millis = meta.time;
+ let secs = (millis / 1000) as i64;
+ let nanos = ((millis % 1000) * 1_000_000) as u32;
+ if let Some(dt) = DateTime::from_timestamp(secs, nanos) {
+ md.set_last_modified(dt);
+ }
+ Ok(RpStat::new(md))
+ }
+ None => Err(Error::new(ErrorKind::NotFound, "entry not found")),
+ }
}
- async fn set(&self, path: &str, value: Buffer) -> Result<()> {
- cacache::write(&self.datadir, path, value.to_vec())
- .await
- .map_err(parse_error)?;
- Ok(())
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+ let data = self.core.get(path).await?;
+
+ match data {
+ Some(bytes) => {
+ let range = args.range();
+ let buffer = if range.is_full() {
+ Buffer::from(bytes)
+ } else {
+ let start = range.offset() as usize;
+ let end = match range.size() {
+ Some(size) => (range.offset() + size) as usize,
+ None => bytes.len(),
+ };
+ Buffer::from(bytes.slice(start..end.min(bytes.len())))
+ };
+ Ok((RpRead::new(), buffer))
+ }
+ None => Err(Error::new(ErrorKind::NotFound, "entry not found")),
+ }
}
- async fn delete(&self, path: &str) -> Result<()> {
- cacache::remove(&self.datadir, path)
- .await
- .map_err(parse_error)?;
-
- Ok(())
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+ Ok((
+ RpWrite::new(),
+ CacacheWriter::new(self.core.clone(), path.to_string()),
+ ))
}
-}
-fn parse_error(err: cacache::Error) -> Error {
- let kind = match err {
- cacache::Error::EntryNotFound(_, _) => ErrorKind::NotFound,
- _ => ErrorKind::Unexpected,
- };
-
- Error::new(kind, "error from cacache").set_source(err)
+ async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+ Ok((
+ RpDelete::default(),
+ oio::OneShotDeleter::new(CacacheDeleter::new(self.core.clone())),
+ ))
+ }
}
diff --git a/core/src/services/cacache/core.rs b/core/src/services/cacache/core.rs
new file mode 100644
index 000000000..a7cfed059
--- /dev/null
+++ b/core/src/services/cacache/core.rs
@@ -0,0 +1,96 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+
+use crate::*;
+
+#[derive(Clone)]
+pub struct CacacheCore {
+ pub path: String,
+}
+
+impl Debug for CacacheCore {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("CacacheCore")
+ .field("path", &self.path)
+ .finish()
+ }
+}
+
+impl CacacheCore {
+ pub async fn get(&self, key: &str) -> Result<Option<bytes::Bytes>> {
+ let cache_path = self.path.clone();
+ let cache_key = key.to_string();
+
+ let data = cacache::read(&cache_path, &cache_key).await;
+
+ match data {
+ Ok(bs) => Ok(Some(bytes::Bytes::from(bs))),
+ Err(cacache::Error::EntryNotFound(_, _)) => Ok(None),
+ Err(err) => Err(Error::new(ErrorKind::Unexpected, "cacache get failed")
+ .with_operation("CacacheCore::get")
+ .set_source(err)),
+ }
+ }
+
+ pub async fn set(&self, key: &str, value: bytes::Bytes) -> Result<()> {
+ let cache_path = self.path.clone();
+ let cache_key = key.to_string();
+
+ cacache::write(&cache_path, &cache_key, value.to_vec())
+ .await
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "cacache set failed")
+ .with_operation("CacacheCore::set")
+ .set_source(err)
+ })?;
+
+ Ok(())
+ }
+
+ pub async fn delete(&self, key: &str) -> Result<()> {
+ let cache_path = self.path.clone();
+ let cache_key = key.to_string();
+
+ cacache::remove(&cache_path, &cache_key)
+ .await
+ .map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "cacache delete failed")
+ .with_operation("CacacheCore::delete")
+ .set_source(err)
+ })?;
+
+ Ok(())
+ }
+
+ pub async fn metadata(&self, key: &str) -> Result<Option<cacache::Metadata>> {
+ let cache_path = self.path.clone();
+ let cache_key = key.to_string();
+
+ let metadata = cacache::metadata(&cache_path, &cache_key).await;
+
+ match metadata {
+ Ok(Some(md)) => Ok(Some(md)),
+ Ok(None) => Ok(None),
+ Err(cacache::Error::EntryNotFound(_, _)) => Ok(None),
+ Err(err) => Err(Error::new(ErrorKind::Unexpected, "cacache metadata failed")
+ .with_operation("CacacheCore::metadata")
+ .set_source(err)),
+ }
+ }
+}
diff --git a/core/src/services/cacache/mod.rs b/core/src/services/cacache/delete.rs
similarity index 66%
copy from core/src/services/cacache/mod.rs
copy to core/src/services/cacache/delete.rs
index 33c73f972..225fbf207 100644
--- a/core/src/services/cacache/mod.rs
+++ b/core/src/services/cacache/delete.rs
@@ -15,10 +15,25 @@
// specific language governing permissions and limitations
// under the License.
-#[cfg(feature = "services-cacache")]
-mod backend;
-#[cfg(feature = "services-cacache")]
-pub use backend::CacacheBuilder as Cacache;
+use std::sync::Arc;
-mod config;
-pub use config::CacacheConfig;
+use crate::raw::*;
+use crate::*;
+
+use super::core::CacacheCore;
+
+pub struct CacacheDeleter {
+ core: Arc<CacacheCore>,
+}
+
+impl CacacheDeleter {
+ pub fn new(core: Arc<CacacheCore>) -> Self {
+ Self { core }
+ }
+}
+
+impl oio::OneShotDelete for CacacheDeleter {
+ async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+ self.core.delete(&path).await
+ }
+}
diff --git a/core/src/services/cacache/mod.rs b/core/src/services/cacache/mod.rs
index 33c73f972..79d0faa60 100644
--- a/core/src/services/cacache/mod.rs
+++ b/core/src/services/cacache/mod.rs
@@ -17,6 +17,13 @@
#[cfg(feature = "services-cacache")]
mod backend;
+#[cfg(feature = "services-cacache")]
+mod core;
+#[cfg(feature = "services-cacache")]
+mod delete;
+#[cfg(feature = "services-cacache")]
+mod writer;
+
#[cfg(feature = "services-cacache")]
pub use backend::CacacheBuilder as Cacache;
diff --git a/core/src/services/cacache/writer.rs b/core/src/services/cacache/writer.rs
new file mode 100644
index 000000000..e82e74aae
--- /dev/null
+++ b/core/src/services/cacache/writer.rs
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::raw::*;
+use crate::*;
+
+use super::core::CacacheCore;
+
+pub struct CacacheWriter {
+ core: Arc<CacacheCore>,
+ path: String,
+ buffer: oio::QueueBuf,
+}
+
+impl CacacheWriter {
+ pub fn new(core: Arc<CacacheCore>, path: String) -> Self {
+ Self {
+ core,
+ path,
+ buffer: oio::QueueBuf::new(),
+ }
+ }
+}
+
+impl oio::Write for CacacheWriter {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
+ self.buffer.push(bs);
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<Metadata> {
+ let buf = self.buffer.clone().collect();
+ let length = buf.len() as u64;
+
+ self.core.set(&self.path, buf.to_bytes()).await?;
+
+ let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+ Ok(meta)
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ self.buffer.clear();
+ Ok(())
+ }
+}