This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 71e8a3274 refactor: migrate rocksdb service from adapter::kv to impl 
Access directly (#6732)
71e8a3274 is described below

commit 71e8a3274c2b8325f66fe200b8ffa748523d5902
Author: Qinxuan Chen <[email protected]>
AuthorDate: Fri Oct 24 01:17:31 2025 +0800

    refactor: migrate rocksdb service from adapter::kv to impl Access directly 
(#6732)
    
    * refactor: migrate rocksdb service from adapter::kv to impl Access directly
    
    * revert rocksdb: 0.24.0 => 0.21.0
    
    * fix tests
---
 core/src/services/rocksdb/backend.rs             | 141 +++++++++++++----------
 core/src/services/rocksdb/config.rs              |   4 +-
 core/src/services/rocksdb/core.rs                |  74 ++++++++++++
 core/src/services/rocksdb/{mod.rs => deleter.rs} |  28 ++++-
 core/src/services/rocksdb/docs.md                |   9 +-
 core/src/services/rocksdb/lister.rs              |  59 ++++++++++
 core/src/services/rocksdb/mod.rs                 |   5 +
 core/src/services/rocksdb/writer.rs              |  59 ++++++++++
 8 files changed, 310 insertions(+), 69 deletions(-)

diff --git a/core/src/services/rocksdb/backend.rs 
b/core/src/services/rocksdb/backend.rs
index 624d2abe6..454bc1ae0 100644
--- a/core/src/services/rocksdb/backend.rs
+++ b/core/src/services/rocksdb/backend.rs
@@ -15,16 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
-use std::fmt::Formatter;
 use std::sync::Arc;
 
 use rocksdb::DB;
 
-use crate::Result;
-use crate::raw::adapters::kv;
+use super::config::RocksdbConfig;
+use super::core::*;
+use super::deleter::RocksdbDeleter;
+use super::lister::RocksdbLister;
+use super::writer::RocksdbWriter;
 use crate::raw::*;
-use crate::services::RocksdbConfig;
 use crate::*;
 
 /// RocksDB service support.
@@ -70,83 +70,106 @@ impl Builder for RocksdbBuilder {
                 .set_source(e)
         })?;
 
-        let root = normalize_root(
-            self.config
-                .root
-                .clone()
-                .unwrap_or_else(|| "/".to_string())
-                .as_str(),
-        );
+        let root = normalize_root(&self.config.root.unwrap_or_default());
 
-        Ok(RocksdbBackend::new(Adapter { db: Arc::new(db) 
}).with_normalized_root(root))
+        Ok(RocksdbBackend::new(RocksdbCore { db: Arc::new(db) 
}).with_normalized_root(root))
     }
 }
 
 /// Backend for rocksdb services.
-pub type RocksdbBackend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
-    db: Arc<DB>,
+#[derive(Clone, Debug)]
+pub struct RocksdbBackend {
+    core: Arc<RocksdbCore>,
+    root: String,
+    info: Arc<AccessorInfo>,
 }
 
