This is an automated email from the ASF dual-hosted git repository.

koushiro pushed a commit to branch refactor-persy
in repository https://gitbox.apache.org/repos/asf/opendal.git

commit c44128efb32424f8d7fd130de4e1a67e430a2bc3
Author: koushiro <[email protected]>
AuthorDate: Wed Oct 22 12:37:50 2025 +0800

    refactor: migrate persy service from adapter::kv to impl Access directly
---
 core/src/services/persy/backend.rs             | 165 +++++++++++--------------
 core/src/services/persy/config.rs              |   4 +-
 core/src/services/persy/core.rs                |  99 +++++++++++++++
 core/src/services/persy/{mod.rs => deleter.rs} |  28 ++++-
 core/src/services/persy/docs.md                |   3 +-
 core/src/services/persy/mod.rs                 |   4 +
 core/src/services/persy/writer.rs              |  59 +++++++++
 7 files changed, 265 insertions(+), 97 deletions(-)

diff --git a/core/src/services/persy/backend.rs 
b/core/src/services/persy/backend.rs
index a55e17e93..0667ac9ea 100644
--- a/core/src/services/persy/backend.rs
+++ b/core/src/services/persy/backend.rs
@@ -16,18 +16,13 @@
 // under the License.
 
 use std::fmt::Debug;
-use std::fmt::Formatter;
-use std::str;
+use std::sync::Arc;
 
-use persy;
-
-use crate::Builder;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Scheme;
-use crate::raw::adapters::kv;
+use super::config::PersyConfig;
+use super::core::*;
+use super::deleter::PersyDeleter;
+use super::writer::PersyWriter;
 use crate::raw::*;
-use crate::services::PersyConfig;
 use crate::*;
 
 /// persy service support.
@@ -112,7 +107,7 @@ impl Builder for PersyBuilder {
             Ok(())
         }
 
