This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 29974a735 refactor: migrate sled service from adapter::kv to impl Access directly (#6731)
29974a735 is described below
commit 29974a73576965a25c3fbd3f68b33b9db25e4587
Author: Qinxuan Chen <[email protected]>
AuthorDate: Fri Oct 24 01:16:58 2025 +0800
refactor: migrate sled service from adapter::kv to impl Access directly (#6731)
* refactor: migrate sled service from adapter::kv to impl Access directly
* fix tests
---
core/src/services/sled/backend.rs | 172 ++++++++++++++------------
core/src/services/sled/config.rs | 10 +-
core/src/services/sled/core.rs | 75 +++++++++++
core/src/services/sled/{mod.rs => deleter.rs} | 28 ++++-
core/src/services/sled/docs.md | 9 +-
core/src/services/sled/lister.rs | 59 +++++++++
core/src/services/sled/mod.rs | 5 +
core/src/services/sled/writer.rs | 59 +++++++++
8 files changed, 324 insertions(+), 93 deletions(-)
diff --git a/core/src/services/sled/backend.rs b/core/src/services/sled/backend.rs
index a85ff8d7e..7dda922f3 100644
--- a/core/src/services/sled/backend.rs
+++ b/core/src/services/sled/backend.rs
@@ -15,17 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-use std::fmt::Debug;
-use std::fmt::Formatter;
-use std::str;
-
-use crate::Builder;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Scheme;
-use crate::raw::adapters::kv;
+use std::sync::Arc;
+
+use super::config::SledConfig;
+use super::core::*;
+use super::deleter::SledDeleter;
+use super::lister::SledLister;
+use super::writer::SledWriter;
use crate::raw::*;
-use crate::services::SledConfig;
use crate::*;
// https://github.com/spacejam/sled/blob/69294e59c718289ab3cb6bd03ac3b9e1e072a1e7/src/db.rs#L5
@@ -38,14 +35,6 @@ pub struct SledBuilder {
pub(super) config: SledConfig,
}
-impl Debug for SledBuilder {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- f.debug_struct("SledBuilder")
- .field("config", &self.config)
- .finish()
- }
-}
-
impl SledBuilder {
/// Set the path to the sled data directory. Will create if not exists.
pub fn datadir(mut self, path: &str) -> Self {
@@ -53,6 +42,12 @@ impl SledBuilder {
self
}
+ /// Set the tree for sled.
+ pub fn tree(mut self, tree: &str) -> Self {
+ self.config.tree = Some(tree.into());
+ self
+ }
+
/// Set the root for sled.
pub fn root(mut self, root: &str) -> Self {
self.config.root = if root.is_empty() {
@@ -63,12 +58,6 @@ impl SledBuilder {
self
}
-
- /// Set the tree for sled.
- pub fn tree(mut self, tree: &str) -> Self {
- self.config.tree = Some(tree.into());
- self
- }
}
impl Builder for SledBuilder {
@@ -101,87 +90,110 @@ impl Builder for SledBuilder {
.set_source(e)
})?;
- Ok(SledBackend::new(Adapter {
+ let root = normalize_root(&self.config.root.unwrap_or_default());
+
+ Ok(SledBackend::new(SledCore {
datadir: datadir_path,
tree,
})
- .with_root(self.config.root.as_deref().unwrap_or("/")))
+ .with_normalized_root(root))
}
}
/// Backend for sled services.
-pub type SledBackend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
- datadir: String,
- tree: sled::Tree,
+#[derive(Clone, Debug)]
+pub struct SledBackend {
+ core: Arc<SledCore>,
+ root: String,
+ info: Arc<AccessorInfo>,
}
-impl Debug for Adapter {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let mut ds = f.debug_struct("Adapter");
- ds.field("path", &self.datadir);
- ds.finish()
- }
-}
-
-impl kv::Adapter for Adapter {
- type Scanner = kv::Scanner;
-
- fn info(&self) -> kv::Info {
- kv::Info::new(
- Scheme::Sled,
- &self.datadir,
- Capability {
+impl SledBackend {
+ pub fn new(core: SledCore) -> Self {
+ let info = AccessorInfo::default();
+ info.set_scheme(Scheme::Sled.into_static())
+ .set_name(&core.datadir)
+ .set_root("/")
+ .set_native_capability(Capability {
read: true,
+ stat: true,
write: true,
+ write_can_empty: true,
+ delete: true,
list: true,
+ list_with_recursive: true,
shared: false,
..Default::default()
- },
- )
- }
+ });
- async fn get(&self, path: &str) -> Result<Option<Buffer>> {
- Ok(self
- .tree
- .get(path)
- .map_err(parse_error)?
- .map(|v| Buffer::from(v.to_vec())))
+ Self {
+ core: Arc::new(core),
+ root: "/".to_string(),
+ info: Arc::new(info),
+ }
}
- async fn set(&self, path: &str, value: Buffer) -> Result<()> {
- self.tree
- .insert(path, value.to_vec())
- .map_err(parse_error)?;
- Ok(())
+ fn with_normalized_root(mut self, root: String) -> Self {
+ self.info.set_root(&root);
+ self.root = root;
+ self
}
+}
- async fn delete(&self, path: &str) -> Result<()> {
- self.tree.remove(path).map_err(parse_error)?;
+impl Access for SledBackend {
+ type Reader = Buffer;
+ type Writer = SledWriter;
+ type Lister = oio::HierarchyLister<SledLister>;
+ type Deleter = oio::OneShotDeleter<SledDeleter>;
- Ok(())
+ fn info(&self) -> Arc<AccessorInfo> {
+ self.info.clone()
}
- async fn scan(&self, path: &str) -> Result<Self::Scanner> {
- let it = self.tree.scan_prefix(path).keys();
- let mut res = Vec::default();
-
- for i in it {
- let bs = i.map_err(parse_error)?.to_vec();
- let v = String::from_utf8(bs).map_err(|err| {
- Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
- .set_source(err)
- })?;
+ async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+ let p = build_abs_path(&self.root, path);
- res.push(v);
+ if p == build_abs_path(&self.root, "") {
+ Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+ } else {
+ let bs = self.core.get(&p)?;
+ match bs {
+ Some(bs) => Ok(RpStat::new(
+ Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+ )),
+ None => Err(Error::new(ErrorKind::NotFound, "kv not found in sled")),
+ }
}
+ }
- Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
+ let p = build_abs_path(&self.root, path);
+ let bs = match self.core.get(&p)? {
+ Some(bs) => bs,
+ None => {
+ return Err(Error::new(ErrorKind::NotFound, "kv not found in sled"));
+ }
+ };
+ Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
}
-}
-fn parse_error(err: sled::Error) -> Error {
- Error::new(ErrorKind::Unexpected, "error from sled").set_source(err)
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> {
+ let p = build_abs_path(&self.root, path);
+ let writer = SledWriter::new(self.core.clone(), p);
+ Ok((RpWrite::new(), writer))
+ }
+
+ async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+ let deleter = SledDeleter::new(self.core.clone(), self.root.clone());
+ Ok((RpDelete::default(), oio::OneShotDeleter::new(deleter)))
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
+ let p = build_abs_path(&self.root, path);
+ let lister = SledLister::new(self.core.clone(), self.root.clone(), p)?;
+ Ok((
+ RpList::default(),
+ oio::HierarchyLister::new(lister, path, args.recursive()),
+ ))
+ }
}
diff --git a/core/src/services/sled/config.rs b/core/src/services/sled/config.rs
index 8777e5a29..d99ff8359 100644
--- a/core/src/services/sled/config.rs
+++ b/core/src/services/sled/config.rs
@@ -18,10 +18,11 @@
use std::fmt::Debug;
use std::fmt::Formatter;
-use super::backend::SledBuilder;
use serde::Deserialize;
use serde::Serialize;
+use super::backend::SledBuilder;
+
/// Config for Sled services support.
#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
@@ -29,24 +30,25 @@ use serde::Serialize;
pub struct SledConfig {
/// That path to the sled data directory.
pub datadir: Option<String>,
- /// The root for sled.
- pub root: Option<String>,
/// The tree for sled.
pub tree: Option<String>,
+ /// The root for sled.
+ pub root: Option<String>,
}
impl Debug for SledConfig {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SledConfig")
.field("datadir", &self.datadir)
- .field("root", &self.root)
.field("tree", &self.tree)
+ .field("root", &self.root)
.finish()
}
}
impl crate::Configurator for SledConfig {
type Builder = SledBuilder;
+
fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
let mut map = uri.options().clone();
diff --git a/core/src/services/sled/core.rs b/core/src/services/sled/core.rs
new file mode 100644
index 000000000..3cbb3203a
--- /dev/null
+++ b/core/src/services/sled/core.rs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+
+use crate::*;
+
+#[derive(Clone)]
+pub struct SledCore {
+ pub datadir: String,
+ pub tree: sled::Tree,
+}
+
+impl Debug for SledCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut ds = f.debug_struct("SledCore");
+ ds.field("path", &self.datadir);
+ ds.finish()
+ }
+}
+
+impl SledCore {
+ pub fn get(&self, path: &str) -> Result<Option<Buffer>> {
+ let res = self.tree.get(path).map_err(parse_error)?;
+ Ok(res.map(|v| Buffer::from(v.to_vec())))
+ }
+
+ pub fn set(&self, path: &str, value: Buffer) -> Result<()> {
+ self.tree
+ .insert(path, value.to_vec())
+ .map_err(parse_error)?;
+ Ok(())
+ }
+
+ pub fn delete(&self, path: &str) -> Result<()> {
+ self.tree.remove(path).map_err(parse_error)?;
+ Ok(())
+ }
+
+ pub fn list(&self, path: &str) -> Result<Vec<String>> {
+ let it = self.tree.scan_prefix(path).keys();
+ let mut res = Vec::default();
+
+ for i in it {
+ let bs = i.map_err(parse_error)?.to_vec();
+ let v = String::from_utf8(bs).map_err(|err| {
+ Error::new(ErrorKind::Unexpected, "store key is not valid utf-8 string")
+ .set_source(err)
+ })?;
+
+ res.push(v);
+ }
+
+ Ok(res)
+ }
+}
+
+fn parse_error(err: sled::Error) -> Error {
+ Error::new(ErrorKind::Unexpected, "error from sled").set_source(err)
+}
diff --git a/core/src/services/sled/mod.rs b/core/src/services/sled/deleter.rs
similarity index 61%
copy from core/src/services/sled/mod.rs
copy to core/src/services/sled/deleter.rs
index 49794745a..15e15f63c 100644
--- a/core/src/services/sled/mod.rs
+++ b/core/src/services/sled/deleter.rs
@@ -15,8 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-mod backend;
-pub use backend::SledBuilder as Sled;
+use std::sync::Arc;
-mod config;
-pub use config::SledConfig;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct SledDeleter {
+ core: Arc<SledCore>,
+ root: String,
+}
+
+impl SledDeleter {
+ pub fn new(core: Arc<SledCore>, root: String) -> Self {
+ Self { core, root }
+ }
+}
+
+impl oio::OneShotDelete for SledDeleter {
+ async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+ let p = build_abs_path(&self.root, &path);
+ self.core.delete(&p)?;
+ Ok(())
+ }
+}
diff --git a/core/src/services/sled/docs.md b/core/src/services/sled/docs.md
index 69dd96106..4cb4d05ca 100644
--- a/core/src/services/sled/docs.md
+++ b/core/src/services/sled/docs.md
@@ -2,16 +2,15 @@
This service can be used to:
+- [ ] create_dir
- [x] stat
- [x] read
- [x] write
-- [x] create_dir
- [x] delete
-- [x] copy
-- [x] rename
-- [ ] ~~list~~
+- [ ] copy
+- [ ] rename
+- [x] list
- [ ] ~~presign~~
-- [x] blocking
## Configuration
diff --git a/core/src/services/sled/lister.rs b/core/src/services/sled/lister.rs
new file mode 100644
index 000000000..4529dabe8
--- /dev/null
+++ b/core/src/services/sled/lister.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::vec::IntoIter;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct SledLister {
+ root: String,
+ iter: IntoIter<String>,
+}
+
+impl SledLister {
+ pub fn new(core: Arc<SledCore>, root: String, path: String) -> Result<Self> {
+ let entries = core.list(&path)?;
+
+ Ok(Self {
+ root,
+ iter: entries.into_iter(),
+ })
+ }
+}
+
+impl oio::List for SledLister {
+ async fn next(&mut self) -> Result<Option<oio::Entry>> {
+ if let Some(key) = self.iter.next() {
+ let path = build_rel_path(&self.root, &key);
+
+ // Determine if it's a file or directory based on trailing slash
+ let mode = if key.ends_with('/') {
+ EntryMode::DIR
+ } else {
+ EntryMode::FILE
+ };
+ let entry = oio::Entry::new(&path, Metadata::new(mode));
+ return Ok(Some(entry));
+ }
+
+ Ok(None)
+ }
+}
diff --git a/core/src/services/sled/mod.rs b/core/src/services/sled/mod.rs
index 49794745a..b40c3ff15 100644
--- a/core/src/services/sled/mod.rs
+++ b/core/src/services/sled/mod.rs
@@ -16,6 +16,11 @@
// under the License.
mod backend;
+mod core;
+mod deleter;
+mod lister;
+mod writer;
+
pub use backend::SledBuilder as Sled;
mod config;
diff --git a/core/src/services/sled/writer.rs b/core/src/services/sled/writer.rs
new file mode 100644
index 000000000..a1178fa87
--- /dev/null
+++ b/core/src/services/sled/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct SledWriter {
+ core: Arc<SledCore>,
+ path: String,
+ buffer: oio::QueueBuf,
+}
+
+impl SledWriter {
+ pub fn new(core: Arc<SledCore>, path: String) -> Self {
+ Self {
+ core,
+ path,
+ buffer: oio::QueueBuf::new(),
+ }
+ }
+}
+
+impl oio::Write for SledWriter {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
+ self.buffer.push(bs);
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<Metadata> {
+ let buf = self.buffer.clone().collect();
+ let length = buf.len() as u64;
+ self.core.set(&self.path, buf)?;
+
+ let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+ Ok(meta)
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ self.buffer.clear();
+ Ok(())
+ }
+}