-impl Debug for Adapter {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let mut ds = f.debug_struct("Adapter");
-        ds.field("path", &self.db.path());
-        ds.finish()
-    }
-}
-
-impl kv::Adapter for Adapter {
-    type Scanner = kv::Scanner;
-
-    fn info(&self) -> kv::Info {
-        kv::Info::new(
-            Scheme::Rocksdb,
-            &self.db.path().to_string_lossy(),
-            Capability {
+impl RocksdbBackend {
+    pub fn new(core: RocksdbCore) -> Self {
+        let info = AccessorInfo::default();
+        info.set_scheme(Scheme::Rocksdb.into_static())
+            .set_name(&core.db.path().to_string_lossy())
+            .set_root("/")
+            .set_native_capability(Capability {
                 read: true,
+                stat: true,
                 write: true,
+                write_can_empty: true,
+                delete: true,
                 list: true,
+                list_with_recursive: true,
                 shared: false,
                 ..Default::default()
-            },
-        )
-    }
+            });
 
-    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
-        let result = self.db.get(path).map_err(parse_rocksdb_error)?;
-        Ok(result.map(Buffer::from))
+        Self {
+            core: Arc::new(core),
+            root: "/".to_string(),
+            info: Arc::new(info),
+        }
     }
 
-    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
-        self.db
-            .put(path, value.to_vec())
-            .map_err(parse_rocksdb_error)
+    fn with_normalized_root(mut self, root: String) -> Self {
+        self.info.set_root(&root);
+        self.root = root;
+        self
     }
+}
+
+impl Access for RocksdbBackend {
+    type Reader = Buffer;
+    type Writer = RocksdbWriter;
+    type Lister = oio::HierarchyLister<RocksdbLister>;
+    type Deleter = oio::OneShotDeleter<RocksdbDeleter>;
 
-    async fn delete(&self, path: &str) -> Result<()> {
-        self.db.delete(path).map_err(parse_rocksdb_error)
+    fn info(&self) -> Arc<AccessorInfo> {
+        self.info.clone()
     }
 
-    async fn scan(&self, path: &str) -> Result<Self::Scanner> {
-        let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
-        let mut res = Vec::default();
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let p = build_abs_path(&self.root, path);
 
-        for key in it {
-            let key = key.map_err(parse_rocksdb_error)?;
-            let key = String::from_utf8_lossy(&key);
-            if !key.starts_with(path) {
-                break;
+        if p == build_abs_path(&self.root, "") {
+            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+        } else {
+            let bs = self.core.get(&p)?;
+            match bs {
+                Some(bs) => Ok(RpStat::new(
+                    
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+                )),
+                None => Err(Error::new(ErrorKind::NotFound, "kv not found in 
rocksdb")),
             }
-            res.push(key.to_string());
         }
+    }
 
-        Ok(Box::new(kv::ScanStdIter::new(res.into_iter().map(Ok))))
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let p = build_abs_path(&self.root, path);
+        let bs = match self.core.get(&p)? {
+            Some(bs) => bs,
+            None => {
+                return Err(Error::new(ErrorKind::NotFound, "kv not found in 
rocksdb"));
+            }
+        };
+        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
     }
-}
 
-fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
-    Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let p = build_abs_path(&self.root, path);
+        let writer = RocksdbWriter::new(self.core.clone(), p);
+        Ok((RpWrite::new(), writer))
+    }
+
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        let deleter = RocksdbDeleter::new(self.core.clone(), 
self.root.clone());
+        Ok((RpDelete::default(), oio::OneShotDeleter::new(deleter)))
+    }
+
+    async fn list(&self, path: &str, args: OpList) -> Result<(RpList, 
Self::Lister)> {
+        let p = build_abs_path(&self.root, path);
+        let lister = RocksdbLister::new(self.core.clone(), self.root.clone(), 
p)?;
+        Ok((
+            RpList::default(),
+            oio::HierarchyLister::new(lister, path, args.recursive()),
+        ))
+    }
 }
diff --git a/core/src/services/rocksdb/config.rs 
b/core/src/services/rocksdb/config.rs
index e1fc5d8ec..0f1b05ffe 100644
--- a/core/src/services/rocksdb/config.rs
+++ b/core/src/services/rocksdb/config.rs
@@ -17,10 +17,11 @@
 
 use std::fmt::Debug;
 
-use super::backend::RocksdbBuilder;
 use serde::Deserialize;
 use serde::Serialize;
 
+use super::backend::RocksdbBuilder;
+
 /// Config for Rocksdb Service.
 #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(default)]
@@ -36,6 +37,7 @@ pub struct RocksdbConfig {
 
 impl crate::Configurator for RocksdbConfig {
     type Builder = RocksdbBuilder;
+
     fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
         let mut map = uri.options().clone();
 
diff --git a/core/src/services/rocksdb/core.rs 
b/core/src/services/rocksdb/core.rs
new file mode 100644
index 000000000..33e4cf875
--- /dev/null
+++ b/core/src/services/rocksdb/core.rs
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::fmt::Debug;
+use std::fmt::Formatter;
+use std::sync::Arc;
+
+use rocksdb::DB;
+
+use crate::*;
+
+#[derive(Clone)]
+pub struct RocksdbCore {
+    pub db: Arc<DB>,
+}
+
+impl Debug for RocksdbCore {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("RocksdbCore");
+        ds.field("path", &self.db.path());
+        ds.finish()
+    }
+}
+
+impl RocksdbCore {
+    pub fn get(&self, path: &str) -> Result<Option<Buffer>> {
+        let result = self.db.get(path).map_err(parse_rocksdb_error)?;
+        Ok(result.map(Buffer::from))
+    }
+
+    pub fn set(&self, path: &str, value: Buffer) -> Result<()> {
+        self.db
+            .put(path, value.to_vec())
+            .map_err(parse_rocksdb_error)
+    }
+
+    pub fn delete(&self, path: &str) -> Result<()> {
+        self.db.delete(path).map_err(parse_rocksdb_error)
+    }
+
+    pub fn list(&self, path: &str) -> Result<Vec<String>> {
+        let it = self.db.prefix_iterator(path).map(|r| r.map(|(k, _)| k));
+        let mut res = Vec::default();
+
+        for key in it {
+            let key = key.map_err(parse_rocksdb_error)?;
+            let key = String::from_utf8_lossy(&key);
+            if !key.starts_with(path) {
+                break;
+            }
+            res.push(key.to_string());
+        }
+
+        Ok(res)
+    }
+}
+
+fn parse_rocksdb_error(e: rocksdb::Error) -> Error {
+    Error::new(ErrorKind::Unexpected, "got rocksdb error").set_source(e)
+}
diff --git a/core/src/services/rocksdb/mod.rs 
b/core/src/services/rocksdb/deleter.rs
similarity index 60%
copy from core/src/services/rocksdb/mod.rs
copy to core/src/services/rocksdb/deleter.rs
index 9a55a345e..a42c62144 100644
--- a/core/src/services/rocksdb/mod.rs
+++ b/core/src/services/rocksdb/deleter.rs
@@ -15,8 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod backend;
-pub use backend::RocksdbBuilder as Rocksdb;
+use std::sync::Arc;
 
-mod config;
-pub use config::RocksdbConfig;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct RocksdbDeleter {
+    core: Arc<RocksdbCore>,
+    root: String,
+}
+
+impl RocksdbDeleter {
+    pub fn new(core: Arc<RocksdbCore>, root: String) -> Self {
+        Self { core, root }
+    }
+}
+
+impl oio::OneShotDelete for RocksdbDeleter {
+    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+        let p = build_abs_path(&self.root, &path);
+        self.core.delete(&p)?;
+        Ok(())
+    }
+}
diff --git a/core/src/services/rocksdb/docs.md 
b/core/src/services/rocksdb/docs.md
index 9bf02eabf..5e64ae0bd 100644
--- a/core/src/services/rocksdb/docs.md
+++ b/core/src/services/rocksdb/docs.md
@@ -2,16 +2,15 @@
 
 This service can be used to:
 
+- [ ] create_dir
 - [x] stat
 - [x] read
 - [x] write
-- [x] create_dir
 - [x] delete
-- [x] copy
-- [x] rename
-- [ ] ~~list~~
+- [ ] copy
+- [ ] rename
+- [x] list
 - [ ] ~~presign~~
-- [x] blocking
 
 ## Note
 
diff --git a/core/src/services/rocksdb/lister.rs 
b/core/src/services/rocksdb/lister.rs
new file mode 100644
index 000000000..e5eb0422b
--- /dev/null
+++ b/core/src/services/rocksdb/lister.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+use std::vec::IntoIter;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct RocksdbLister {
+    root: String,
+    iter: IntoIter<String>,
+}
+
+impl RocksdbLister {
+    pub fn new(core: Arc<RocksdbCore>, root: String, path: String) -> 
Result<Self> {
+        let entries = core.list(&path)?;
+
+        Ok(Self {
+            root,
+            iter: entries.into_iter(),
+        })
+    }
+}
+
+impl oio::List for RocksdbLister {
+    async fn next(&mut self) -> Result<Option<oio::Entry>> {
+        if let Some(key) = self.iter.next() {
+            let path = build_rel_path(&self.root, &key);
+
+            // Determine if it's a file or directory based on trailing slash
+            let mode = if key.ends_with('/') {
+                EntryMode::DIR
+            } else {
+                EntryMode::FILE
+            };
+            let entry = oio::Entry::new(&path, Metadata::new(mode));
+            return Ok(Some(entry));
+        }
+
+        Ok(None)
+    }
+}
diff --git a/core/src/services/rocksdb/mod.rs b/core/src/services/rocksdb/mod.rs
index 9a55a345e..48c733f1c 100644
--- a/core/src/services/rocksdb/mod.rs
+++ b/core/src/services/rocksdb/mod.rs
@@ -16,6 +16,11 @@
 // under the License.
 
 mod backend;
+mod core;
+mod deleter;
+mod lister;
+mod writer;
+
 pub use backend::RocksdbBuilder as Rocksdb;
 
 mod config;
diff --git a/core/src/services/rocksdb/writer.rs 
b/core/src/services/rocksdb/writer.rs
new file mode 100644
index 000000000..40c19e9cf
--- /dev/null
+++ b/core/src/services/rocksdb/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct RocksdbWriter {
+    core: Arc<RocksdbCore>,
+    path: String,
+    buffer: oio::QueueBuf,
+}
+
+impl RocksdbWriter {
+    pub fn new(core: Arc<RocksdbCore>, path: String) -> Self {
+        Self {
+            core,
+            path,
+            buffer: oio::QueueBuf::new(),
+        }
+    }
+}
+
+impl oio::Write for RocksdbWriter {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        self.buffer.push(bs);
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<Metadata> {
+        let buf = self.buffer.clone().collect();
+        let length = buf.len() as u64;
+        self.core.set(&self.path, buf)?;
+
+        let meta = 
Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+        Ok(meta)
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buffer.clear();
+        Ok(())
+    }
+}

Reply via email to