This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git
The following commit(s) were added to refs/heads/main by this push:
new c96a170f1 feat: add mongodb gridfs service support (#3491)
c96a170f1 is described below
commit c96a170f1a878e688f23cdb5b21b21989575a5a7
Author: taobo <[email protected]>
AuthorDate: Tue Nov 7 15:51:41 2023 +0800
feat: add mongodb gridfs service support (#3491)
* feat: add mongodb gridfs service support
* fix: behavior tests error
* fix: optimize code
* refactor: expose service name from GridFs to Gridfs
---
.env.example | 5 +
core/Cargo.toml | 1 +
core/src/services/gridfs/backend.rs | 291 ++++++++++++++++++++++++++++++++++++
core/src/services/gridfs/docs.md | 47 ++++++
core/src/services/gridfs/mod.rs | 19 +++
core/src/services/mod.rs | 5 +
core/src/types/operator/builder.rs | 2 +
core/src/types/scheme.rs | 4 +
website/docs/services/gridfs.mdx | 68 +++++++++
9 files changed, 442 insertions(+)
diff --git a/.env.example b/.env.example
index e57e121c7..32eee467b 100644
--- a/.env.example
+++ b/.env.example
@@ -161,3 +161,8 @@ OPENDAL_SWIFT_ACCOUNT=<account>
OPENDAL_SWIFT_CONTAINER=<container>
OPENDAL_SWIFT_ROOT=/path/to/dir
OPENDAL_SWIFT_TOKEN=<token>
+# gridfs
+OPENDAL_GRIDFS_CONNECTION_STRING=mongodb://localhost:27017
+OPENDAL_GRIDFS_DATABASE=<database>
+OPENDAL_GRIDFS_BUCKET=<fs>
+OPENDAL_GRIDFS_CHUNK_SIZE=<chunk_size>
diff --git a/core/Cargo.toml b/core/Cargo.toml
index b7f44a86d..6917eee7e 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -146,6 +146,7 @@ services-gcs = [
]
services-gdrive = []
services-ghac = []
+services-gridfs = ["dep:mongodb"]
services-hdfs = ["dep:hdrs"]
services-http = []
services-ipfs = ["dep:prost"]
diff --git a/core/src/services/gridfs/backend.rs
b/core/src/services/gridfs/backend.rs
new file mode 100644
index 000000000..606b464d3
--- /dev/null
+++ b/core/src/services/gridfs/backend.rs
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use futures::{AsyncWriteExt, StreamExt};
+use mongodb::bson::doc;
+use mongodb::options::{ClientOptions, GridFsBucketOptions, GridFsFindOptions};
+use mongodb::GridFsBucket;
+use std::fmt::{Debug, Formatter};
+use tokio::sync::OnceCell;
+
+use crate::raw::adapters::kv;
+use crate::raw::new_std_io_error;
+use crate::*;
+
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct GridFsBuilder {
+    /// MongoDB connection string, e.g. `mongodb://localhost:27017`. Required by `build`.
+    connection_string: Option<String>,
+    /// Database that holds the GridFS bucket. Required by `build`.
+    database: Option<String>,
+    /// GridFS bucket name; `build` falls back to `fs` when unset.
+    bucket: Option<String>,
+    /// Chunk size used to split uploaded files; `None` means "use the default".
+    chunk_size: Option<u32>,
+    /// Working directory for all operations (`/` when unset).
+    root: Option<String>,
+}
+
+impl Debug for GridFsBuilder {
+    /// Print the builder settings.
+    ///
+    /// `connection_string` is left out, presumably because it may embed user
+    /// credentials (see the `mongodb://user:password@...` examples).
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut ds = f.debug_struct("GridFsBuilder");
+        ds.field("database", &self.database)
+            .field("bucket", &self.bucket)
+            .field("chunk_size", &self.chunk_size)
+            .field("root", &self.root);
+        ds.finish()
+    }
+}
+
+impl GridFsBuilder {
+ /// Set the connection_string of the MongoDB service.
+ ///
+ /// This connection string is used to connect to the MongoDB service. It
typically follows the format:
+ ///
+ /// ## Format
+ ///
+ ///
`mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]`
+ ///
+ /// Examples:
+ ///
+ /// - Connecting to a local MongoDB instance: `mongodb://localhost:27017`
+ /// - Using authentication:
`mongodb://myUser:myPassword@localhost:27017/myAuthDB`
+ /// - Specifying authentication mechanism:
`mongodb://myUser:myPassword@localhost:27017/myAuthDB?authMechanism=SCRAM-SHA-256`
+ ///
+ /// ## Options
+ ///
+ /// - `authMechanism`: Specifies the authentication method to use.
Examples include `SCRAM-SHA-1`, `SCRAM-SHA-256`, and `MONGODB-AWS`.
+ /// - ... (any other options you wish to highlight)
+ ///
+ /// For more information, please refer to [MongoDB Connection String URI
Format](https://docs.mongodb.com/manual/reference/connection-string/).
+ pub fn connection_string(&mut self, v: &str) -> &mut Self {
+ if !v.is_empty() {
+ self.connection_string = Some(v.to_string());
+ }
+ self
+ }
+
+ /// Set the working directory, all operations will be performed under it.
+ ///
+ /// default: "/"
+ pub fn root(&mut self, root: &str) -> &mut Self {
+ if !root.is_empty() {
+ self.root = Some(root.to_owned());
+ }
+ self
+ }
+
+ /// Set the database name of the MongoDB GridFs service to read/write.
+ pub fn database(&mut self, database: &str) -> &mut Self {
+ if !database.is_empty() {
+ self.database = Some(database.to_string());
+ }
+ self
+ }
+
+ /// Set the buctet name of the MongoDB GridFs service to read/write.
+ ///
+ /// Default to `fs` if not specified.
+ pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+ if !bucket.is_empty() {
+ self.bucket = Some(bucket.to_string());
+ }
+ self
+ }
+
+ /// Set the chunk size of the MongoDB GridFs service used to break the
user file into chunks.
+ ///
+ /// Default to `255 KiB` if not specified.
+ pub fn chunk_size(&mut self, chunk_size: u32) -> &mut Self {
+ if chunk_size > 0 {
+ self.chunk_size = Some(chunk_size);
+ }
+ self
+ }
+}
+
+impl Builder for GridFsBuilder {
+    // Must match the scheme this builder is registered under (`Scheme::Gridfs`),
+    // not the separate MongoDB key-value service.
+    const SCHEME: Scheme = Scheme::Gridfs;
+
+    type Accessor = GridFsBackend;
+
+    /// Create a builder from string options.
+    ///
+    /// Recognized keys: `connection_string`, `database`, `bucket`, `chunk_size`
+    /// and `root`. A `chunk_size` that does not parse as `u32` becomes `0`,
+    /// which the `chunk_size` setter ignores.
+    fn from_map(map: std::collections::HashMap<String, String>) -> Self {
+        let mut builder = Self::default();
+
+        map.get("connection_string")
+            .map(|v| builder.connection_string(v));
+        map.get("database").map(|v| builder.database(v));
+        map.get("bucket").map(|v| builder.bucket(v));
+        map.get("chunk_size")
+            .map(|v| builder.chunk_size(v.parse::<u32>().unwrap_or_default()));
+        map.get("root").map(|v| builder.root(v));
+
+        builder
+    }
+
+    /// Validate the configuration and create the GridFS backend.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::InvalidInput` when `connection_string` or `database`
+    /// is missing. No connection is made here; the bucket is created lazily on
+    /// first use.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        let conn = self.connection_string.clone().ok_or_else(|| {
+            Error::new(ErrorKind::InvalidInput, "connection_string is required")
+                .with_context("service", Scheme::Gridfs)
+        })?;
+        let database = self.database.clone().ok_or_else(|| {
+            Error::new(ErrorKind::InvalidInput, "database is required")
+                .with_context("service", Scheme::Gridfs)
+        })?;
+        let bucket = self.bucket.clone().unwrap_or_else(|| "fs".to_string());
+        // `chunk_size` is in bytes; the documented (and GridFS) default is 255 KiB.
+        let chunk_size = self.chunk_size.unwrap_or(255 * 1024);
+        // Honour the configured working directory instead of silently dropping it.
+        let root = crate::raw::normalize_root(self.root.as_deref().unwrap_or("/"));
+
+        Ok(GridFsBackend::new(Adapter {
+            connection_string: conn,
+            database,
+            bucket,
+            chunk_size,
+            bucket_instance: OnceCell::new(),
+        })
+        .with_root(&root))
+    }
+}
+
+/// OpenDAL backend for MongoDB GridFS, built on the generic key-value backend.
+pub type GridFsBackend = kv::Backend<Adapter>;
+
+/// Key-value adapter that stores each OpenDAL path as a GridFS file whose
+/// `filename` is the path.
+#[derive(Clone)]
+pub struct Adapter {
+    /// MongoDB connection string used when the bucket is first created.
+    connection_string: String,
+    /// Database that holds the bucket.
+    database: String,
+    /// GridFS bucket name.
+    bucket: String,
+    /// Chunk size passed to the bucket options.
+    chunk_size: u32,
+    /// Bucket handle, created lazily on first use by `get_bucket`.
+    bucket_instance: OnceCell<GridFsBucket>,
+}
+
+impl Debug for Adapter {
+    /// Print the adapter settings.
+    ///
+    /// The connection string (which may carry credentials) and the cached
+    /// bucket handle are not printed.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("Adapter");
+        out.field("database", &self.database)
+            .field("bucket", &self.bucket)
+            .field("chunk_size", &self.chunk_size);
+        out.finish()
+    }
+}
+
+impl Adapter {
+    /// Return the GridFS bucket handle, connecting on the first call.
+    ///
+    /// The handle is cached in `bucket_instance` and reused afterwards. Errors
+    /// from parsing the connection string or creating the client are mapped
+    /// through `parse_mongodb_error`.
+    async fn get_bucket(&self) -> Result<&GridFsBucket> {
+        self.bucket_instance
+            .get_or_try_init(|| async {
+                let opts = GridFsBucketOptions::builder()
+                    .bucket_name(Some(self.bucket.clone()))
+                    .chunk_size_bytes(Some(self.chunk_size))
+                    .build();
+                let parsed = ClientOptions::parse(&self.connection_string)
+                    .await
+                    .map_err(parse_mongodb_error)?;
+                let mongo = mongodb::Client::with_options(parsed).map_err(parse_mongodb_error)?;
+                Ok(mongo.database(&self.database).gridfs_bucket(opts))
+            })
+            .await
+    }
+}
+
+#[async_trait]
+impl kv::Adapter for Adapter {
+    /// Describe the adapter: `Scheme::Gridfs`, named `<database>/<bucket>`,
+    /// with read and write capability.
+    fn metadata(&self) -> kv::Metadata {
+        kv::Metadata::new(
+            Scheme::Gridfs,
+            &format!("{}/{}", self.database, self.bucket),
+            Capability {
+                read: true,
+                write: true,
+                ..Default::default()
+            },
+        )
+    }
+
+    /// Read the content stored under `path`.
+    ///
+    /// Returns `Ok(None)` when no GridFS file has this filename. If several
+    /// files share the filename, the one with the latest `uploadDate` is read.
+    async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
+        let bucket = self.get_bucket().await?;
+        let filter = doc! { "filename": path };
+        // Without an explicit sort, `limit(1)` may pick any matching file;
+        // prefer the newest revision, as GridFS by-name downloads do.
+        let options = GridFsFindOptions::builder()
+            .sort(Some(doc! { "uploadDate": -1 }))
+            .limit(Some(1))
+            .build();
+        let mut cursor = bucket
+            .find(filter, options)
+            .await
+            .map_err(parse_mongodb_error)?;
+
+        match cursor.next().await {
+            Some(file) => {
+                let file_id = file.map_err(parse_mongodb_error)?.id;
+                let mut destination = Vec::new();
+                bucket
+                    .download_to_futures_0_3_writer(file_id, &mut destination)
+                    .await
+                    .map_err(parse_mongodb_error)?;
+                Ok(Some(destination))
+            }
+            None => Ok(None),
+        }
+    }
+
+    /// Store `value` under `path`, replacing any existing content.
+    ///
+    /// Every existing file with this filename is deleted first, so no stale
+    /// duplicate is left behind for `get` to find.
+    ///
+    /// NOTE(review): this is not atomic; if the upload fails after the delete,
+    /// the previous content is already gone.
+    async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
+        let bucket = self.get_bucket().await?;
+        // Remove all existing revisions, not only the first match.
+        <Self as kv::Adapter>::delete(self, path).await?;
+
+        let mut upload_stream = bucket.open_upload_stream(path, None);
+        upload_stream
+            .write_all(value)
+            .await
+            .map_err(new_std_io_error)?;
+        upload_stream.close().await.map_err(new_std_io_error)?;
+
+        Ok(())
+    }
+
+    /// Delete every GridFS file whose filename is `path`.
+    ///
+    /// Deleting a path with no matching file is not an error.
+    async fn delete(&self, path: &str) -> Result<()> {
+        let bucket = self.get_bucket().await?;
+        let filter = doc! { "filename": path };
+        let mut cursor = bucket
+            .find(filter, None)
+            .await
+            .map_err(parse_mongodb_error)?;
+        while let Some(file) = cursor.next().await {
+            let file_id = file.map_err(parse_mongodb_error)?.id;
+            bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
+        }
+        Ok(())
+    }
+}
+
+fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
+ Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
+}
diff --git a/core/src/services/gridfs/docs.md b/core/src/services/gridfs/docs.md
new file mode 100644
index 000000000..b69a3392b
--- /dev/null
+++ b/core/src/services/gridfs/docs.md
@@ -0,0 +1,47 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [ ] copy
+- [ ] rename
+- [ ] ~~list~~
+- [ ] scan
+- [ ] ~~presign~~
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the working directory of `OpenDAL` (default: `/`)
+- `connection_string`: Set the connection string of the MongoDB server (required)
+- `database`: Set the MongoDB database that holds the GridFS bucket (required)
+- `bucket`: Set the GridFS bucket name (default: `fs`)
+- `chunk_size`: Set the GridFS chunk size, in bytes, used to split files into chunks
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::Gridfs;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut builder = Gridfs::default();
+    builder.root("/");
+    builder.connection_string("mongodb://myUser:myPassword@localhost:27017/myAuthDB");
+    builder.database("your_database");
+    builder.bucket("your_bucket");
+    // The chunk size in bytes used to break the user file into chunks (255 KiB here).
+    builder.chunk_size(255 * 1024);
+
+ let op = Operator::new(builder)?.finish();
+ Ok(())
+}
+```
diff --git a/core/src/services/gridfs/mod.rs b/core/src/services/gridfs/mod.rs
new file mode 100644
index 000000000..4623f8cb0
--- /dev/null
+++ b/core/src/services/gridfs/mod.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+// Exported as `Gridfs` so the public name matches `Scheme::Gridfs`.
+pub use self::backend::GridFsBuilder as Gridfs;
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index cf8000632..dbc509ed3 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -69,6 +69,11 @@ mod ghac;
#[cfg(feature = "services-ghac")]
pub use ghac::Ghac;
+// MongoDB GridFS service, compiled only with the `services-gridfs` feature.
+#[cfg(feature = "services-gridfs")]
+mod gridfs;
+#[cfg(feature = "services-gridfs")]
+pub use gridfs::Gridfs;
+
#[cfg(feature = "services-hdfs")]
mod hdfs;
#[cfg(feature = "services-hdfs")]
diff --git a/core/src/types/operator/builder.rs
b/core/src/types/operator/builder.rs
index 8f34f79ae..3efbcddf7 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -183,6 +183,8 @@ impl Operator {
Scheme::Gcs => Self::from_map::<services::Gcs>(map)?.finish(),
#[cfg(feature = "services-ghac")]
Scheme::Ghac => Self::from_map::<services::Ghac>(map)?.finish(),
+ #[cfg(feature = "services-gridfs")]
+ Scheme::Gridfs =>
Self::from_map::<services::Gridfs>(map)?.finish(),
#[cfg(feature = "services-hdfs")]
Scheme::Hdfs => Self::from_map::<services::Hdfs>(map)?.finish(),
#[cfg(feature = "services-http")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 4699835d9..ec582a637 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -127,6 +127,8 @@ pub enum Scheme {
Azfile,
/// [mongodb](crate::services::mongodb): MongoDB Services
Mongodb,
+ /// [gridfs](crate::services::gridfs): MongoDB Gridfs Services
+ Gridfs,
/// Custom that allow users to implement services outside of OpenDAL.
///
/// # NOTE
@@ -288,6 +290,7 @@ impl FromStr for Scheme {
"gcs" => Ok(Scheme::Gcs),
"gdrive" => Ok(Scheme::Gdrive),
"ghac" => Ok(Scheme::Ghac),
+ "gridfs" => Ok(Scheme::Gridfs),
"hdfs" => Ok(Scheme::Hdfs),
"http" | "https" => Ok(Scheme::Http),
"ftp" | "ftps" => Ok(Scheme::Ftp),
@@ -340,6 +343,7 @@ impl From<Scheme> for &'static str {
Scheme::Fs => "fs",
Scheme::Gcs => "gcs",
Scheme::Ghac => "ghac",
+ Scheme::Gridfs => "gridfs",
Scheme::Hdfs => "hdfs",
Scheme::Http => "http",
Scheme::Foundationdb => "foundationdb",
diff --git a/website/docs/services/gridfs.mdx b/website/docs/services/gridfs.mdx
new file mode 100644
index 000000000..d33137f75
--- /dev/null
+++ b/website/docs/services/gridfs.mdx
@@ -0,0 +1,68 @@
+---
+title: Gridfs
+---
+
+[Gridfs](https://www.mongodb.com/docs/manual/core/gridfs/) services support.
+
+import Docs from '../../../core/src/services/gridfs/docs.md'
+
+<Docs components={props.components} />
+
+### Via Config
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+<Tabs>
+ <TabItem value="rust" label="Rust" default>
+
+```rust
+use anyhow::Result;
+use opendal::Operator;
+use opendal::Scheme;
+use std::collections::HashMap;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+ let mut map = HashMap::new();
+ map.insert("connection_string".to_string(),
"connection_string".to_string());
+ map.insert("database".to_string(), "database".to_string());
+ map.insert("bucket".to_string(), "bucket".to_string());
+ let op: Operator = Operator::via_map(Scheme::Gridfs, map)?;
+ Ok(())
+}
+```
+
+ </TabItem>
+ <TabItem value="node.js" label="Node.js">
+
+```javascript
+import { Operator } from "opendal";
+
+async function main() {
+ const config = {
+ connection_string: "connection_string",
+ database: "database",
+ bucket: "bucket",
+ };
+ const op = new Operator("gridfs", config);
+}
+```
+
+ </TabItem>
+ <TabItem value="python" label="Python">
+
+```python
+import opendal
+
+config = {
+ "connection_string": "connection_string",
+ "database": "database",
+ "bucket": "bucket",
+}
+
+op = opendal.Operator("gridfs", **config)
+```
+
+ </TabItem>
+</Tabs>
\ No newline at end of file