This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 64a534868 refactor: migrate persy service from adapter::kv to impl Access directly (#6721)
64a534868 is described below
commit 64a534868a3f308d5a25a5ebdd947e0d1f4d2533
Author: Qinxuan Chen <[email protected]>
AuthorDate: Wed Oct 22 17:23:45 2025 +0800
refactor: migrate persy service from adapter::kv to impl Access directly (#6721)
---
core/src/services/persy/backend.rs | 165 +++++++++++--------------
core/src/services/persy/config.rs | 4 +-
core/src/services/persy/core.rs | 99 +++++++++++++++
core/src/services/persy/{mod.rs => deleter.rs} | 28 ++++-
core/src/services/persy/docs.md | 3 +-
core/src/services/persy/mod.rs | 4 +
core/src/services/persy/writer.rs | 59 +++++++++
7 files changed, 265 insertions(+), 97 deletions(-)
diff --git a/core/src/services/persy/backend.rs b/core/src/services/persy/backend.rs
index a55e17e93..0667ac9ea 100644
--- a/core/src/services/persy/backend.rs
+++ b/core/src/services/persy/backend.rs
@@ -16,18 +16,13 @@
// under the License.
use std::fmt::Debug;
-use std::fmt::Formatter;
-use std::str;
+use std::sync::Arc;
-use persy;
-
-use crate::Builder;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Scheme;
-use crate::raw::adapters::kv;
+use super::config::PersyConfig;
+use super::core::*;
+use super::deleter::PersyDeleter;
+use super::writer::PersyWriter;
use crate::raw::*;
-use crate::services::PersyConfig;
use crate::*;
/// persy service support.
@@ -112,7 +107,7 @@ impl Builder for PersyBuilder {
Ok(())
}
- Ok(PersyBackend::new(Adapter {
+ Ok(PersyBackend::new(PersyCore {
datafile: datafile_path,
segment,
index,
@@ -122,98 +117,88 @@ impl Builder for PersyBuilder {
}
/// Backend for persy services.
-pub type PersyBackend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
- datafile: String,
- segment: String,
- index: String,
- persy: persy::Persy,
+#[derive(Clone, Debug)]
+pub struct PersyBackend {
+ core: Arc<PersyCore>,
+ root: String,
+ info: Arc<AccessorInfo>,
}
-impl Debug for Adapter {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let mut ds = f.debug_struct("Adapter");
- ds.field("path", &self.datafile);
- ds.field("segment", &self.segment);
- ds.field("index", &self.index);
- ds.finish()
+impl PersyBackend {
+ pub fn new(core: PersyCore) -> Self {
+ let info = AccessorInfo::default();
+ info.set_scheme(Scheme::Persy.into_static());
+ info.set_name(&core.datafile);
+ info.set_root("/");
+ info.set_native_capability(Capability {
+ read: true,
+ stat: true,
+ write: true,
+ write_can_empty: true,
+ delete: true,
+ shared: false,
+ ..Default::default()
+ });
+
+ Self {
+ core: Arc::new(core),
+ root: "/".to_string(),
+ info: Arc::new(info),
+ }
}
}
-impl kv::Adapter for Adapter {
- type Scanner = ();
-
- fn info(&self) -> kv::Info {
- kv::Info::new(
- Scheme::Persy,
- &self.datafile,
- Capability {
- read: true,
- write: true,
- delete: true,
- shared: false,
- ..Default::default()
- },
- )
+impl Access for PersyBackend {
+ type Reader = Buffer;
+ type Writer = PersyWriter;
+ type Lister = ();
+ type Deleter = oio::OneShotDeleter<PersyDeleter>;
+
+ fn info(&self) -> Arc<AccessorInfo> {
+ self.info.clone()
}
- async fn get(&self, path: &str) -> Result<Option<Buffer>> {
- let mut read_id = self
- .persy
- .get::<String, persy::PersyId>(&self.index, &path.to_string())
- .map_err(parse_error)?;
- if let Some(id) = read_id.next() {
- let value = self.persy.read(&self.segment, &id).map_err(parse_error)?;
- return Ok(value.map(Buffer::from));
+ async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+ let p = build_abs_path(&self.root, path);
+
+ if p == build_abs_path(&self.root, "") {
+ Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+ } else {
+ let bs = self.core.get(&p)?;
+ match bs {
+ Some(bs) => Ok(RpStat::new(
+ Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+ )),
+ None => Err(Error::new(ErrorKind::NotFound, "kv not found in persy")),
+ }
}
-
- Ok(None)
}
- async fn set(&self, path: &str, value: Buffer) -> Result<()> {
- let mut tx = self.persy.begin().map_err(parse_error)?;
- let id = tx
- .insert(&self.segment, &value.to_vec())
- .map_err(parse_error)?;
-
- tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id)
- .map_err(parse_error)?;
- let prepared = tx.prepare().map_err(parse_error)?;
- prepared.commit().map_err(parse_error)?;
-
- Ok(())
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+ let p = build_abs_path(&self.root, path);
+ let bs = match self.core.get(&p)? {
+ Some(bs) => bs,
+ None => {
+ return Err(Error::new(ErrorKind::NotFound, "kv not found in persy"));
+ }
+ };
+ Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
}
- async fn delete(&self, path: &str) -> Result<()> {
- let mut delete_id = self
- .persy
- .get::<String, persy::PersyId>(&self.index, &path.to_string())
- .map_err(parse_error)?;
- if let Some(id) = delete_id.next() {
- // Begin a transaction.
- let mut tx = self.persy.begin().map_err(parse_error)?;
- // Delete the record.
- tx.delete(&self.segment, &id).map_err(parse_error)?;
- // Remove the index.
- tx.remove::<String, persy::PersyId>(&self.index, path.to_string(), Some(id))
- .map_err(parse_error)?;
- // Commit the tx.
- let prepared = tx.prepare().map_err(parse_error)?;
- prepared.commit().map_err(parse_error)?;
- }
-
- Ok(())
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+ let p = build_abs_path(&self.root, path);
+ Ok((RpWrite::new(), PersyWriter::new(self.core.clone(), p)))
}
-}
-fn parse_error<T: Into<persy::PersyError>>(err: persy::PE<T>) -> Error {
- let err: persy::PersyError = err.persy_error();
- let kind = match err {
- persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound,
- _ => ErrorKind::Unexpected,
- };
+ async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+ Ok((
+ RpDelete::default(),
+ oio::OneShotDeleter::new(PersyDeleter::new(self.core.clone(), self.root.clone())),
+ ))
+ }
- Error::new(kind, "error from persy").set_source(err)
+ async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
+ let _ = build_abs_path(&self.root, path);
+ Ok((RpList::default(), ()))
+ }
}
diff --git a/core/src/services/persy/config.rs b/core/src/services/persy/config.rs
index 203e1ed63..e66326cc0 100644
--- a/core/src/services/persy/config.rs
+++ b/core/src/services/persy/config.rs
@@ -15,10 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use super::backend::PersyBuilder;
use serde::Deserialize;
use serde::Serialize;
+use super::backend::PersyBuilder;
+
/// Config for persy service support.
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
@@ -34,6 +35,7 @@ pub struct PersyConfig {
impl crate::Configurator for PersyConfig {
type Builder = PersyBuilder;
+
fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
let mut map = uri.options().clone();
diff --git a/core/src/services/persy/core.rs b/core/src/services/persy/core.rs
new file mode 100644
index 000000000..32c5135b1
--- /dev/null
+++ b/core/src/services/persy/core.rs
@@ -0,0 +1,99 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use crate::*;
+
+#[derive(Clone)]
+pub struct PersyCore {
+ pub datafile: String,
+ pub segment: String,
+ pub index: String,
+ pub persy: persy::Persy,
+}
+
+impl Debug for PersyCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut ds = f.debug_struct("Adapter");
+ ds.field("path", &self.datafile);
+ ds.field("segment", &self.segment);
+ ds.field("index", &self.index);
+ ds.finish()
+ }
+}
+
+impl PersyCore {
+ pub fn get(&self, path: &str) -> Result<Option<Buffer>> {
+ let mut read_id = self
+ .persy
+ .get::<String, persy::PersyId>(&self.index, &path.to_string())
+ .map_err(parse_error)?;
+ if let Some(id) = read_id.next() {
+ let value = self.persy.read(&self.segment, &id).map_err(parse_error)?;
+ return Ok(value.map(Buffer::from));
+ }
+
+ Ok(None)
+ }
+
+ pub fn set(&self, path: &str, value: Buffer) -> Result<()> {
+ let mut tx = self.persy.begin().map_err(parse_error)?;
+ let id = tx
+ .insert(&self.segment, &value.to_vec())
+ .map_err(parse_error)?;
+
+ tx.put::<String, persy::PersyId>(&self.index, path.to_string(), id)
+ .map_err(parse_error)?;
+ let prepared = tx.prepare().map_err(parse_error)?;
+ prepared.commit().map_err(parse_error)?;
+
+ Ok(())
+ }
+
+ pub fn delete(&self, path: &str) -> Result<()> {
+ let mut delete_id = self
+ .persy
+ .get::<String, persy::PersyId>(&self.index, &path.to_string())
+ .map_err(parse_error)?;
+ if let Some(id) = delete_id.next() {
+ // Begin a transaction.
+ let mut tx = self.persy.begin().map_err(parse_error)?;
+ // Delete the record.
+ tx.delete(&self.segment, &id).map_err(parse_error)?;
+ // Remove the index.
+ tx.remove::<String, persy::PersyId>(&self.index, path.to_string(), Some(id))
+ .map_err(parse_error)?;
+ // Commit the tx.
+ let prepared = tx.prepare().map_err(parse_error)?;
+ prepared.commit().map_err(parse_error)?;
+ }
+
+ Ok(())
+ }
+}
+
+fn parse_error<T: Into<persy::PersyError>>(err: persy::PE<T>) -> Error {
+ let err: persy::PersyError = err.persy_error();
+ let kind = match err {
+ persy::PersyError::RecordNotFound(_) => ErrorKind::NotFound,
+ _ => ErrorKind::Unexpected,
+ };
+
+ Error::new(kind, "error from persy").set_source(err)
+}
diff --git a/core/src/services/persy/mod.rs b/core/src/services/persy/deleter.rs
similarity index 61%
copy from core/src/services/persy/mod.rs
copy to core/src/services/persy/deleter.rs
index 6c387ff15..b6b96a9b3 100644
--- a/core/src/services/persy/mod.rs
+++ b/core/src/services/persy/deleter.rs
@@ -15,8 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-mod backend;
-pub use backend::PersyBuilder as Persy;
+use std::sync::Arc;
-mod config;
-pub use config::PersyConfig;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct PersyDeleter {
+ core: Arc<PersyCore>,
+ root: String,
+}
+
+impl PersyDeleter {
+ pub fn new(core: Arc<PersyCore>, root: String) -> Self {
+ Self { core, root }
+ }
+}
+
+impl oio::OneShotDelete for PersyDeleter {
+ async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+ let p = build_abs_path(&self.root, &path);
+ self.core.delete(&p)?;
+ Ok(())
+ }
+}
diff --git a/core/src/services/persy/docs.md b/core/src/services/persy/docs.md
index 97176daca..e3c27bba4 100644
--- a/core/src/services/persy/docs.md
+++ b/core/src/services/persy/docs.md
@@ -2,16 +2,15 @@
This service can be used to:
+- [ ] create_dir
- [x] stat
- [x] read
- [x] write
-- [x] create_dir
- [x] delete
- [ ] copy
- [ ] rename
- [ ] list
- [ ] ~~presign~~
-- [x] blocking
## Configuration
diff --git a/core/src/services/persy/mod.rs b/core/src/services/persy/mod.rs
index 6c387ff15..8dc68c854 100644
--- a/core/src/services/persy/mod.rs
+++ b/core/src/services/persy/mod.rs
@@ -16,6 +16,10 @@
// under the License.
mod backend;
+mod core;
+mod deleter;
+mod writer;
+
pub use backend::PersyBuilder as Persy;
mod config;
diff --git a/core/src/services/persy/writer.rs b/core/src/services/persy/writer.rs
new file mode 100644
index 000000000..6279b5625
--- /dev/null
+++ b/core/src/services/persy/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct PersyWriter {
+ core: Arc<PersyCore>,
+ path: String,
+ buffer: oio::QueueBuf,
+}
+
+impl PersyWriter {
+ pub fn new(core: Arc<PersyCore>, path: String) -> Self {
+ Self {
+ core,
+ path,
+ buffer: oio::QueueBuf::new(),
+ }
+ }
+}
+
+impl oio::Write for PersyWriter {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
+ self.buffer.push(bs);
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<Metadata> {
+ let buf = self.buffer.clone().collect();
+ let length = buf.len() as u64;
+ self.core.set(&self.path, buf)?;
+
+ let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+ Ok(meta)
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ self.buffer.clear();
+ Ok(())
+ }
+}