This is an automated email from the ASF dual-hosted git repository. koushiro pushed a commit to branch refactor-persy in repository https://gitbox.apache.org/repos/asf/opendal.git
commit c44128efb32424f8d7fd130de4e1a67e430a2bc3 Author: koushiro <[email protected]> AuthorDate: Wed Oct 22 12:37:50 2025 +0800 refactor: migrate persy service from adapter::kv to impl Access directly --- core/src/services/persy/backend.rs | 165 +++++++++++-------------- core/src/services/persy/config.rs | 4 +- core/src/services/persy/core.rs | 99 +++++++++++++++ core/src/services/persy/{mod.rs => deleter.rs} | 28 ++++- core/src/services/persy/docs.md | 3 +- core/src/services/persy/mod.rs | 4 + core/src/services/persy/writer.rs | 59 +++++++++ 7 files changed, 265 insertions(+), 97 deletions(-) diff --git a/core/src/services/persy/backend.rs b/core/src/services/persy/backend.rs index a55e17e93..0667ac9ea 100644 --- a/core/src/services/persy/backend.rs +++ b/core/src/services/persy/backend.rs @@ -16,18 +16,13 @@ // under the License. use std::fmt::Debug; -use std::fmt::Formatter; -use std::str; +use std::sync::Arc; -use persy; - -use crate::Builder; -use crate::Error; -use crate::ErrorKind; -use crate::Scheme; -use crate::raw::adapters::kv; +use super::config::PersyConfig; +use super::core::*; +use super::deleter::PersyDeleter; +use super::writer::PersyWriter; use crate::raw::*; -use crate::services::PersyConfig; use crate::*; /// persy service support. @@ -112,7 +107,7 @@ impl Builder for PersyBuilder { Ok(()) } - Ok(PersyBackend::new(Adapter { + Ok(PersyBackend::new(PersyCore { datafile: datafile_path, segment, index, @@ -122,98 +117,88 @@ impl Builder for PersyBuilder { } /// Backend for persy services. -pub type PersyBackend = kv::Backend<Adapter>; - -#[derive(Clone)] -pub struct Adapter { - datafile: String, - segment: String, - index: String, - persy: persy::Persy, +#[derive(Clone, Debug)] +pub struct PersyBackend { + core: Arc<PersyCore>, + root: String, + info: Arc<AccessorInfo>, } -impl Debug for Adapter { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("Adapter"); - ds.field("path", &self.datafile); - ds.field("segment", &self.segment); - ds.field("index", &self.index); - ds.finish() +impl PersyBackend { + pub fn new(core: PersyCore) -> Self { + let info = AccessorInfo::default(); + info.set_scheme(Scheme::Persy.into_static()); + info.set_name(&core.datafile); + info.set_root("/"); + info.set_native_capability(Capability { + read: true, + stat: true, + write: true, + write_can_empty: true, + delete: true, + shared: false, + ..Default::default() + }); + + Self { + core: Arc::new(core), + root: "/".to_string(), + info: Arc::new(info), + } } } -impl kv::Adapter for Adapter { - type Scanner = (); - - fn info(&self) -> kv::Info { - kv::Info::new( - Scheme::Persy, - &self.datafile, - Capability { - read: true, - write: true, - delete: true, - shared: false, - ..Default::default() - }, - ) +impl Access for PersyBackend { + type Reader = Buffer; + type Writer = PersyWriter; + type Lister = (); + type Deleter = oio::OneShotDeleter<PersyDeleter>; + + fn info(&self) -> Arc<AccessorInfo> { + self.info.clone() } - async fn get(&self, path: &str) -> Result<Option<Buffer>> { - let mut read_id = self - .persy - .get::<String, persy::PersyId>(&self.index, &path.to_string()) - .map_err(parse_error)?; - if let Some(id) = read_id.next() { - let value = self.persy.read(&self.segment, &id).map_err(parse_error)?; - return Ok(value.map(Buffer::from)); + async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { + let p = build_abs_path(&self.root, path); + + if p == build_abs_path(&self.root, "") { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } else { + let bs = self.core.get(&p)?; + match bs { + Some(bs) => Ok(RpStat::new( + Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64), + )), + None => Err(Error::new(ErrorKind::NotFound, "kv not found in persy")), + } } - - Ok(None) } - async fn set(&self, path: &str, value: Buffer) -> Result<()> { - let mut tx = self.persy.begin().map_err(parse_error)?; - let id = tx - .insert(&self.segment, &value.to_vec()) - .map_err(parse_error)?; - - tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id) - .map_err(parse_error)?; - let prepared = tx.prepare().map_err(parse_error)?; - prepared.commit().map_err(parse_error)?; - - Ok(()) + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let p = build_abs_path(&self.root, path); + let bs = match self.core.get(&p)? { + Some(bs) => bs, + None => { + return Err(Error::new(ErrorKind::NotFound, "kv not found in persy")); + } + }; + Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize()))) } - async fn delete(&self, path: &str) -> Result<()> { - let mut delete_id = self - .persy - .get::<String, persy::PersyId>(&self.index, &path.to_string()) - .map_err(parse_error)?; - if let Some(id) = delete_id.next() { - // Begin a transaction. - let mut tx = self.persy.begin().map_err(parse_error)?; - // Delete the record. - tx.delete(&self.segment, &id).map_err(parse_error)?; - // Remove the index. - tx.remove::<String, persy::PersyId>(&self.index, path.to_string(), Some(id)) - .map_err(parse_error)?; - // Commit the tx. - let prepared = tx.prepare().map_err(parse_error)?; - prepared.commit().map_err(parse_error)?; - } - - Ok(()) + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let p = build_abs_path(&self.root, path); + Ok((RpWrite::new(), PersyWriter::new(self.core.clone(), p))) } -} -fn parse_error<T: Into<persy::PersyError>>(err: persy::PE<T>) -> Error { - let err: persy::PersyError = err.persy_error(); - let kind = match err { - persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound, - _ => ErrorKind::Unexpected, - }; + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + Ok(( + RpDelete::default(), + oio::OneShotDeleter::new(PersyDeleter::new(self.core.clone(), self.root.clone())), + )) + } - Error::new(kind, "error from persy").set_source(err) + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { + let _ = build_abs_path(&self.root, path); + Ok((RpList::default(), ())) + } } diff --git a/core/src/services/persy/config.rs b/core/src/services/persy/config.rs index 203e1ed63..e66326cc0 100644 --- a/core/src/services/persy/config.rs +++ b/core/src/services/persy/config.rs @@ -15,10 +15,11 @@ // specific language governing permissions and limitations // under the License. -use super::backend::PersyBuilder; use serde::Deserialize; use serde::Serialize; +use super::backend::PersyBuilder; + /// Config for persy service support. #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] @@ -34,6 +35,7 @@ pub struct PersyConfig { impl crate::Configurator for PersyConfig { type Builder = PersyBuilder; + fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> { let mut map = uri.options().clone(); diff --git a/core/src/services/persy/core.rs b/core/src/services/persy/core.rs new file mode 100644 index 000000000..32c5135b1 --- /dev/null +++ b/core/src/services/persy/core.rs @@ -0,0 +1,99 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; + +use crate::*; + +#[derive(Clone)] +pub struct PersyCore { + pub datafile: String, + pub segment: String, + pub index: String, + pub persy: persy::Persy, +} + +impl Debug for PersyCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("Adapter"); + ds.field("path", &self.datafile); + ds.field("segment", &self.segment); + ds.field("index", &self.index); + ds.finish() + } +} + +impl PersyCore { + pub fn get(&self, path: &str) -> Result<Option<Buffer>> { + let mut read_id = self + .persy + .get::<String, persy::PersyId>(&self.index, &path.to_string()) + .map_err(parse_error)?; + if let Some(id) = read_id.next() { + let value = self.persy.read(&self.segment, &id).map_err(parse_error)?; + return Ok(value.map(Buffer::from)); + } + + Ok(None) + } + + pub fn set(&self, path: &str, value: Buffer) -> Result<()> { + let mut tx = self.persy.begin().map_err(parse_error)?; + let id = tx + .insert(&self.segment, &value.to_vec()) + .map_err(parse_error)?; + + tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id) + .map_err(parse_error)?; + let prepared = tx.prepare().map_err(parse_error)?; + prepared.commit().map_err(parse_error)?; + + Ok(()) + } + + pub fn delete(&self, path: &str) -> Result<()> { + let mut delete_id = self + .persy + .get::<String, persy::PersyId>(&self.index, &path.to_string()) + .map_err(parse_error)?; + if let Some(id) = delete_id.next() { + // Begin a transaction. + let mut tx = self.persy.begin().map_err(parse_error)?; + // Delete the record. + tx.delete(&self.segment, &id).map_err(parse_error)?; + // Remove the index. + tx.remove::<String, persy::PersyId>(&self.index, path.to_string(), Some(id)) + .map_err(parse_error)?; + // Commit the tx. + let prepared = tx.prepare().map_err(parse_error)?; + prepared.commit().map_err(parse_error)?; + } + + Ok(()) + } +} + +fn parse_error<T: Into<persy::PersyError>>(err: persy::PE<T>) -> Error { + let err: persy::PersyError = err.persy_error(); + let kind = match err { + persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound, + _ => ErrorKind::Unexpected, + }; + + Error::new(kind, "error from persy").set_source(err) +} diff --git a/core/src/services/persy/mod.rs b/core/src/services/persy/deleter.rs similarity index 61% copy from core/src/services/persy/mod.rs copy to core/src/services/persy/deleter.rs index 6c387ff15..b6b96a9b3 100644 --- a/core/src/services/persy/mod.rs +++ b/core/src/services/persy/deleter.rs @@ -15,8 +15,28 @@ // specific language governing permissions and limitations // under the License. -mod backend; -pub use backend::PersyBuilder as Persy; +use std::sync::Arc; -mod config; -pub use config::PersyConfig; +use super::core::*; +use crate::raw::oio; +use crate::raw::*; +use crate::*; + +pub struct PersyDeleter { + core: Arc<PersyCore>, + root: String, +} + +impl PersyDeleter { + pub fn new(core: Arc<PersyCore>, root: String) -> Self { + Self { core, root } + } +} + +impl oio::OneShotDelete for PersyDeleter { + async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { + let p = build_abs_path(&self.root, &path); + self.core.delete(&p)?; + Ok(()) + } +} diff --git a/core/src/services/persy/docs.md b/core/src/services/persy/docs.md index 97176daca..e3c27bba4 100644 --- a/core/src/services/persy/docs.md +++ b/core/src/services/persy/docs.md @@ -2,16 +2,15 @@ This service can be used to: +- [ ] create_dir - [x] stat - [x] read - [x] write -- [x] create_dir - [x] delete - [ ] copy - [ ] rename - [ ] list - [ ] ~~presign~~ -- [x] blocking ## Configuration diff --git a/core/src/services/persy/mod.rs b/core/src/services/persy/mod.rs index 6c387ff15..8dc68c854 100644 --- a/core/src/services/persy/mod.rs +++ b/core/src/services/persy/mod.rs @@ -16,6 +16,10 @@ // under the License. mod backend; +mod core; +mod deleter; +mod writer; + pub use backend::PersyBuilder as Persy; mod config; diff --git a/core/src/services/persy/writer.rs b/core/src/services/persy/writer.rs new file mode 100644 index 000000000..6279b5625 --- /dev/null +++ b/core/src/services/persy/writer.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use super::core::*; +use crate::raw::oio; +use crate::*; + +pub struct PersyWriter { + core: Arc<PersyCore>, + path: String, + buffer: oio::QueueBuf, +} + +impl PersyWriter { + pub fn new(core: Arc<PersyCore>, path: String) -> Self { + Self { + core, + path, + buffer: oio::QueueBuf::new(), + } + } +} + +impl oio::Write for PersyWriter { + async fn write(&mut self, bs: Buffer) -> Result<()> { + self.buffer.push(bs); + Ok(()) + } + + async fn close(&mut self) -> Result<Metadata> { + let buf = self.buffer.clone().collect(); + let length = buf.len() as u64; + self.core.set(&self.path, buf)?; + + let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); + Ok(meta) + } + + async fn abort(&mut self) -> Result<()> { + self.buffer.clear(); + Ok(()) + } +}
