This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a5bf25f9 refactor: migrate redb service from adapter::kv to impl 
Access directly (#6733)
2a5bf25f9 is described below

commit 2a5bf25f9057f8e6a5b0a719dd765a85f64a2fd3
Author: Qinxuan Chen <[email protected]>
AuthorDate: Fri Oct 24 01:17:52 2025 +0800

    refactor: migrate redb service from adapter::kv to impl Access directly 
(#6733)
---
 core/src/services/redb/backend.rs              | 213 +++++++++----------------
 core/src/services/redb/config.rs               |   8 +-
 core/src/services/redb/{backend.rs => core.rs} | 149 ++---------------
 core/src/services/redb/{mod.rs => deleter.rs}  |  28 +++-
 core/src/services/redb/docs.md                 |   9 +-
 core/src/services/redb/mod.rs                  |   4 +
 core/src/services/redb/writer.rs               |  59 +++++++
 7 files changed, 190 insertions(+), 280 deletions(-)

diff --git a/core/src/services/redb/backend.rs 
b/core/src/services/redb/backend.rs
index 88a291fb2..87893fe11 100644
--- a/core/src/services/redb/backend.rs
+++ b/core/src/services/redb/backend.rs
@@ -15,17 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
-use std::fmt::Formatter;
 use std::sync::Arc;
 
-use crate::Builder;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Scheme;
-use crate::raw::adapters::kv;
+use super::config::RedbConfig;
+use super::core::*;
+use super::deleter::RedbDeleter;
+use super::writer::RedbWriter;
 use crate::raw::*;
-use crate::services::RedbConfig;
 use crate::*;
 
 /// Redb service support.
@@ -112,159 +108,106 @@ impl Builder for RedbBuilder {
 
         create_table(&db, &table_name)?;
 
-        Ok(RedbBackend::new(Adapter {
+        let root = normalize_root(&self.config.root.unwrap_or_default());
+
+        Ok(RedbBackend::new(RedbCore {
             datadir,
             table: table_name,
             db,
         })
-        .with_root(self.config.root.as_deref().unwrap_or_default()))
+        .with_normalized_root(root))
     }
 }
 
 /// Backend for Redb services.
-pub type RedbBackend = kv::Backend<Adapter>;
-
-#[derive(Clone)]
-pub struct Adapter {
-    datadir: Option<String>,
-    table: String,
-    db: Arc<redb::Database>,
+#[derive(Clone, Debug)]
+pub struct RedbBackend {
+    core: Arc<RedbCore>,
+    root: String,
+    info: Arc<AccessorInfo>,
 }
 
-impl Debug for Adapter {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let mut ds = f.debug_struct("Adapter");
-        ds.field("path", &self.datadir);
-        ds.finish()
+impl RedbBackend {
+    pub fn new(core: RedbCore) -> Self {
+        let info = AccessorInfo::default();
+        info.set_scheme(Scheme::Redb.into_static());
+        info.set_name(&core.table);
+        info.set_root("/");
+        info.set_native_capability(Capability {
+            read: true,
+            stat: true,
+            write: true,
+            write_can_empty: true,
+            delete: true,
+            shared: false,
+            ..Default::default()
+        });
+
+        Self {
+            core: Arc::new(core),
+            root: "/".to_string(),
+            info: Arc::new(info),
+        }
     }
-}
-
-impl kv::Adapter for Adapter {
-    type Scanner = ();
 
-    fn info(&self) -> kv::Info {
-        kv::Info::new(
-            Scheme::Redb,
-            &self.table,
-            Capability {
-                read: true,
-                write: true,
-                shared: false,
-                ..Default::default()
-            },
-        )
+    fn with_normalized_root(mut self, root: String) -> Self {
+        self.info.set_root(&root);
+        self.root = root;
+        self
     }
+}
 
-    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
-        let read_txn = self.db.begin_read().map_err(parse_transaction_error)?;
-
-        let table_define: redb::TableDefinition<&str, &[u8]> =
-            redb::TableDefinition::new(&self.table);
-
-        let table = read_txn
-            .open_table(table_define)
-            .map_err(parse_table_error)?;
+impl Access for RedbBackend {
+    type Reader = Buffer;
+    type Writer = RedbWriter;
+    type Lister = ();
+    type Deleter = oio::OneShotDeleter<RedbDeleter>;
 
-        let result = match table.get(path) {
-            Ok(Some(v)) => Ok(Some(v.value().to_vec())),
-            Ok(None) => Ok(None),
-            Err(e) => Err(parse_storage_error(e)),
-        }?;
-        Ok(result.map(Buffer::from))
+    fn info(&self) -> Arc<AccessorInfo> {
+        self.info.clone()
     }
 
-    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
-        let write_txn = 
self.db.begin_write().map_err(parse_transaction_error)?;
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let p = build_abs_path(&self.root, path);
 
-        let table_define: redb::TableDefinition<&str, &[u8]> =
-            redb::TableDefinition::new(&self.table);
-
-        {
-            let mut table = write_txn
-                .open_table(table_define)
-                .map_err(parse_table_error)?;
-
-            table
-                .insert(path, &*value.to_vec())
-                .map_err(parse_storage_error)?;
+        if p == build_abs_path(&self.root, "") {
+            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+        } else {
+            let bs = self.core.get(&p)?;
+            match bs {
+                Some(bs) => Ok(RpStat::new(
+                    
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+                )),
+                None => Err(Error::new(ErrorKind::NotFound, "kv not found in 
redb")),
+            }
         }
-
-        write_txn.commit().map_err(parse_commit_error)?;
-        Ok(())
     }
 
-    async fn delete(&self, path: &str) -> Result<()> {
-        let write_txn = 
self.db.begin_write().map_err(parse_transaction_error)?;
-
-        let table_define: redb::TableDefinition<&str, &[u8]> =
-            redb::TableDefinition::new(&self.table);
-
-        {
-            let mut table = write_txn
-                .open_table(table_define)
-                .map_err(parse_table_error)?;
-
-            table.remove(path).map_err(parse_storage_error)?;
-        }
-
-        write_txn.commit().map_err(parse_commit_error)?;
-        Ok(())
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let p = build_abs_path(&self.root, path);
+        let bs = match self.core.get(&p)? {
+            Some(bs) => bs,
+            None => {
+                return Err(Error::new(ErrorKind::NotFound, "kv not found in 
redb"));
+            }
+        };
+        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
     }
-}
 
-fn parse_transaction_error(e: redb::TransactionError) -> Error {
-    Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
-}
-
-fn parse_table_error(e: redb::TableError) -> Error {
-    match e {
-        redb::TableError::TableDoesNotExist(_) => {
-            Error::new(ErrorKind::NotFound, "error from redb").set_source(e)
-        }
-        _ => Error::new(ErrorKind::Unexpected, "error from 
redb").set_source(e),
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let p = build_abs_path(&self.root, path);
+        Ok((RpWrite::new(), RedbWriter::new(self.core.clone(), p)))
     }
-}
-
-fn parse_storage_error(e: redb::StorageError) -> Error {
-    Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
-}
-
-fn parse_database_error(e: redb::DatabaseError) -> Error {
-    Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
-}
-
-fn parse_commit_error(e: redb::CommitError) -> Error {
-    Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
-}
-
-/// Check if a table exists, otherwise create it.
-fn create_table(db: &redb::Database, table: &str) -> Result<()> {
-    // Only one `WriteTransaction` is permitted at same time,
-    // applying new one will block until it available.
-    //
-    // So we first try checking table existence via `ReadTransaction`.
-    {
-        let read_txn = db.begin_read().map_err(parse_transaction_error)?;
-
-        let table_define: redb::TableDefinition<&str, &[u8]> = 
redb::TableDefinition::new(table);
 
-        match read_txn.open_table(table_define) {
-            Ok(_) => return Ok(()),
-            Err(redb::TableError::TableDoesNotExist(_)) => (),
-            Err(e) => return Err(parse_table_error(e)),
-        }
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        Ok((
+            RpDelete::default(),
+            oio::OneShotDeleter::new(RedbDeleter::new(self.core.clone(), 
self.root.clone())),
+        ))
     }
 
-    {
-        let write_txn = db.begin_write().map_err(parse_transaction_error)?;
-
-        let table_define: redb::TableDefinition<&str, &[u8]> = 
redb::TableDefinition::new(table);
-
-        write_txn
-            .open_table(table_define)
-            .map_err(parse_table_error)?;
-        write_txn.commit().map_err(parse_commit_error)?;
+    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, 
Self::Lister)> {
+        let _ = build_abs_path(&self.root, path);
+        Ok((RpList::default(), ()))
     }
-
-    Ok(())
 }
diff --git a/core/src/services/redb/config.rs b/core/src/services/redb/config.rs
index 167441d1c..45a81a2cf 100644
--- a/core/src/services/redb/config.rs
+++ b/core/src/services/redb/config.rs
@@ -17,10 +17,11 @@
 
 use std::fmt::Debug;
 
-use super::backend::RedbBuilder;
 use serde::Deserialize;
 use serde::Serialize;
 
+use super::backend::RedbBuilder;
+
 /// Config for redb service support.
 #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(default)]
@@ -28,14 +29,15 @@ use serde::Serialize;
 pub struct RedbConfig {
     /// path to the redb data directory.
     pub datadir: Option<String>,
-    /// The root for redb.
-    pub root: Option<String>,
     /// The table name for redb.
     pub table: Option<String>,
+    /// The root for redb.
+    pub root: Option<String>,
 }
 
 impl crate::Configurator for RedbConfig {
     type Builder = RedbBuilder;
+
     fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
         let mut map = uri.options().clone();
 
diff --git a/core/src/services/redb/backend.rs b/core/src/services/redb/core.rs
similarity index 52%
copy from core/src/services/redb/backend.rs
copy to core/src/services/redb/core.rs
index 88a291fb2..05b3c7764 100644
--- a/core/src/services/redb/backend.rs
+++ b/core/src/services/redb/core.rs
@@ -19,143 +19,26 @@ use std::fmt::Debug;
 use std::fmt::Formatter;
 use std::sync::Arc;
 
-use crate::Builder;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Scheme;
-use crate::raw::adapters::kv;
-use crate::raw::*;
-use crate::services::RedbConfig;
 use crate::*;
 
-/// Redb service support.
-#[doc = include_str!("docs.md")]
-#[derive(Default, Debug)]
-pub struct RedbBuilder {
-    pub(super) config: RedbConfig,
-
-    pub(super) database: Option<Arc<redb::Database>>,
-}
-
-impl RedbBuilder {
-    /// Set the database for Redb.
-    ///
-    /// This method should be called when you want to
-    /// use multiple tables of one database because
-    /// Redb doesn't allow opening a database that have been opened.
-    ///
-    /// <div class="warning">
-    ///
-    /// `datadir` and `database` should not be set simultaneously.
-    /// If both are set, `database` will take precedence.
-    ///
-    /// </div>
-    pub fn database(mut self, db: Arc<redb::Database>) -> Self {
-        self.database = Some(db);
-        self
-    }
-
-    /// Set the path to the redb data directory. Will create if not exists.
-    ///
-    ///
-    /// <div class="warning">
-    ///
-    /// Opening redb database via `datadir` takes away the ability to access 
multiple redb tables.
-    /// If you need to access multiple redb tables, the correct solution is to
-    /// create an `Arc<redb::database>` beforehand and then share it via 
[`database`]
-    /// with multiple builders where every builder will open one redb table.
-    ///
-    /// </div>
-    ///
-    /// [`database`]: RedbBuilder::database
-    pub fn datadir(mut self, path: &str) -> Self {
-        self.config.datadir = Some(path.into());
-        self
-    }
-
-    /// Set the table name for Redb. Will create if not exists.
-    pub fn table(mut self, table: &str) -> Self {
-        self.config.table = Some(table.into());
-        self
-    }
-
-    /// Set the root for Redb.
-    pub fn root(mut self, path: &str) -> Self {
-        self.config.root = Some(path.into());
-        self
-    }
-}
-
-impl Builder for RedbBuilder {
-    type Config = RedbConfig;
-
-    fn build(self) -> Result<impl Access> {
-        let table_name = self.config.table.ok_or_else(|| {
-            Error::new(ErrorKind::ConfigInvalid, "table is required but not 
set")
-                .with_context("service", Scheme::Redb)
-        })?;
-
-        let (datadir, db) = if let Some(db) = self.database {
-            (None, db)
-        } else {
-            let datadir = self.config.datadir.ok_or_else(|| {
-                Error::new(ErrorKind::ConfigInvalid, "datadir is required but 
not set")
-                    .with_context("service", Scheme::Redb)
-            })?;
-
-            let db = redb::Database::create(&datadir)
-                .map_err(parse_database_error)?
-                .into();
-
-            (Some(datadir), db)
-        };
-
-        create_table(&db, &table_name)?;
-
-        Ok(RedbBackend::new(Adapter {
-            datadir,
-            table: table_name,
-            db,
-        })
-        .with_root(self.config.root.as_deref().unwrap_or_default()))
-    }
-}
-
-/// Backend for Redb services.
-pub type RedbBackend = kv::Backend<Adapter>;
-
 #[derive(Clone)]
-pub struct Adapter {
-    datadir: Option<String>,
-    table: String,
-    db: Arc<redb::Database>,
+pub struct RedbCore {
+    pub db: Arc<redb::Database>,
+    pub datadir: Option<String>,
+    pub table: String,
 }
 
-impl Debug for Adapter {
+impl Debug for RedbCore {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let mut ds = f.debug_struct("Adapter");
-        ds.field("path", &self.datadir);
-        ds.finish()
+        let mut ds = f.debug_struct("RedbCore");
+        ds.field("path", &self.datadir)
+            .field("table", &self.table)
+            .finish()
     }
 }
 
-impl kv::Adapter for Adapter {
-    type Scanner = ();
-
-    fn info(&self) -> kv::Info {
-        kv::Info::new(
-            Scheme::Redb,
-            &self.table,
-            Capability {
-                read: true,
-                write: true,
-                shared: false,
-                ..Default::default()
-            },
-        )
-    }
-
-    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
+impl RedbCore {
+    pub fn get(&self, path: &str) -> Result<Option<Buffer>> {
         let read_txn = self.db.begin_read().map_err(parse_transaction_error)?;
 
         let table_define: redb::TableDefinition<&str, &[u8]> =
@@ -173,7 +56,7 @@ impl kv::Adapter for Adapter {
         Ok(result.map(Buffer::from))
     }
 
-    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
+    pub fn set(&self, path: &str, value: Buffer) -> Result<()> {
         let write_txn = 
self.db.begin_write().map_err(parse_transaction_error)?;
 
         let table_define: redb::TableDefinition<&str, &[u8]> =
@@ -193,7 +76,7 @@ impl kv::Adapter for Adapter {
         Ok(())
     }
 
-    async fn delete(&self, path: &str) -> Result<()> {
+    pub fn delete(&self, path: &str) -> Result<()> {
         let write_txn = 
self.db.begin_write().map_err(parse_transaction_error)?;
 
         let table_define: redb::TableDefinition<&str, &[u8]> =
@@ -229,16 +112,16 @@ fn parse_storage_error(e: redb::StorageError) -> Error {
     Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
 }
 
-fn parse_database_error(e: redb::DatabaseError) -> Error {
+fn parse_commit_error(e: redb::CommitError) -> Error {
     Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
 }
 
-fn parse_commit_error(e: redb::CommitError) -> Error {
+pub fn parse_database_error(e: redb::DatabaseError) -> Error {
     Error::new(ErrorKind::Unexpected, "error from redb").set_source(e)
 }
 
 /// Check if a table exists, otherwise create it.
-fn create_table(db: &redb::Database, table: &str) -> Result<()> {
+pub fn create_table(db: &redb::Database, table: &str) -> Result<()> {
     // Only one `WriteTransaction` is permitted at same time,
     // applying new one will block until it available.
     //
diff --git a/core/src/services/redb/mod.rs b/core/src/services/redb/deleter.rs
similarity index 61%
copy from core/src/services/redb/mod.rs
copy to core/src/services/redb/deleter.rs
index e13f5e6b2..9ab1ef1f1 100644
--- a/core/src/services/redb/mod.rs
+++ b/core/src/services/redb/deleter.rs
@@ -15,8 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod backend;
-pub use backend::RedbBuilder as Redb;
+use std::sync::Arc;
 
-mod config;
-pub use config::RedbConfig;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct RedbDeleter {
+    core: Arc<RedbCore>,
+    root: String,
+}
+
+impl RedbDeleter {
+    pub fn new(core: Arc<RedbCore>, root: String) -> Self {
+        Self { core, root }
+    }
+}
+
+impl oio::OneShotDelete for RedbDeleter {
+    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+        let p = build_abs_path(&self.root, &path);
+        self.core.delete(&p)?;
+        Ok(())
+    }
+}
diff --git a/core/src/services/redb/docs.md b/core/src/services/redb/docs.md
index a964edb28..106b54317 100644
--- a/core/src/services/redb/docs.md
+++ b/core/src/services/redb/docs.md
@@ -2,16 +2,15 @@
 
 This service can be used to:
 
+- [ ] create_dir
 - [x] stat
 - [x] read
 - [x] write
-- [x] create_dir
 - [x] delete
-- [x] copy
-- [x] rename
-- [ ] ~~list~~
+- [ ] copy
+- [ ] rename
+- [ ] list
 - [ ] ~~presign~~
-- [x] blocking
 
 ## Configuration
 
diff --git a/core/src/services/redb/mod.rs b/core/src/services/redb/mod.rs
index e13f5e6b2..43a87539d 100644
--- a/core/src/services/redb/mod.rs
+++ b/core/src/services/redb/mod.rs
@@ -16,6 +16,10 @@
 // under the License.
 
 mod backend;
+mod core;
+mod deleter;
+mod writer;
+
 pub use backend::RedbBuilder as Redb;
 
 mod config;
diff --git a/core/src/services/redb/writer.rs b/core/src/services/redb/writer.rs
new file mode 100644
index 000000000..8e27a9f8d
--- /dev/null
+++ b/core/src/services/redb/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct RedbWriter {
+    core: Arc<RedbCore>,
+    path: String,
+    buffer: oio::QueueBuf,
+}
+
+impl RedbWriter {
+    pub fn new(core: Arc<RedbCore>, path: String) -> Self {
+        Self {
+            core,
+            path,
+            buffer: oio::QueueBuf::new(),
+        }
+    }
+}
+
+impl oio::Write for RedbWriter {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        self.buffer.push(bs);
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<Metadata> {
+        let buf = self.buffer.clone().collect();
+        let length = buf.len() as u64;
+        self.core.set(&self.path, buf)?;
+
+        let meta = 
Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+        Ok(meta)
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buffer.clear();
+        Ok(())
+    }
+}

Reply via email to