-        Ok(PersyBackend::new(Adapter {
+        Ok(PersyBackend::new(PersyCore {
             datafile: datafile_path,
             segment,
             index,
@@ -122,98 +117,88 @@ impl Builder for PersyBuilder {
 }
 
 /// Backend for persy services.
-pub type PersyBackend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
-    datafile: String,
-    segment: String,
-    index: String,
-    persy: persy::Persy,
+#[derive(Clone, Debug)]
+pub struct PersyBackend {
+    core: Arc<PersyCore>,
+    root: String,
+    info: Arc<AccessorInfo>,
 }
 
-impl Debug for Adapter {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let mut ds = f.debug_struct("Adapter");
-        ds.field("path", &self.datafile);
-        ds.field("segment", &self.segment);
-        ds.field("index", &self.index);
-        ds.finish()
+impl PersyBackend {
+    pub fn new(core: PersyCore) -> Self {
+        let info = AccessorInfo::default();
+        info.set_scheme(Scheme::Persy.into_static());
+        info.set_name(&core.datafile);
+        info.set_root("/");
+        info.set_native_capability(Capability {
+            read: true,
+            stat: true,
+            write: true,
+            write_can_empty: true,
+            delete: true,
+            shared: false,
+            ..Default::default()
+        });
+
+        Self {
+            core: Arc::new(core),
+            root: "/".to_string(),
+            info: Arc::new(info),
+        }
     }
 }
 
-impl kv::Adapter for Adapter {
-    type Scanner = ();
-
-    fn info(&self) -> kv::Info {
-        kv::Info::new(
-            Scheme::Persy,
-            &self.datafile,
-            Capability {
-                read: true,
-                write: true,
-                delete: true,
-                shared: false,
-                ..Default::default()
-            },
-        )
+impl Access for PersyBackend {
+    type Reader = Buffer;
+    type Writer = PersyWriter;
+    type Lister = ();
+    type Deleter = oio::OneShotDeleter<PersyDeleter>;
+
+    fn info(&self) -> Arc<AccessorInfo> {
+        self.info.clone()
     }
 
-    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
-        let mut read_id = self
-            .persy
-            .get::<String, persy::PersyId>(&self.index, &path.to_string())
-            .map_err(parse_error)?;
-        if let Some(id) = read_id.next() {
-            let value = self.persy.read(&self.segment, 
&id).map_err(parse_error)?;
-            return Ok(value.map(Buffer::from));
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let p = build_abs_path(&self.root, path);
+
+        if p == build_abs_path(&self.root, "") {
+            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+        } else {
+            let bs = self.core.get(&p)?;
+            match bs {
+                Some(bs) => Ok(RpStat::new(
+                    
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+                )),
+                None => Err(Error::new(ErrorKind::NotFound, "kv not found in 
persy")),
+            }
         }
-
-        Ok(None)
     }
 
-    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
-        let mut tx = self.persy.begin().map_err(parse_error)?;
-        let id = tx
-            .insert(&self.segment, &value.to_vec())
-            .map_err(parse_error)?;
-
-        tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id)
-            .map_err(parse_error)?;
-        let prepared = tx.prepare().map_err(parse_error)?;
-        prepared.commit().map_err(parse_error)?;
-
-        Ok(())
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let p = build_abs_path(&self.root, path);
+        let bs = match self.core.get(&p)? {
+            Some(bs) => bs,
+            None => {
+                return Err(Error::new(ErrorKind::NotFound, "kv not found in 
persy"));
+            }
+        };
+        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
     }
 
-    async fn delete(&self, path: &str) -> Result<()> {
-        let mut delete_id = self
-            .persy
-            .get::<String, persy::PersyId>(&self.index, &path.to_string())
-            .map_err(parse_error)?;
-        if let Some(id) = delete_id.next() {
-            // Begin a transaction.
-            let mut tx = self.persy.begin().map_err(parse_error)?;
-            // Delete the record.
-            tx.delete(&self.segment, &id).map_err(parse_error)?;
-            // Remove the index.
-            tx.remove::<String, persy::PersyId>(&self.index, path.to_string(), 
Some(id))
-                .map_err(parse_error)?;
-            // Commit the tx.
-            let prepared = tx.prepare().map_err(parse_error)?;
-            prepared.commit().map_err(parse_error)?;
-        }
-
-        Ok(())
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let p = build_abs_path(&self.root, path);
+        Ok((RpWrite::new(), PersyWriter::new(self.core.clone(), p)))
     }
-}
 
-fn parse_error<T: Into<persy::PersyError>>(err: persy::PE<T>) -> Error {
-    let err: persy::PersyError = err.persy_error();
-    let kind = match err {
-        persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound,
-        _ => ErrorKind::Unexpected,
-    };
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        Ok((
+            RpDelete::default(),
+            oio::OneShotDeleter::new(PersyDeleter::new(self.core.clone(), 
self.root.clone())),
+        ))
+    }
 
-    Error::new(kind, "error from persy").set_source(err)
+    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, 
Self::Lister)> {
+        let _ = build_abs_path(&self.root, path);
+        Ok((RpList::default(), ()))
+    }
 }
diff --git a/core/src/services/persy/config.rs 
b/core/src/services/persy/config.rs
index 203e1ed63..e66326cc0 100644
--- a/core/src/services/persy/config.rs
+++ b/core/src/services/persy/config.rs
@@ -15,10 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use super::backend::PersyBuilder;
 use serde::Deserialize;
 use serde::Serialize;
 
+use super::backend::PersyBuilder;
+
 /// Config for persy service support.
 #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(default)]
@@ -34,6 +35,7 @@ pub struct PersyConfig {
 
 impl crate::Configurator for PersyConfig {
     type Builder = PersyBuilder;
+
     fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
         let mut map = uri.options().clone();
 
diff --git a/core/src/services/persy/core.rs b/core/src/services/persy/core.rs
new file mode 100644
index 000000000..32c5135b1
--- /dev/null
+++ b/core/src/services/persy/core.rs
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use crate::*;
+
+#[derive(Clone)]
+pub struct PersyCore {
+    pub datafile: String,
+    pub segment: String,
+    pub index: String,
+    pub persy: persy::Persy,
+}
+
+impl Debug for PersyCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("Adapter");
+        ds.field("path", &self.datafile);
+        ds.field("segment", &self.segment);
+        ds.field("index", &self.index);
+        ds.finish()
+    }
+}
+
+impl PersyCore {
+    pub fn get(&self, path: &str) -> Result<Option<Buffer>> {
+        let mut read_id = self
+            .persy
+            .get::<String, persy::PersyId>(&self.index, &path.to_string())
+            .map_err(parse_error)?;
+        if let Some(id) = read_id.next() {
+            let value = self.persy.read(&self.segment, 
&id).map_err(parse_error)?;
+            return Ok(value.map(Buffer::from));
+        }
+
+        Ok(None)
+    }
+
+    pub fn set(&self, path: &str, value: Buffer) -> Result<()> {
+        let mut tx = self.persy.begin().map_err(parse_error)?;
+        let id = tx
+            .insert(&self.segment, &value.to_vec())
+            .map_err(parse_error)?;
+
+        tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id)
+            .map_err(parse_error)?;
+        let prepared = tx.prepare().map_err(parse_error)?;
+        prepared.commit().map_err(parse_error)?;
+
+        Ok(())
+    }
+
+    pub fn delete(&self, path: &str) -> Result<()> {
+        let mut delete_id = self
+            .persy
+            .get::<String, persy::PersyId>(&self.index, &path.to_string())
+            .map_err(parse_error)?;
+        if let Some(id) = delete_id.next() {
+            // Begin a transaction.
+            let mut tx = self.persy.begin().map_err(parse_error)?;
+            // Delete the record.
+            tx.delete(&self.segment, &id).map_err(parse_error)?;
+            // Remove the index.
+            tx.remove::<String, persy::PersyId>(&self.index, path.to_string(), 
Some(id))
+                .map_err(parse_error)?;
+            // Commit the tx.
+            let prepared = tx.prepare().map_err(parse_error)?;
+            prepared.commit().map_err(parse_error)?;
+        }
+
+        Ok(())
+    }
+}
+
+fn parse_error<T: Into<persy::PersyError>>(err: persy::PE<T>) -> Error {
+    let err: persy::PersyError = err.persy_error();
+    let kind = match err {
+        persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound,
+        _ => ErrorKind::Unexpected,
+    };
+
+    Error::new(kind, "error from persy").set_source(err)
+}
diff --git a/core/src/services/persy/mod.rs b/core/src/services/persy/deleter.rs
similarity index 61%
copy from core/src/services/persy/mod.rs
copy to core/src/services/persy/deleter.rs
index 6c387ff15..b6b96a9b3 100644
--- a/core/src/services/persy/mod.rs
+++ b/core/src/services/persy/deleter.rs
@@ -15,8 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod backend;
-pub use backend::PersyBuilder as Persy;
+use std::sync::Arc;
 
-mod config;
-pub use config::PersyConfig;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct PersyDeleter {
+    core: Arc<PersyCore>,
+    root: String,
+}
+
+impl PersyDeleter {
+    pub fn new(core: Arc<PersyCore>, root: String) -> Self {
+        Self { core, root }
+    }
+}
+
+impl oio::OneShotDelete for PersyDeleter {
+    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+        let p = build_abs_path(&self.root, &path);
+        self.core.delete(&p)?;
+        Ok(())
+    }
+}
diff --git a/core/src/services/persy/docs.md b/core/src/services/persy/docs.md
index 97176daca..e3c27bba4 100644
--- a/core/src/services/persy/docs.md
+++ b/core/src/services/persy/docs.md
@@ -2,16 +2,15 @@
 
 This service can be used to:
 
+- [ ] create_dir
 - [x] stat
 - [x] read
 - [x] write
-- [x] create_dir
 - [x] delete
 - [ ] copy
 - [ ] rename
 - [ ] list
 - [ ] ~~presign~~
-- [x] blocking
 
 ## Configuration
 
diff --git a/core/src/services/persy/mod.rs b/core/src/services/persy/mod.rs
index 6c387ff15..8dc68c854 100644
--- a/core/src/services/persy/mod.rs
+++ b/core/src/services/persy/mod.rs
@@ -16,6 +16,10 @@
 // under the License.
 
 mod backend;
+mod core;
+mod deleter;
+mod writer;
+
 pub use backend::PersyBuilder as Persy;
 
 mod config;
diff --git a/core/src/services/persy/writer.rs 
b/core/src/services/persy/writer.rs
new file mode 100644
index 000000000..6279b5625
--- /dev/null
+++ b/core/src/services/persy/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct PersyWriter {
+    core: Arc<PersyCore>,
+    path: String,
+    buffer: oio::QueueBuf,
+}
+
+impl PersyWriter {
+    pub fn new(core: Arc<PersyCore>, path: String) -> Self {
+        Self {
+            core,
+            path,
+            buffer: oio::QueueBuf::new(),
+        }
+    }
+}
+
+impl oio::Write for PersyWriter {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        self.buffer.push(bs);
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<Metadata> {
+        let buf = self.buffer.clone().collect();
+        let length = buf.len() as u64;
+        self.core.set(&self.path, buf)?;
+
+        let meta = 
Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+        Ok(meta)
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buffer.clear();
+        Ok(())
+    }
+}

Reply via email to