This is an automated email from the ASF dual-hosted git repository. koushiro pushed a commit to branch refactor-redb in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 2c7c5702d68a0d3c2c3d71d623d28a5c5b97f0f5 Author: koushiro <[email protected]> AuthorDate: Thu Oct 23 16:05:10 2025 +0800 refactor: migrate redb service from adapter::kv to impl Access directly --- core/src/services/redb/backend.rs | 213 +++++++++---------------- core/src/services/redb/config.rs | 8 +- core/src/services/redb/{backend.rs => core.rs} | 149 ++--------------- core/src/services/redb/{mod.rs => deleter.rs} | 28 +++- core/src/services/redb/docs.md | 9 +- core/src/services/redb/mod.rs | 4 + core/src/services/redb/writer.rs | 59 +++++++ 7 files changed, 190 insertions(+), 280 deletions(-) diff --git a/core/src/services/redb/backend.rs b/core/src/services/redb/backend.rs index 88a291fb2..87893fe11 100644 --- a/core/src/services/redb/backend.rs +++ b/core/src/services/redb/backend.rs @@ -15,17 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; -use std::fmt::Formatter; use std::sync::Arc; -use crate::Builder; -use crate::Error; -use crate::ErrorKind; -use crate::Scheme; -use crate::raw::adapters::kv; +use super::config::RedbConfig; +use super::core::*; +use super::deleter::RedbDeleter; +use super::writer::RedbWriter; use crate::raw::*; -use crate::services::RedbConfig; use crate::*; /// Redb service support. @@ -112,159 +108,106 @@ impl Builder for RedbBuilder { create_table(&db, &table_name)?; - Ok(RedbBackend::new(Adapter { + let root = normalize_root(&self.config.root.unwrap_or_default()); + + Ok(RedbBackend::new(RedbCore { datadir, table: table_name, db, }) - .with_root(self.config.root.as_deref().unwrap_or_default())) + .with_normalized_root(root)) } } /// Backend for Redb services. -pub type RedbBackend = kv::Backend<Adapter>; - -#[derive(Clone)] -pub struct Adapter { - datadir: Option<String>, - table: String, - db: Arc<redb::Database>, +#[derive(Clone, Debug)] +pub struct RedbBackend { + core: Arc<RedbCore>, + root: String, + info: Arc<AccessorInfo>, } -impl Debug for Adapter { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("Adapter"); - ds.field("path", &self.datadir); - ds.finish() +impl RedbBackend { + pub fn new(core: RedbCore) -> Self { + let info = AccessorInfo::default(); + info.set_scheme(Scheme::Redb.into_static()); + info.set_name(&core.table); + info.set_root("/"); + info.set_native_capability(Capability { + read: true, + stat: true, + write: true, + write_can_empty: true, + delete: true, + shared: false, + ..Default::default() + }); + + Self { + core: Arc::new(core), + root: "/".to_string(), + info: Arc::new(info), + } } -} - -impl kv::Adapter for Adapter { - type Scanner = (); - fn info(&self) -> kv::Info { - kv::Info::new( - Scheme::Redb, - &self.table, - Capability { - read: true, - write: true, - shared: false, - ..Default::default() - }, - ) + fn with_normalized_root(mut self, root: String) -> Self { + self.info.set_root(&root); + self.root = root; + self } +} - async fn get(&self, path: &str) -> Result<Option<Buffer>> { - let read_txn = self.db.begin_read().map_err(parse_transaction_error)?; - - let table_define: redb::TableDefinition<&str, &[u8]> = - redb::TableDefinition::new(&self.table); - - let table = read_txn - .open_table(table_define) - .map_err(parse_table_error)?; +impl Access for RedbBackend { + type Reader = Buffer; + type Writer = RedbWriter; + type Lister = (); + type Deleter = oio::OneShotDeleter<RedbDeleter>; - let result = match table.get(path) { - Ok(Some(v)) => Ok(Some(v.value().to_vec())), - Ok(None) => Ok(None), - Err(e) => Err(parse_storage_error(e)), - }?; - Ok(result.map(Buffer::from)) + fn info(&self) -> Arc<AccessorInfo> { + self.info.clone() } - async fn set(&self, path: &str, value: Buffer) -> Result<()> { - let write_txn = self.db.begin_write().map_err(parse_transaction_error)?; + async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { + let p = build_abs_path(&self.root, path); - let table_define: redb::TableDefinition<&str, &[u8]> = - redb::TableDefinition::new(&self.table); - - { - let mut table = write_txn - .open_table(table_define) - .map_err(parse_table_error)?; - - table - .insert(path, &*value.to_vec()) - .map_err(parse_storage_error)?; + if p == build_abs_path(&self.root, "") { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } else { + let bs = self.core.get(&p)?; + match bs { + Some(bs) => Ok(RpStat::new( + Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64), + )), + None => Err(Error::new(ErrorKind::NotFound, "kv not found in redb")), + } } - - write_txn.commit().map_err(parse_commit_error)?; - Ok(()) } - async fn delete(&self, path: &str) -> Result<()> { - let write_txn = self.db.begin_write().map_err(parse_transaction_error)?; - - let table_define: redb::TableDefinition<&str, &[u8]> = - redb::TableDefinition::new(&self.table); - - { - let mut table = write_txn - .open_table(table_define) - .map_err(parse_table_error)?; - - table.remove(path).map_err(parse_storage_error)?; - } - - write_txn.commit().map_err(parse_commit_error)?; - Ok(()) + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let p = build_abs_path(&self.root, path); + let bs = match self.core.get(&p)? { + Some(bs) => bs, + None => { + return Err(Error::new(ErrorKind::NotFound, "kv not found in redb")); + } + }; + Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize()))) } -} -fn parse_transaction_error(e: redb::TransactionError) -> Error { - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) -} - -fn parse_table_error(e: redb::TableError) -> Error { - match e { - redb::TableError::TableDoesNotExist(_) => { - Error::new(ErrorKind::NotFound, "error from redb").set_source(e) - } - _ => Error::new(ErrorKind::Unexpected, "error from redb").set_source(e), + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let p = build_abs_path(&self.root, path); + Ok((RpWrite::new(), RedbWriter::new(self.core.clone(), p))) } -} - -fn parse_storage_error(e: redb::StorageError) -> Error { - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) -} - -fn parse_database_error(e: redb::DatabaseError) -> Error { - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) -} - -fn parse_commit_error(e: redb::CommitError) -> Error { - Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) -} - -/// Check if a table exists, otherwise create it. -fn create_table(db: &redb::Database, table: &str) -> Result<()> { - // Only one `WriteTransaction` is permitted at same time, - // applying new one will block until it available. - // - // So we first try checking table existence via `ReadTransaction`. - { - let read_txn = db.begin_read().map_err(parse_transaction_error)?; - - let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table); - match read_txn.open_table(table_define) { - Ok(_) => return Ok(()), - Err(redb::TableError::TableDoesNotExist(_)) => (), - Err(e) => return Err(parse_table_error(e)), - } + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + Ok(( + RpDelete::default(), + oio::OneShotDeleter::new(RedbDeleter::new(self.core.clone(), self.root.clone())), + )) } - { - let write_txn = db.begin_write().map_err(parse_transaction_error)?; - - let table_define: redb::TableDefinition<&str, &[u8]> = redb::TableDefinition::new(table); - - write_txn - .open_table(table_define) - .map_err(parse_table_error)?; - write_txn.commit().map_err(parse_commit_error)?; + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { + let _ = build_abs_path(&self.root, path); + Ok((RpList::default(), ())) } - - Ok(()) } diff --git a/core/src/services/redb/config.rs b/core/src/services/redb/config.rs index 167441d1c..45a81a2cf 100644 --- a/core/src/services/redb/config.rs +++ b/core/src/services/redb/config.rs @@ -17,10 +17,11 @@ use std::fmt::Debug; -use super::backend::RedbBuilder; use serde::Deserialize; use serde::Serialize; +use super::backend::RedbBuilder; + /// Config for redb service support. #[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] @@ -28,14 +29,15 @@ use serde::Serialize; pub struct RedbConfig { /// path to the redb data directory. pub datadir: Option<String>, - /// The root for redb. - pub root: Option<String>, /// The table name for redb. pub table: Option<String>, + /// The root for redb. + pub root: Option<String>, } impl crate::Configurator for RedbConfig { type Builder = RedbBuilder; + fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> { let mut map = uri.options().clone(); diff --git a/core/src/services/redb/backend.rs b/core/src/services/redb/core.rs similarity index 52% copy from core/src/services/redb/backend.rs copy to core/src/services/redb/core.rs index 88a291fb2..05b3c7764 100644 --- a/core/src/services/redb/backend.rs +++ b/core/src/services/redb/core.rs @@ -19,143 +19,26 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; -use crate::Builder; -use crate::Error; -use crate::ErrorKind; -use crate::Scheme; -use crate::raw::adapters::kv; -use crate::raw::*; -use crate::services::RedbConfig; use crate::*; -/// Redb service support. -#[doc = include_str!("docs.md")] -#[derive(Default, Debug)] -pub struct RedbBuilder { - pub(super) config: RedbConfig, - - pub(super) database: Option<Arc<redb::Database>>, -} - -impl RedbBuilder { - /// Set the database for Redb. - /// - /// This method should be called when you want to - /// use multiple tables of one database because - /// Redb doesn't allow opening a database that have been opened. - /// - /// <div class="warning"> - /// - /// `datadir` and `database` should not be set simultaneously. - /// If both are set, `database` will take precedence. - /// - /// </div> - pub fn database(mut self, db: Arc<redb::Database>) -> Self { - self.database = Some(db); - self - } - - /// Set the path to the redb data directory. Will create if not exists. - /// - /// - /// <div class="warning"> - /// - /// Opening redb database via `datadir` takes away the ability to access multiple redb tables. - /// If you need to access multiple redb tables, the correct solution is to - /// create an `Arc<redb::database>` beforehand and then share it via [`database`] - /// with multiple builders where every builder will open one redb table. - /// - /// </div> - /// - /// [`database`]: RedbBuilder::database - pub fn datadir(mut self, path: &str) -> Self { - self.config.datadir = Some(path.into()); - self - } - - /// Set the table name for Redb. Will create if not exists. - pub fn table(mut self, table: &str) -> Self { - self.config.table = Some(table.into()); - self - } - - /// Set the root for Redb. - pub fn root(mut self, path: &str) -> Self { - self.config.root = Some(path.into()); - self - } -} - -impl Builder for RedbBuilder { - type Config = RedbConfig; - - fn build(self) -> Result<impl Access> { - let table_name = self.config.table.ok_or_else(|| { - Error::new(ErrorKind::ConfigInvalid, "table is required but not set") - .with_context("service", Scheme::Redb) - })?; - - let (datadir, db) = if let Some(db) = self.database { - (None, db) - } else { - let datadir = self.config.datadir.ok_or_else(|| { - Error::new(ErrorKind::ConfigInvalid, "datadir is required but not set") - .with_context("service", Scheme::Redb) - })?; - - let db = redb::Database::create(&datadir) - .map_err(parse_database_error)? - .into(); - - (Some(datadir), db) - }; - - create_table(&db, &table_name)?; - - Ok(RedbBackend::new(Adapter { - datadir, - table: table_name, - db, - }) - .with_root(self.config.root.as_deref().unwrap_or_default())) - } -} - -/// Backend for Redb services. -pub type RedbBackend = kv::Backend<Adapter>; - #[derive(Clone)] -pub struct Adapter { - datadir: Option<String>, - table: String, - db: Arc<redb::Database>, +pub struct RedbCore { + pub db: Arc<redb::Database>, + pub datadir: Option<String>, + pub table: String, } -impl Debug for Adapter { +impl Debug for RedbCore { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("Adapter"); - ds.field("path", &self.datadir); - ds.finish() + let mut ds = f.debug_struct("RedbCore"); + ds.field("path", &self.datadir) + .field("table", &self.table) + .finish() } } -impl kv::Adapter for Adapter { - type Scanner = (); - - fn info(&self) -> kv::Info { - kv::Info::new( - Scheme::Redb, - &self.table, - Capability { - read: true, - write: true, - shared: false, - ..Default::default() - }, - ) - } - - async fn get(&self, path: &str) -> Result<Option<Buffer>> { +impl RedbCore { + pub fn get(&self, path: &str) -> Result<Option<Buffer>> { let read_txn = self.db.begin_read().map_err(parse_transaction_error)?; let table_define: redb::TableDefinition<&str, &[u8]> = @@ -173,7 +56,7 @@ impl kv::Adapter for Adapter { Ok(result.map(Buffer::from)) } - async fn set(&self, path: &str, value: Buffer) -> Result<()> { + pub fn set(&self, path: &str, value: Buffer) -> Result<()> { let write_txn = self.db.begin_write().map_err(parse_transaction_error)?; let table_define: redb::TableDefinition<&str, &[u8]> = @@ -193,7 +76,7 @@ impl kv::Adapter for Adapter { Ok(()) } - async fn delete(&self, path: &str) -> Result<()> { + pub fn delete(&self, path: &str) -> Result<()> { let write_txn = self.db.begin_write().map_err(parse_transaction_error)?; let table_define: redb::TableDefinition<&str, &[u8]> = @@ -229,16 +112,16 @@ fn parse_storage_error(e: redb::StorageError) -> Error { Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) } -fn parse_database_error(e: redb::DatabaseError) -> Error { +fn parse_commit_error(e: redb::CommitError) -> Error { Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) } -fn parse_commit_error(e: redb::CommitError) -> Error { +pub fn parse_database_error(e: redb::DatabaseError) -> Error { Error::new(ErrorKind::Unexpected, "error from redb").set_source(e) } /// Check if a table exists, otherwise create it. -fn create_table(db: &redb::Database, table: &str) -> Result<()> { +pub fn create_table(db: &redb::Database, table: &str) -> Result<()> { // Only one `WriteTransaction` is permitted at same time, // applying new one will block until it available. // diff --git a/core/src/services/redb/mod.rs b/core/src/services/redb/deleter.rs similarity index 61% copy from core/src/services/redb/mod.rs copy to core/src/services/redb/deleter.rs index e13f5e6b2..9ab1ef1f1 100644 --- a/core/src/services/redb/mod.rs +++ b/core/src/services/redb/deleter.rs @@ -15,8 +15,28 @@ // specific language governing permissions and limitations // under the License. -mod backend; -pub use backend::RedbBuilder as Redb; +use std::sync::Arc; -mod config; -pub use config::RedbConfig; +use super::core::*; +use crate::raw::oio; +use crate::raw::*; +use crate::*; + +pub struct RedbDeleter { + core: Arc<RedbCore>, + root: String, +} + +impl RedbDeleter { + pub fn new(core: Arc<RedbCore>, root: String) -> Self { + Self { core, root } + } +} + +impl oio::OneShotDelete for RedbDeleter { + async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { + let p = build_abs_path(&self.root, &path); + self.core.delete(&p)?; + Ok(()) + } +} diff --git a/core/src/services/redb/docs.md b/core/src/services/redb/docs.md index a964edb28..106b54317 100644 --- a/core/src/services/redb/docs.md +++ b/core/src/services/redb/docs.md @@ -2,16 +2,15 @@ This service can be used to: +- [ ] create_dir - [x] stat - [x] read - [x] write -- [x] create_dir - [x] delete -- [x] copy -- [x] rename -- [ ] ~~list~~ +- [ ] copy +- [ ] rename +- [ ] list - [ ] ~~presign~~ -- [x] blocking ## Configuration diff --git a/core/src/services/redb/mod.rs b/core/src/services/redb/mod.rs index e13f5e6b2..43a87539d 100644 --- a/core/src/services/redb/mod.rs +++ b/core/src/services/redb/mod.rs @@ -16,6 +16,10 @@ // under the License. mod backend; +mod core; +mod deleter; +mod writer; + pub use backend::RedbBuilder as Redb; mod config; diff --git a/core/src/services/redb/writer.rs b/core/src/services/redb/writer.rs new file mode 100644 index 000000000..8e27a9f8d --- /dev/null +++ b/core/src/services/redb/writer.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use super::core::*; +use crate::raw::oio; +use crate::*; + +pub struct RedbWriter { + core: Arc<RedbCore>, + path: String, + buffer: oio::QueueBuf, +} + +impl RedbWriter { + pub fn new(core: Arc<RedbCore>, path: String) -> Self { + Self { + core, + path, + buffer: oio::QueueBuf::new(), + } + } +} + +impl oio::Write for RedbWriter { + async fn write(&mut self, bs: Buffer) -> Result<()> { + self.buffer.push(bs); + Ok(()) + } + + async fn close(&mut self) -> Result<Metadata> { + let buf = self.buffer.clone().collect(); + let length = buf.len() as u64; + self.core.set(&self.path, buf)?; + + let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); + Ok(meta) + } + + async fn abort(&mut self) -> Result<()> { + self.buffer.clear(); + Ok(()) + } +}
