This is an automated email from the ASF dual-hosted git repository. koushiro pushed a commit to branch refactor-mongodb in repository https://gitbox.apache.org/repos/asf/opendal.git
commit 3322fd9d3bb40d73ef4f31948c814884a95b4644 Author: koushiro <[email protected]> AuthorDate: Wed Oct 22 14:40:46 2025 +0800 refactor: migrate mongodb service from adapter::kv to impl Access directly --- core/src/services/mongodb/backend.rs | 181 ++++++++++------------- core/src/services/mongodb/config.rs | 22 +-- core/src/services/mongodb/core.rs | 115 ++++++++++++++ core/src/services/mongodb/{mod.rs => deleter.rs} | 28 +++- core/src/services/mongodb/docs.md | 5 +- core/src/services/mongodb/mod.rs | 4 + core/src/services/mongodb/writer.rs | 59 ++++++++ 7 files changed, 286 insertions(+), 128 deletions(-) diff --git a/core/src/services/mongodb/backend.rs b/core/src/services/mongodb/backend.rs index 1cb9e8511..d92eab42e 100644 --- a/core/src/services/mongodb/backend.rs +++ b/core/src/services/mongodb/backend.rs @@ -15,18 +15,15 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; -use std::fmt::Formatter; +use std::sync::Arc; -use mongodb::bson::Binary; -use mongodb::bson::Document; -use mongodb::bson::doc; -use mongodb::options::ClientOptions; use tokio::sync::OnceCell; -use crate::raw::adapters::kv; +use super::config::MongodbConfig; +use super::core::*; +use super::deleter::MongodbDeleter; +use super::writer::MongodbWriter; use crate::raw::*; -use crate::services::MongodbConfig; use crate::*; #[doc = include_str!("docs.md")] @@ -35,14 +32,6 @@ pub struct MongodbBuilder { pub(super) config: MongodbConfig, } -impl Debug for MongodbBuilder { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("MongodbBuilder") - .field("config", &self.config) - .finish() - } -} - impl MongodbBuilder { /// Set the connection_string of the MongoDB service. /// @@ -164,7 +153,7 @@ impl Builder for MongodbBuilder { .unwrap_or_else(|| "/".to_string()) .as_str(), ); - Ok(MongodbBackend::new(Adapter { + Ok(MongodbBackend::new(MongodbCore { connection_string: conn, database, collection, @@ -176,109 +165,95 @@ impl Builder for MongodbBuilder { } } -pub type MongodbBackend = kv::Backend<Adapter>; - -#[derive(Clone)] -pub struct Adapter { - connection_string: String, - database: String, - collection: String, - collection_instance: OnceCell<mongodb::Collection<Document>>, - key_field: String, - value_field: String, +/// Backend for Mongodb services. +#[derive(Clone, Debug)] +pub struct MongodbBackend { + core: Arc<MongodbCore>, + root: String, + info: Arc<AccessorInfo>, } -impl Debug for Adapter { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("Adapter") - .field("connection_string", &self.connection_string) - .field("database", &self.database) - .field("collection", &self.collection) - .field("key_field", &self.key_field) - .field("value_field", &self.value_field) - .finish() +impl MongodbBackend { + pub fn new(core: MongodbCore) -> Self { + let info = AccessorInfo::default(); + info.set_scheme(Scheme::Mongodb.into_static()); + info.set_name(&format!("{}/{}", core.database, core.collection)); + info.set_root("/"); + info.set_native_capability(Capability { + read: true, + stat: true, + write: true, + write_can_empty: true, + delete: true, + shared: true, + ..Default::default() + }); + + Self { + core: Arc::new(core), + root: "/".to_string(), + info: Arc::new(info), + } } -} -impl Adapter { - async fn get_collection(&self) -> Result<&mongodb::Collection<Document>> { - self.collection_instance - .get_or_try_init(|| async { - let client_options = ClientOptions::parse(&self.connection_string) - .await - .map_err(parse_mongodb_error)?; - let client = - mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?; - let database = client.database(&self.database); - let collection = database.collection(&self.collection); - Ok(collection) - }) - .await + fn with_normalized_root(mut self, root: String) -> Self { + self.info.set_root(&root); + self.root = root; + self } } -impl kv::Adapter for Adapter { - type Scanner = (); +impl Access for MongodbBackend { + type Reader = Buffer; + type Writer = MongodbWriter; + type Lister = (); + type Deleter = oio::OneShotDeleter<MongodbDeleter>; - fn info(&self) -> kv::Info { - kv::Info::new( - Scheme::Mongodb, - &format!("{}/{}", self.database, self.collection), - Capability { - read: true, - write: true, - shared: true, - ..Default::default() - }, - ) + fn info(&self) -> Arc<AccessorInfo> { + self.info.clone() } - async fn get(&self, path: &str) -> Result<Option<Buffer>> { - let collection = self.get_collection().await?; - let filter = doc! {self.key_field.as_str():path}; - let result = collection - .find_one(filter) - .await - .map_err(parse_mongodb_error)?; - match result { - Some(doc) => { - let value = doc - .get_binary_generic(&self.value_field) - .map_err(parse_bson_error)?; - Ok(Some(Buffer::from(value.to_vec()))) + async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { + let p = build_abs_path(&self.root, path); + + if p == build_abs_path(&self.root, "") { + Ok(RpStat::new(Metadata::new(EntryMode::DIR))) + } else { + let bs = self.core.get(&p).await?; + match bs { + Some(bs) => Ok(RpStat::new( + Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64), + )), + None => Err(Error::new(ErrorKind::NotFound, "kv not found in mongodb")), } - None => Ok(None), } } - async fn set(&self, path: &str, value: Buffer) -> Result<()> { - let collection = self.get_collection().await?; - let filter = doc! { self.key_field.as_str(): path }; - let update = doc! { "$set": { self.value_field.as_str(): Binary { subtype: mongodb::bson::spec::BinarySubtype::Generic, bytes: value.to_vec() } } }; - collection - .update_one(filter, update) - .upsert(true) - .await - .map_err(parse_mongodb_error)?; - - Ok(()) + async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { + let p = build_abs_path(&self.root, path); + let bs = match self.core.get(&p).await? { + Some(bs) => bs, + None => { + return Err(Error::new(ErrorKind::NotFound, "kv not found in mongodb")); + } + }; + Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize()))) } - async fn delete(&self, path: &str) -> Result<()> { - let collection = self.get_collection().await?; - let filter = doc! {self.key_field.as_str():path}; - collection - .delete_one(filter) - .await - .map_err(parse_mongodb_error)?; - Ok(()) + async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let p = build_abs_path(&self.root, path); + Ok((RpWrite::new(), MongodbWriter::new(self.core.clone(), p))) } -} -fn parse_mongodb_error(err: mongodb::error::Error) -> Error { - Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err) -} + async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { + Ok(( + RpDelete::default(), + oio::OneShotDeleter::new(MongodbDeleter::new(self.core.clone(), self.root.clone())), + )) + } -fn parse_bson_error(err: mongodb::bson::document::ValueAccessError) -> Error { - Error::new(ErrorKind::Unexpected, "bson error").set_source(err) + async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { + let _ = build_abs_path(&self.root, path); + Ok((RpList::default(), ())) + } } diff --git a/core/src/services/mongodb/config.rs b/core/src/services/mongodb/config.rs index 80e2fc89c..de721bd6d 100644 --- a/core/src/services/mongodb/config.rs +++ b/core/src/services/mongodb/config.rs @@ -15,15 +15,13 @@ // specific language governing permissions and limitations // under the License. -use std::fmt::Debug; -use std::fmt::Formatter; - -use super::backend::MongodbBuilder; use serde::Deserialize; use serde::Serialize; +use super::backend::MongodbBuilder; + /// Config for Mongodb service support. -#[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)] +#[derive(Default, Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] #[serde(default)] #[non_exhaustive] pub struct MongodbConfig { @@ -41,21 +39,9 @@ pub struct MongodbConfig { pub value_field: Option<String>, } -impl Debug for MongodbConfig { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("MongodbConfig") - .field("connection_string", &self.connection_string) - .field("database", &self.database) - .field("collection", &self.collection) - .field("root", &self.root) - .field("key_field", &self.key_field) - .field("value_field", &self.value_field) - .finish() - } -} - impl crate::Configurator for MongodbConfig { type Builder = MongodbBuilder; + fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> { let mut map = uri.options().clone(); diff --git a/core/src/services/mongodb/core.rs b/core/src/services/mongodb/core.rs new file mode 100644 index 000000000..5daf2976b --- /dev/null +++ b/core/src/services/mongodb/core.rs @@ -0,0 +1,115 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; +use std::fmt::Formatter; + +use mongodb::bson::Binary; +use mongodb::bson::Document; +use mongodb::bson::doc; +use mongodb::options::ClientOptions; +use tokio::sync::OnceCell; + +use crate::*; + +#[derive(Clone)] +pub struct MongodbCore { + pub connection_string: String, + pub database: String, + pub collection: String, + pub collection_instance: OnceCell<mongodb::Collection<Document>>, + pub key_field: String, + pub value_field: String, +} + +impl Debug for MongodbCore { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MongodbCore") + .field("connection_string", &self.connection_string) + .field("database", &self.database) + .field("collection", &self.collection) + .field("key_field", &self.key_field) + .field("value_field", &self.value_field) + .finish() + } +} + +impl MongodbCore { + async fn get_collection(&self) -> Result<&mongodb::Collection<Document>> { + self.collection_instance + .get_or_try_init(|| async { + let client_options = ClientOptions::parse(&self.connection_string) + .await + .map_err(parse_mongodb_error)?; + let client = + mongodb::Client::with_options(client_options).map_err(parse_mongodb_error)?; + let database = client.database(&self.database); + let collection = database.collection(&self.collection); + Ok(collection) + }) + .await + } + + pub async fn get(&self, path: &str) -> Result<Option<Buffer>> { + let collection = self.get_collection().await?; + let filter = doc! {self.key_field.as_str():path}; + let result = collection + .find_one(filter) + .await + .map_err(parse_mongodb_error)?; + match result { + Some(doc) => { + let value = doc + .get_binary_generic(&self.value_field) + .map_err(parse_bson_error)?; + Ok(Some(Buffer::from(value.to_vec()))) + } + None => Ok(None), + } + } + + pub async fn set(&self, path: &str, value: Buffer) -> Result<()> { + let collection = self.get_collection().await?; + let filter = doc! { self.key_field.as_str(): path }; + let update = doc! { "$set": { self.value_field.as_str(): Binary { subtype: mongodb::bson::spec::BinarySubtype::Generic, bytes: value.to_vec() } } }; + collection + .update_one(filter, update) + .upsert(true) + .await + .map_err(parse_mongodb_error)?; + + Ok(()) + } + + pub async fn delete(&self, path: &str) -> Result<()> { + let collection = self.get_collection().await?; + let filter = doc! {self.key_field.as_str():path}; + collection + .delete_one(filter) + .await + .map_err(parse_mongodb_error)?; + Ok(()) + } +} + +fn parse_mongodb_error(err: mongodb::error::Error) -> Error { + Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err) +} + +fn parse_bson_error(err: mongodb::bson::document::ValueAccessError) -> Error { + Error::new(ErrorKind::Unexpected, "bson error").set_source(err) +} diff --git a/core/src/services/mongodb/mod.rs b/core/src/services/mongodb/deleter.rs similarity index 60% copy from core/src/services/mongodb/mod.rs copy to core/src/services/mongodb/deleter.rs index 1ce9bd23d..2c147453b 100644 --- a/core/src/services/mongodb/mod.rs +++ b/core/src/services/mongodb/deleter.rs @@ -15,8 +15,28 @@ // specific language governing permissions and limitations // under the License. -mod backend; -pub use backend::MongodbBuilder as Mongodb; +use std::sync::Arc; -mod config; -pub use config::MongodbConfig; +use super::core::*; +use crate::raw::oio; +use crate::raw::*; +use crate::*; + +pub struct MongodbDeleter { + core: Arc<MongodbCore>, + root: String, +} + +impl MongodbDeleter { + pub fn new(core: Arc<MongodbCore>, root: String) -> Self { + Self { core, root } + } +} + +impl oio::OneShotDelete for MongodbDeleter { + async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> { + let p = build_abs_path(&self.root, &path); + self.core.delete(&p).await?; + Ok(()) + } +} diff --git a/core/src/services/mongodb/docs.md b/core/src/services/mongodb/docs.md index 2c90995ec..7fb4ea0b1 100644 --- a/core/src/services/mongodb/docs.md +++ b/core/src/services/mongodb/docs.md @@ -2,16 +2,15 @@ This service can be used to: +- [ ] create_dir - [x] stat - [x] read - [x] write -- [x] create_dir - [x] delete - [ ] copy - [ ] rename -- [ ] ~~list~~ +- [ ] list - [ ] ~~presign~~ -- [ ] blocking ## Configuration diff --git a/core/src/services/mongodb/mod.rs b/core/src/services/mongodb/mod.rs index 1ce9bd23d..539ff378b 100644 --- a/core/src/services/mongodb/mod.rs +++ b/core/src/services/mongodb/mod.rs @@ -16,6 +16,10 @@ // under the License. mod backend; +mod core; +mod deleter; +mod writer; + pub use backend::MongodbBuilder as Mongodb; mod config; diff --git a/core/src/services/mongodb/writer.rs b/core/src/services/mongodb/writer.rs new file mode 100644 index 000000000..f9c34ba9c --- /dev/null +++ b/core/src/services/mongodb/writer.rs @@ -0,0 +1,59 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::sync::Arc; + +use super::core::*; +use crate::raw::oio; +use crate::*; + +pub struct MongodbWriter { + core: Arc<MongodbCore>, + path: String, + buffer: oio::QueueBuf, +} + +impl MongodbWriter { + pub fn new(core: Arc<MongodbCore>, path: String) -> Self { + Self { + core, + path, + buffer: oio::QueueBuf::new(), + } + } +} + +impl oio::Write for MongodbWriter { + async fn write(&mut self, bs: Buffer) -> Result<()> { + self.buffer.push(bs); + Ok(()) + } + + async fn close(&mut self) -> Result<Metadata> { + let buf = self.buffer.clone().collect(); + let length = buf.len() as u64; + self.core.set(&self.path, buf).await?; + + let meta = Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length); + Ok(meta) + } + + async fn abort(&mut self) -> Result<()> { + self.buffer.clear(); + Ok(()) + } +}
