This is an automated email from the ASF dual-hosted git repository. koushiro pushed a commit to branch refactor-foundationdb in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 2a31deff67a81b0cc0f699db54da9e84408b4b78 Author: koushiro <[email protected]> AuthorDate: Tue Oct 21 23:04:35 2025 +0800 refactor: migrate foundationdb service from adapter::kv to impl Access directly --- core/src/services/foundationdb/backend.rs | 153 ++++++++++++--------- core/src/services/foundationdb/config.rs | 20 +-- core/src/services/foundationdb/core.rs | 84 +++++++++++ .../services/foundationdb/{mod.rs => deleter.rs} | 28 +++- core/src/services/foundationdb/docs.md | 9 +- core/src/services/foundationdb/mod.rs | 4 + core/src/services/foundationdb/writer.rs | 59 ++++++++ 7 files changed, 267 insertions(+), 90 deletions(-) diff --git a/core/src/services/foundationdb/backend.rs b/core/src/services/foundationdb/backend.rs index 151de6c47..e1fcad793 100644 --- a/core/src/services/foundationdb/backend.rs +++ b/core/src/services/foundationdb/backend.rs @@ -16,19 +16,15 @@ // under the License. use std::fmt::Debug; -use std::fmt::Formatter; use std::sync::Arc; use foundationdb::Database; -use foundationdb::api::NetworkAutoStop; -use crate::Builder; -use crate::Error; -use crate::ErrorKind; -use crate::Scheme; -use crate::raw::adapters::kv; +use super::config::FoundationdbConfig; +use super::core::*; +use super::deleter::FoundationdbDeleter; +use super::writer::FoundationdbWriter; use crate::raw::*; -use crate::services::FoundationdbConfig; use crate::*; #[doc = include_str!("docs.md")] @@ -81,84 +77,111 @@ impl Builder for FoundationdbBuilder { .as_str(), ); - Ok(FoundationdbBackend::new(Adapter { db, _network }).with_normalized_root(root)) + Ok(FoundationdbBackend::new(FoundationdbCore { db, _network }).with_normalized_root(root)) } } /// Backend for Foundationdb services. -pub type FoundationdbBackend = kv::Backend<Adapter>; - -#[derive(Clone)] -pub struct Adapter { - db: Arc<Database>, - _network: Arc<NetworkAutoStop>, +#[derive(Clone, Debug)] +pub struct FoundationdbBackend { + core: Arc<FoundationdbCore>, + root: String, + info: Arc<AccessorInfo>, } -impl Debug for Adapter { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("Adapter"); - ds.finish() +impl FoundationdbBackend { + pub fn new(core: FoundationdbCore) -> Self { + let info = AccessorInfo::default(); + info.set_scheme(Scheme::Foundationdb.into_static()); + info.set_name("foundationdb"); + info.set_root("/"); + info.set_native_capability(Capability { + read: true, + stat: true, + write: true, + write_can_empty: true, + delete: true, + shared: true, + ..Default::default() + }); + + Self { + core: Arc::new(core), + root: "/".to_string(), + info: Arc::new(info), + } + } + + fn with_normalized_root(mut self, root: String) -> Self { + self.info.set_root(&root); + self.root = root; + self } } -impl kv::Adapter for Adapter { - type Scanner = (); - - fn info(&self) -> kv::Info { - kv::Info::new( - Scheme::Foundationdb, - "foundationdb", - Capability { - read: true, - write: true, - delete: true, - shared: true, - ..Default::default() - }, - ) +impl Access for FoundationdbBackend { + type Reader = Buffer; + type Writer = FoundationdbWriter; + type Lister = (); + type Deleter = oio::OneShotDeleter<FoundationdbDeleter>; + + fn info(&self) -> Arc<AccessorInfo> { + self.info.clone() } - async fn get(&self, path: &str) -> Result<Option<Buffer>> { - let transaction = self.db.create_trx().expect("Unable to create transaction"); + async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { + let p = build_abs_path(&self.root, path); - match transaction.get(path.as_bytes(), false).await { - Ok(slice) => match slice { - Some(data) => Ok(Some(Buffer::from(data.to_vec()))), + if p == build_abs_path(&self.root, "") { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } else { + let bs = self.core.get(&p).await?; + match bs { + Some(bs) => Ok(RpStat::new( + Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64), + )), None => Err(Error::new( ErrorKind::NotFound, - "foundationdb: key not found", + "kv not found in foundationdb", )), - }, - Err(_) => Err(Error::new( - ErrorKind::NotFound, - "foundationdb: key not found", - )), + } } } - async fn set(&self, path: &str, value: Buffer) -> Result<()> { - let transaction = self.db.create_trx().expect("Unable to create transaction"); - - transaction.set(path.as_bytes(), &value.to_vec()); - - match transaction.commit().await { - Ok(_) => Ok(()), - Err(e) => Err(parse_transaction_commit_error(e)), - } + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let p = build_abs_path(&self.root, path); + let bs = match self.core.get(&p).await? { + Some(bs) => bs, + None => { + return Err(Error::new( + ErrorKind::NotFound, + "kv not found in foundationdb", + )); + } + }; + Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize()))) } - async fn delete(&self, path: &str) -> Result<()> { - let transaction = self.db.create_trx().expect("Unable to create transaction"); - transaction.clear(path.as_bytes()); + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let p = build_abs_path(&self.root, path); + Ok(( + RpWrite::new(), + FoundationdbWriter::new(self.core.clone(), p), + )) + } - match transaction.commit().await { - Ok(_) => Ok(()), - Err(e) => Err(parse_transaction_commit_error(e)), - } + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + Ok(( + RpDelete::default(), + oio::OneShotDeleter::new(FoundationdbDeleter::new( + self.core.clone(), + self.root.clone(), + )), + )) } -} -fn parse_transaction_commit_error(e: foundationdb::TransactionCommitError) -> Error { - Error::new(ErrorKind::Unexpected, e.to_string().as_str()) - .with_context("service", Scheme::Foundationdb) + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { + let _ = build_abs_path(&self.root, path); + Ok((RpList::default(), ())) + } } diff --git a/core/src/services/foundationdb/config.rs b/core/src/services/foundationdb/config.rs index f335f1dd3..c6c752aa8 100644 --- a/core/src/services/foundationdb/config.rs +++ b/core/src/services/foundationdb/config.rs @@ -15,16 +15,14 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; -use std::fmt::Formatter; - -use super::backend::FoundationdbBuilder; use serde::Deserialize; use serde::Serialize; +use super::backend::FoundationdbBuilder; + /// [foundationdb](https://www.foundationdb.org/) service support. ///Config for FoundationDB. -#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] #[non_exhaustive] pub struct FoundationdbConfig { @@ -34,19 +32,9 @@ pub struct FoundationdbConfig { pub config_path: Option<String>, } -impl Debug for FoundationdbConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("FoundationConfig"); - - ds.field("root", &self.root); - ds.field("config_path", &self.config_path); - - ds.finish() - } -} - impl crate::Configurator for FoundationdbConfig { type Builder = FoundationdbBuilder; + fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> { let mut map = uri.options().clone(); diff --git a/core/src/services/foundationdb/core.rs b/core/src/services/foundationdb/core.rs new file mode 100644 index 000000000..a1f2d27be --- /dev/null +++ b/core/src/services/foundationdb/core.rs @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; +use std::sync::Arc; + +use foundationdb::Database; +use foundationdb::api::NetworkAutoStop; + +use crate::*; + +#[derive(Clone)] +pub struct FoundationdbCore { + pub db: Arc<Database>, + pub _network: Arc<NetworkAutoStop>, +} + +impl Debug for FoundationdbCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let mut ds = f.debug_struct("FoundationdbCore"); + ds.finish() + } +} + +impl FoundationdbCore { + pub async fn get(&self, path: &str) -> Result<Option<Buffer>> { + let transaction = self.db.create_trx().expect("Unable to create transaction"); + + match transaction.get(path.as_bytes(), false).await { + Ok(slice) => match slice { + Some(data) => Ok(Some(Buffer::from(data.to_vec()))), + None => Err(Error::new( + ErrorKind::NotFound, + "foundationdb: key not found", + )), + }, + Err(_) => Err(Error::new( + ErrorKind::NotFound, + "foundationdb: key not found", + )), + } + } + + pub async fn set(&self, path: &str, value: Buffer) -> Result<()> { + let transaction = self.db.create_trx().expect("Unable to create transaction"); + + transaction.set(path.as_bytes(), &value.to_vec()); + + match transaction.commit().await { + Ok(_) => Ok(()), + Err(e) => Err(parse_transaction_commit_error(e)), + } + } + + pub async fn delete(&self, path: &str) -> Result<()> { + let transaction = self.db.create_trx().expect("Unable to create transaction"); + transaction.clear(path.as_bytes()); + + match transaction.commit().await { + Ok(_) => Ok(()), + Err(e) => Err(parse_transaction_commit_error(e)), + } + } +} + +fn parse_transaction_commit_error(e: foundationdb::TransactionCommitError) -> Error { + Error::new(ErrorKind::Unexpected, e.to_string().as_str()) + .with_context("service", Scheme::Foundationdb) +} diff --git a/core/src/services/foundationdb/mod.rs b/core/src/services/foundationdb/deleter.rs similarity index 59% copy from core/src/services/foundationdb/mod.rs copy to core/src/services/foundationdb/deleter.rs index 2eede8d02..46f4f9d48 100644 --- a/core/src/services/foundationdb/mod.rs +++ b/core/src/services/foundationdb/deleter.rs @@ -15,8 +15,28 @@ // specific language governing permissions and limitations // under the License. -mod backend; -pub use backend::FoundationdbBuilder as Foundationdb; +use std::sync::Arc; -mod config; -pub use config::FoundationdbConfig; +use super::core::*; +use crate::raw::oio; +use crate::raw::*; +use crate::*; + +pub struct FoundationdbDeleter { + core: Arc<FoundationdbCore>, + root: String, +} + +impl FoundationdbDeleter { + pub fn new(core: Arc<FoundationdbCore>, root: String) -> Self { + Self { core, root } + } +} + +impl oio::OneShotDelete for FoundationdbDeleter { + async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { + let p = build_abs_path(&self.root, &path); + self.core.delete(&p).await?; + Ok(()) + } +} diff --git a/core/src/services/foundationdb/docs.md b/core/src/services/foundationdb/docs.md index e4ad8a005..d1512ff00 100644 --- a/core/src/services/foundationdb/docs.md +++ b/core/src/services/foundationdb/docs.md @@ -2,16 +2,15 @@ This service can be used to: +- [ ] create_dir - [x] stat - [x] read - [x] write -- [x] create_dir - [x] delete -- [x] copy -- [x] rename -- [ ] ~~list~~ +- [ ] copy +- [ ] rename +- [ ] list - [ ] ~~presign~~ -- [ ] blocking **Note**: As for [Known Limitations - FoundationDB](https://apple.github.io/foundationdb/known-limitations), keys cannot exceed 10,000 bytes in size, and values cannot exceed 100,000 bytes in size. Errors will be raised by OpenDAL if these limits are exceeded. diff --git a/core/src/services/foundationdb/mod.rs b/core/src/services/foundationdb/mod.rs index 2eede8d02..c92ac58f0 100644 --- a/core/src/services/foundationdb/mod.rs +++ b/core/src/services/foundationdb/mod.rs @@ -16,6 +16,10 @@ // under the License. mod backend; +mod core; +mod deleter; +mod writer; + pub use backend::FoundationdbBuilder as Foundationdb; mod config; diff --git a/core/src/services/foundationdb/writer.rs b/core/src/services/foundationdb/writer.rs new file mode 100644 index 000000000..7a8c2be36 --- /dev/null +++ b/core/src/services/foundationdb/writer.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use super::core::*; +use crate::raw::oio; +use crate::*; + +pub struct FoundationdbWriter { + core: Arc<FoundationdbCore>, + path: String, + buffer: oio::QueueBuf, +} + +impl FoundationdbWriter { + pub fn new(core: Arc<FoundationdbCore>, path: String) -> Self { + Self { + core, + path, + buffer: oio::QueueBuf::new(), + } + } +} + +impl oio::Write for FoundationdbWriter { + async fn write(&mut self, bs: Buffer) -> Result<()> { + self.buffer.push(bs); + Ok(()) + } + + async fn close(&mut self) -> Result<Metadata> { + let buf = self.buffer.clone().collect(); + let length = buf.len() as u64; + self.core.set(&self.path, buf).await?; + + let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); + Ok(meta) + } + + async fn abort(&mut self) -> Result<()> { + self.buffer.clear(); + Ok(()) + } +}
