This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new 71e8a3274 refactor: migrate rocksdb service from adapter::kv to impl
Access directly (#6732)
71e8a3274 is described below
commit 71e8a3274c2b8325f66fe200b8ffa748523d5902
Author: Qinxuan Chen <[email protected]>
AuthorDate: Fri Oct 24 01:17:31 2025 +0800
refactor: migrate rocksdb service from adapter::kv to impl Access directly
(#6732)
* refactor: migrate rocksdb service from adapter::kv to impl Access directly
* revert rocksdb: 0.24.0 => 0.21.0
* fix tests
---
core/src/services/rocksdb/backend.rs | 141 +++++++++++++----------
core/src/services/rocksdb/config.rs | 4 +-
core/src/services/rocksdb/core.rs | 74 ++++++++++++
core/src/services/rocksdb/{mod.rs => deleter.rs} | 28 ++++-
core/src/services/rocksdb/docs.md | 9 +-
core/src/services/rocksdb/lister.rs | 59 ++++++++++
core/src/services/rocksdb/mod.rs | 5 +
core/src/services/rocksdb/writer.rs | 59 ++++++++++
8 files changed, 310 insertions(+), 69 deletions(-)
diff --git a/core/src/services/rocksdb/backend.rs
b/core/src/services/rocksdb/backend.rs
index 624d2abe6..454bc1ae0 100644
--- a/core/src/services/rocksdb/backend.rs
+++ b/core/src/services/rocksdb/backend.rs
@@ -15,16 +15,16 @@
// specific language governing permissions and limitations
// under the License.
-use std::fmt::Debug;
-use std::fmt::Formatter;
use std::sync::Arc;
use rocksdb::DB;
-use crate::Result;
-use crate::raw::adapters::kv;
+use super::config::RocksdbConfig;
+use super::core::*;
+use super::deleter::RocksdbDeleter;
+use super::lister::RocksdbLister;
+use super::writer::RocksdbWriter;
use crate::raw::*;
-use crate::services::RocksdbConfig;
use crate::*;
/// RocksDB service support.
@@ -70,83 +70,106 @@ impl Builder for RocksdbBuilder {
.set_source(e)
})?;
- let root = normalize_root(
- self.config
- .root
- .clone()
- .unwrap_or_else(|| "/".to_string())
- .as_str(),
- );
+ let root = normalize_root(&self.config.root.unwrap_or_default());
- Ok(RocksdbBackend::new(Adapter { db: Arc::new(db)
}).with_normalized_root(root))
+ Ok(RocksdbBackend::new(RocksdbCore { db: Arc::new(db)
}).with_normalized_root(root))
}
}
/// Backend for rocksdb services.
-pub type RocksdbBackend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
- db: Arc<DB>,
+#[derive(Clone, Debug)]
+pub struct RocksdbBackend {
+ core: Arc<RocksdbCore>,
+ root: String,
+ info: Arc<AccessorInfo>,
}
-impl Debug for Adapter {
- fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
- let mut ds = f.debug_struct("Adapter");
- ds.field("path", &self.db.path());
- ds.finish()
- }
-}
-
-impl kv::Adapter for Adapter {
- type Scanner = kv::Scanner;
-
- fn info(&self) -> kv::Info {
- kv::Info::new(
- Scheme::Rocksdb,
- &self.db.path().to_string_lossy(),
- Capability {
+impl RocksdbBackend {
+ pub fn new(core: RocksdbCore) -> Self {
+ let info = AccessorInfo::default();
+ info.set_scheme(Scheme::Rocksdb.into_static())
+ .set_name(&core.db.path().to_string_lossy())
+ .set_root("/")
+ .set_native_capability(Capability {
read: true,
+ stat: true,
write: true,
+ write_can_empty: true,
+ delete: true,
list: true,
+ list_with_recursive: true,
shared: false,
..Default::default()
- },
- )
- }
+ });
- async fn get(&self, path: &str) -> Result<Option<Buffer>> {
- let result = self.db.get(path).map_err(parse_rocksdb_error)?;
- Ok(result.map(Buffer::from))
+ Self {
+ core: Arc::new(core),
+ root: "/".to_string(),
+ info: Arc::new(info),
+ }
}
- async fn set(&self, path: &str, value: Buffer) -> Result<()> {
- self.db
- .put(path, value.to_vec())
- .map_err(parse_rocksdb_error)
+ fn with_normalized_root(mut self, root: String) -> Self {
+ self.info.set_root(&root);
+ self.root = root;
+ self
}
+}
+
+impl Access for RocksdbBackend {
+ type Reader = Buffer;
+ type Writer = RocksdbWriter;
+ type Lister = oio::HierarchyLister<RocksdbLister>;
+ type Deleter = oio::OneShotDeleter<RocksdbDeleter>;
- async fn delete(&self, path: &str) -> Result<()> {
- self.db.delete(path).map_err(parse_rocksdb_error)
+ fn info(&self) -> Arc<AccessorInfo> {
+ self.info.clone()
}
- async fn scan(&self, path: &str) -> Result<Self::Scanner> {
- let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
- let mut res = Vec::default();
+ async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+ let p = build_abs_path(&self.root, path);
- for key in it {
- let key = key.map_err(parse_rocksdb_error)?;
- let key = String::from_utf8_lossy(&key);
- if !key.starts_with(path) {
- break;
+ if p == build_abs_path(&self.root, "") {
+ Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+ } else {
+ let bs = self.core.get(&p)?;
+ match bs {
+ Some(bs) => Ok(RpStat::new(
+
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+ )),
+ None => Err(Error::new(ErrorKind::NotFound, "kv not found in
rocksdb")),
}
- res.push(key.to_string());
}
+ }
- Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
+ async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead,
Self::Reader)> {
+ let p = build_abs_path(&self.root, path);
+ let bs = match self.core.get(&p)? {
+ Some(bs) => bs,
+ None => {
+ return Err(Error::new(ErrorKind::NotFound, "kv not found in
rocksdb"));
+ }
+ };
+ Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
}
-}
-fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
- Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
+ async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite,
Self::Writer)> {
+ let p = build_abs_path(&self.root, path);
+ let writer = RocksdbWriter::new(self.core.clone(), p);
+ Ok((RpWrite::new(), writer))
+ }
+
+ async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+ let deleter = RocksdbDeleter::new(self.core.clone(),
self.root.clone());
+ Ok((RpDelete::default(), oio::OneShotDeleter::new(deleter)))
+ }
+
+ async fn list(&self, path: &str, args: OpList) -> Result<(RpList,
Self::Lister)> {
+ let p = build_abs_path(&self.root, path);
+ let lister = RocksdbLister::new(self.core.clone(), self.root.clone(),
p)?;
+ Ok((
+ RpList::default(),
+ oio::HierarchyLister::new(lister, path, args.recursive()),
+ ))
+ }
}
diff --git a/core/src/services/rocksdb/config.rs
b/core/src/services/rocksdb/config.rs
index e1fc5d8ec..0f1b05ffe 100644
--- a/core/src/services/rocksdb/config.rs
+++ b/core/src/services/rocksdb/config.rs
@@ -17,10 +17,11 @@
use std::fmt::Debug;
-use super::backend::RocksdbBuilder;
use serde::Deserialize;
use serde::Serialize;
+use super::backend::RocksdbBuilder;
+
/// Config for Rocksdb Service.
#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
#[serde(default)]
@@ -36,6 +37,7 @@ pub struct RocksdbConfig {
impl crate::Configurator for RocksdbConfig {
type Builder = RocksdbBuilder;
+
fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
let mut map = uri.options().clone();
diff --git a/core/src/services/rocksdb/core.rs
b/core/src/services/rocksdb/core.rs
new file mode 100644
index 000000000..33e4cf875
--- /dev/null
+++ b/core/src/services/rocksdb/core.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use rocksdb::DB;
+
+use crate::*;
+
+#[derive(Clone)]
+pub struct RocksdbCore {
+ pub db: Arc<DB>,
+}
+
+impl Debug for RocksdbCore {
+ fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+ let mut ds = f.debug_struct("RocksdbCore");
+ ds.field("path", &self.db.path());
+ ds.finish()
+ }
+}
+
+impl RocksdbCore {
+ pub fn get(&self, path: &str) -> Result<Option<Buffer>> {
+ let result = self.db.get(path).map_err(parse_rocksdb_error)?;
+ Ok(result.map(Buffer::from))
+ }
+
+ pub fn set(&self, path: &str, value: Buffer) -> Result<()> {
+ self.db
+ .put(path, value.to_vec())
+ .map_err(parse_rocksdb_error)
+ }
+
+ pub fn delete(&self, path: &str) -> Result<()> {
+ self.db.delete(path).map_err(parse_rocksdb_error)
+ }
+
+ pub fn list(&self, path: &str) -> Result<Vec<String>> {
+ let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
+ let mut res = Vec::default();
+
+ for key in it {
+ let key = key.map_err(parse_rocksdb_error)?;
+ let key = String::from_utf8_lossy(&key);
+ if !key.starts_with(path) {
+ break;
+ }
+ res.push(key.to_string());
+ }
+
+ Ok(res)
+ }
+}
+
+fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
+ Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
+}
diff --git a/core/src/services/rocksdb/mod.rs
b/core/src/services/rocksdb/deleter.rs
similarity index 60%
copy from core/src/services/rocksdb/mod.rs
copy to core/src/services/rocksdb/deleter.rs
index 9a55a345e..a42c62144 100644
--- a/core/src/services/rocksdb/mod.rs
+++ b/core/src/services/rocksdb/deleter.rs
@@ -15,8 +15,28 @@
// specific language governing permissions and limitations
// under the License.
-mod backend;
-pub use backend::RocksdbBuilder as Rocksdb;
+use std::sync::Arc;
-mod config;
-pub use config::RocksdbConfig;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct RocksdbDeleter {
+ core: Arc<RocksdbCore>,
+ root: String,
+}
+
+impl RocksdbDeleter {
+ pub fn new(core: Arc<RocksdbCore>, root: String) -> Self {
+ Self { core, root }
+ }
+}
+
+impl oio::OneShotDelete for RocksdbDeleter {
+ async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+ let p = build_abs_path(&self.root, &path);
+ self.core.delete(&p)?;
+ Ok(())
+ }
+}
diff --git a/core/src/services/rocksdb/docs.md
b/core/src/services/rocksdb/docs.md
index 9bf02eabf..5e64ae0bd 100644
--- a/core/src/services/rocksdb/docs.md
+++ b/core/src/services/rocksdb/docs.md
@@ -2,16 +2,15 @@
This service can be used to:
+- [ ] create_dir
- [x] stat
- [x] read
- [x] write
-- [x] create_dir
- [x] delete
-- [x] copy
-- [x] rename
-- [ ] ~~list~~
+- [ ] copy
+- [ ] rename
+- [x] list
- [ ] ~~presign~~
-- [x] blocking
## Note
diff --git a/core/src/services/rocksdb/lister.rs
b/core/src/services/rocksdb/lister.rs
new file mode 100644
index 000000000..e5eb0422b
--- /dev/null
+++ b/core/src/services/rocksdb/lister.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::vec::IntoIter;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct RocksdbLister {
+ root: String,
+ iter: IntoIter<String>,
+}
+
+impl RocksdbLister {
+ pub fn new(core: Arc<RocksdbCore>, root: String, path: String) ->
Result<Self> {
+ let entries = core.list(&path)?;
+
+ Ok(Self {
+ root,
+ iter: entries.into_iter(),
+ })
+ }
+}
+
+impl oio::List for RocksdbLister {
+ async fn next(&mut self) -> Result<Option<oio::Entry>> {
+ if let Some(key) = self.iter.next() {
+ let path = build_rel_path(&self.root, &key);
+
+ // Determine if it's a file or directory based on trailing slash
+ let mode = if key.ends_with('/') {
+ EntryMode::DIR
+ } else {
+ EntryMode::FILE
+ };
+ let entry = oio::Entry::new(&path, Metadata::new(mode));
+ return Ok(Some(entry));
+ }
+
+ Ok(None)
+ }
+}
diff --git a/core/src/services/rocksdb/mod.rs b/core/src/services/rocksdb/mod.rs
index 9a55a345e..48c733f1c 100644
--- a/core/src/services/rocksdb/mod.rs
+++ b/core/src/services/rocksdb/mod.rs
@@ -16,6 +16,11 @@
// under the License.
mod backend;
+mod core;
+mod deleter;
+mod lister;
+mod writer;
+
pub use backend::RocksdbBuilder as Rocksdb;
mod config;
diff --git a/core/src/services/rocksdb/writer.rs
b/core/src/services/rocksdb/writer.rs
new file mode 100644
index 000000000..40c19e9cf
--- /dev/null
+++ b/core/src/services/rocksdb/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct RocksdbWriter {
+ core: Arc<RocksdbCore>,
+ path: String,
+ buffer: oio::QueueBuf,
+}
+
+impl RocksdbWriter {
+ pub fn new(core: Arc<RocksdbCore>, path: String) -> Self {
+ Self {
+ core,
+ path,
+ buffer: oio::QueueBuf::new(),
+ }
+ }
+}
+
+impl oio::Write for RocksdbWriter {
+ async fn write(&mut self, bs: Buffer) -> Result<()> {
+ self.buffer.push(bs);
+ Ok(())
+ }
+
+ async fn close(&mut self) -> Result<Metadata> {
+ let buf = self.buffer.clone().collect();
+ let length = buf.len() as u64;
+ self.core.set(&self.path, buf)?;
+
+ let meta =
Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+ Ok(meta)
+ }
+
+ async fn abort(&mut self) -> Result<()> {
+ self.buffer.clear();
+ Ok(())
+ }
+}