This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new c96a170f1 feat: add mongodb gridfs service support (#3491)
c96a170f1 is described below

commit c96a170f1a878e688f23cdb5b21b21989575a5a7
Author: taobo <[email protected]>
AuthorDate: Tue Nov 7 15:51:41 2023 +0800

    feat: add mongodb gridfs service support (#3491)
    
    * feat: add mongodb gridfs service support
    
    * fix: behavior tests error
    
    * fix: optimize code
    
    * refactor: expose service name from GridFs to Gridfs
---
 .env.example                        |   5 +
 core/Cargo.toml                     |   1 +
 core/src/services/gridfs/backend.rs | 291 ++++++++++++++++++++++++++++++++++++
 core/src/services/gridfs/docs.md    |  47 ++++++
 core/src/services/gridfs/mod.rs     |  19 +++
 core/src/services/mod.rs            |   5 +
 core/src/types/operator/builder.rs  |   2 +
 core/src/types/scheme.rs            |   4 +
 website/docs/services/gridfs.mdx    |  68 +++++++++
 9 files changed, 442 insertions(+)

diff --git a/.env.example b/.env.example
index e57e121c7..32eee467b 100644
--- a/.env.example
+++ b/.env.example
@@ -161,3 +161,8 @@ OPENDAL_SWIFT_ACCOUNT=<account>
 OPENDAL_SWIFT_CONTAINER=<container>
 OPENDAL_SWIFT_ROOT=/path/to/dir
 OPENDAL_SWIFT_TOKEN=<token>
+# gridfs
+OPENDAL_GRIDFS_CONNECTION_STRING=mongodb://localhost:27017
+OPENDAL_GRIDFS_DATABASE=<database>
+OPENDAL_GRIDFS_BUCKET=<fs>
+OPENDAL_GRIDFS_CHUNK_SIZE=<chunk_size>
diff --git a/core/Cargo.toml b/core/Cargo.toml
index b7f44a86d..6917eee7e 100644
--- a/core/Cargo.toml
+++ b/core/Cargo.toml
@@ -146,6 +146,7 @@ services-gcs = [
 ]
 services-gdrive = []
 services-ghac = []
+services-gridfs = ["dep:mongodb"]
 services-hdfs = ["dep:hdrs"]
 services-http = []
 services-ipfs = ["dep:prost"]
diff --git a/core/src/services/gridfs/backend.rs b/core/src/services/gridfs/backend.rs
new file mode 100644
index 000000000..606b464d3
--- /dev/null
+++ b/core/src/services/gridfs/backend.rs
@@ -0,0 +1,291 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use async_trait::async_trait;
+use futures::{AsyncWriteExt, StreamExt};
+use mongodb::bson::doc;
+use mongodb::options::{ClientOptions, GridFsBucketOptions, GridFsFindOptions};
+use mongodb::GridFsBucket;
+use std::fmt::{Debug, Formatter};
+use tokio::sync::OnceCell;
+
+use crate::raw::adapters::kv;
+use crate::raw::new_std_io_error;
+use crate::*;
+
+#[doc = include_str!("docs.md")]
+#[derive(Default)]
+pub struct GridFsBuilder {
+    // MongoDB connection URI; `build()` fails when it is unset.
+    connection_string: Option<String>,
+    // Database that holds the GridFS bucket; `build()` fails when it is unset.
+    database: Option<String>,
+    // GridFS bucket name; `build()` falls back to "fs" when it is unset.
+    bucket: Option<String>,
+    // Chunk size, later handed to the bucket options as `chunk_size_bytes`.
+    chunk_size: Option<u32>,
+    // Working directory configured through `root()`.
+    root: Option<String>,
+}
+
+impl Debug for GridFsBuilder {
+    /// Formats the builder for diagnostics.
+    ///
+    /// `connection_string` is not part of the output.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("GridFsBuilder");
+        out.field("database", &self.database);
+        out.field("bucket", &self.bucket);
+        out.field("chunk_size", &self.chunk_size);
+        out.field("root", &self.root);
+        out.finish()
+    }
+}
+
+impl GridFsBuilder {
+    /// Set the connection_string of the MongoDB service.
+    ///
+    /// This connection string is used to connect to the MongoDB service. It 
typically follows the format:
+    ///
+    /// ## Format
+    ///
+    /// 
`mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]`
+    ///
+    /// Examples:
+    ///
+    /// - Connecting to a local MongoDB instance: `mongodb://localhost:27017`
+    /// - Using authentication: 
`mongodb://myUser:myPassword@localhost:27017/myAuthDB`
+    /// - Specifying authentication mechanism: 
`mongodb://myUser:myPassword@localhost:27017/myAuthDB?authMechanism=SCRAM-SHA-256`
+    ///
+    /// ## Options
+    ///
+    /// - `authMechanism`: Specifies the authentication method to use. 
Examples include `SCRAM-SHA-1`, `SCRAM-SHA-256`, and `MONGODB-AWS`.
+    /// - ... (any other options you wish to highlight)
+    ///
+    /// For more information, please refer to [MongoDB Connection String URI 
Format](https://docs.mongodb.com/manual/reference/connection-string/).
+    pub fn connection_string(&mut self, v: &str) -> &mut Self {
+        if !v.is_empty() {
+            self.connection_string = Some(v.to_string());
+        }
+        self
+    }
+
+    /// Set the working directory, all operations will be performed under it.
+    ///
+    /// default: "/"
+    pub fn root(&mut self, root: &str) -> &mut Self {
+        if !root.is_empty() {
+            self.root = Some(root.to_owned());
+        }
+        self
+    }
+
+    /// Set the database name of the MongoDB GridFs service to read/write.
+    pub fn database(&mut self, database: &str) -> &mut Self {
+        if !database.is_empty() {
+            self.database = Some(database.to_string());
+        }
+        self
+    }
+
+    /// Set the buctet name of the MongoDB GridFs service to read/write.
+    ///
+    /// Default to `fs` if not specified.
+    pub fn bucket(&mut self, bucket: &str) -> &mut Self {
+        if !bucket.is_empty() {
+            self.bucket = Some(bucket.to_string());
+        }
+        self
+    }
+
+    /// Set the chunk size of the MongoDB GridFs service used to break the 
user file into chunks.
+    ///
+    /// Default to `255 KiB` if not specified.
+    pub fn chunk_size(&mut self, chunk_size: u32) -> &mut Self {
+        if chunk_size > 0 {
+            self.chunk_size = Some(chunk_size);
+        }
+        self
+    }
+}
+
+impl Builder for GridFsBuilder {
+    // Must agree with the `Scheme::Gridfs` used in the error contexts below and in the
+    // adapter metadata; it was previously `Scheme::Mongodb`.
+    const SCHEME: Scheme = Scheme::Gridfs;
+
+    type Accessor = GridFsBackend;
+
+    /// Builds a `GridFsBuilder` from string options.
+    ///
+    /// Recognized keys: `connection_string`, `database`, `bucket`, `chunk_size` and `root`.
+    /// Unknown keys are ignored.
+    fn from_map(map: std::collections::HashMap<String, String>) -> Self {
+        let mut builder = Self::default();
+
+        map.get("connection_string")
+            .map(|v| builder.connection_string(v));
+        map.get("database").map(|v| builder.database(v));
+        map.get("bucket").map(|v| builder.bucket(v));
+        // An unparsable chunk_size turns into 0, which `chunk_size()` ignores.
+        map.get("chunk_size")
+            .map(|v| builder.chunk_size(v.parse::<u32>().unwrap_or_default()));
+        map.get("root").map(|v| builder.root(v));
+
+        builder
+    }
+
+    /// Validates the configuration and creates the backend.
+    ///
+    /// # Errors
+    ///
+    /// Returns `ErrorKind::InvalidInput` when `connection_string` or `database` is not set.
+    /// No connection is opened here; the bucket is created lazily on first use.
+    fn build(&mut self) -> Result<Self::Accessor> {
+        let conn = self.connection_string.clone().ok_or_else(|| {
+            Error::new(ErrorKind::InvalidInput, "connection_string is required")
+                .with_context("service", Scheme::Gridfs)
+        })?;
+        let database = self.database.clone().ok_or_else(|| {
+            Error::new(ErrorKind::InvalidInput, "database is required")
+                .with_context("service", Scheme::Gridfs)
+        })?;
+        let bucket = self.bucket.clone().unwrap_or_else(|| "fs".to_string());
+        // The value is passed on as `chunk_size_bytes`, so the documented 255 KiB default
+        // has to be expressed in bytes (a bare 255 would mean 255-byte chunks).
+        let chunk_size = self.chunk_size.unwrap_or(255 * 1024);
+
+        // Apply the configured working directory; without this the `root` option is
+        // silently dropped.
+        Ok(GridFsBackend::new(Adapter {
+            connection_string: conn,
+            database,
+            bucket,
+            chunk_size,
+            bucket_instance: OnceCell::new(),
+        })
+        .with_root(self.root.as_deref().unwrap_or("/")))
+    }
+}
+
+/// GridFS accessor: the generic kv backend driven by [`Adapter`].
+pub type GridFsBackend = kv::Backend<Adapter>;
+
+/// Connection settings for one GridFS bucket plus the lazily created bucket handle.
+#[derive(Clone)]
+pub struct Adapter {
+    // MongoDB connection URI, parsed when the bucket handle is first created.
+    connection_string: String,
+    // Database that holds the GridFS bucket.
+    database: String,
+    // GridFS bucket name.
+    bucket: String,
+    // Passed to the bucket options as `chunk_size_bytes`.
+    chunk_size: u32,
+    // Filled in by `get_bucket()` on first use.
+    bucket_instance: OnceCell<GridFsBucket>,
+}
+
+impl Debug for Adapter {
+    /// Formats the adapter for diagnostics.
+    ///
+    /// Only `database`, `bucket` and `chunk_size` are printed; the connection string and
+    /// the bucket handle are left out.
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        let mut out = f.debug_struct("Adapter");
+        out.field("database", &self.database);
+        out.field("bucket", &self.bucket);
+        out.field("chunk_size", &self.chunk_size);
+        out.finish()
+    }
+}
+
+impl Adapter {
+    /// Returns the GridFS bucket handle, creating it on the first call.
+    ///
+    /// Creation parses `connection_string`, builds a client from the parsed options and
+    /// opens the bucket named `bucket` in `database`, with `chunk_size` as the chunk size
+    /// in bytes. Driver errors are converted with `parse_mongodb_error`.
+    async fn get_bucket(&self) -> Result<&GridFsBucket> {
+        self.bucket_instance
+            .get_or_try_init(|| async {
+                let parsed = ClientOptions::parse(&self.connection_string)
+                    .await
+                    .map_err(parse_mongodb_error)?;
+                let db = mongodb::Client::with_options(parsed)
+                    .map_err(parse_mongodb_error)?
+                    .database(&self.database);
+                let options = GridFsBucketOptions::builder()
+                    .bucket_name(Some(self.bucket.clone()))
+                    .chunk_size_bytes(Some(self.chunk_size))
+                    .build();
+                Ok(db.gridfs_bucket(options))
+            })
+            .await
+    }
+}
+
+#[async_trait]
+impl kv::Adapter for Adapter {
+    /// Describes this adapter: scheme `Gridfs`, name `<database>/<bucket>`, with the
+    /// read and write capabilities enabled.
+    fn metadata(&self) -> kv::Metadata {
+        kv::Metadata::new(
+            Scheme::Gridfs,
+            &format!("{}/{}", self.database, self.bucket),
+            Capability {
+                read: true,
+                write: true,
+                ..Default::default()
+            },
+        )
+    }
+
+    /// Reads the content of a file whose `filename` equals `path`.
+    ///
+    /// Returns `Ok(None)` when no file with that name exists.
+    async fn get(&self, path: &str) -> Result<Option<Vec<u8>>> {
+        let bucket = self.get_bucket().await?;
+        let filter = doc! { "filename": path };
+        let options = GridFsFindOptions::builder().limit(Some(1)).build();
+        let mut cursor = bucket
+            .find(filter, options)
+            .await
+            .map_err(parse_mongodb_error)?;
+
+        match cursor.next().await {
+            Some(doc) => {
+                let mut destination = Vec::new();
+                let file_id = doc.map_err(parse_mongodb_error)?.id;
+                bucket
+                    .download_to_futures_0_3_writer(file_id, &mut destination)
+                    .await
+                    .map_err(parse_mongodb_error)?;
+                Ok(Some(destination))
+            }
+            None => Ok(None),
+        }
+    }
+
+    /// Stores `value` as the file named `path`, replacing anything stored under that name.
+    async fn set(&self, path: &str, value: &[u8]) -> Result<()> {
+        // Remove every existing file with this name, not just the first match: more than
+        // one file can share a filename, and a leftover one could be what `get` returns
+        // afterwards. `delete` already implements exactly that loop.
+        self.delete(path).await?;
+
+        // Upload the new content.
+        let bucket = self.get_bucket().await?;
+        let mut upload_stream = bucket.open_upload_stream(path, None);
+        upload_stream
+            .write_all(value)
+            .await
+            .map_err(new_std_io_error)?;
+        upload_stream.close().await.map_err(new_std_io_error)?;
+
+        Ok(())
+    }
+
+    /// Deletes every file whose `filename` equals `path`.
+    ///
+    /// Succeeds without doing anything when no such file exists.
+    async fn delete(&self, path: &str) -> Result<()> {
+        let bucket = self.get_bucket().await?;
+        let filter = doc! { "filename": path };
+        let mut cursor = bucket
+            .find(filter, None)
+            .await
+            .map_err(parse_mongodb_error)?;
+        while let Some(doc) = cursor.next().await {
+            let file_id = doc.map_err(parse_mongodb_error)?.id;
+            bucket.delete(file_id).await.map_err(parse_mongodb_error)?;
+        }
+        Ok(())
+    }
+}
+
+fn parse_mongodb_error(err: mongodb::error::Error) -> Error {
+    Error::new(ErrorKind::Unexpected, "mongodb error").set_source(err)
+}
diff --git a/core/src/services/gridfs/docs.md b/core/src/services/gridfs/docs.md
new file mode 100644
index 000000000..b69a3392b
--- /dev/null
+++ b/core/src/services/gridfs/docs.md
@@ -0,0 +1,47 @@
+## Capabilities
+
+This service can be used to:
+
+- [x] stat
+- [x] read
+- [x] write
+- [x] create_dir
+- [x] delete
+- [ ] copy
+- [ ] rename
+- [ ] ~~list~~
+- [ ] scan
+- [ ] ~~presign~~
+- [ ] blocking
+
+## Configuration
+
+- `root`: Set the working directory of `OpenDAL`
+- `connection_string`: Set the connection string of mongodb server
+- `database`: Set the database of mongodb
+- `bucket`: Set the bucket of mongodb gridfs
+- `chunk_size`: Set the chunk size of mongodb gridfs
+
+## Example
+
+### Via Builder
+
+```rust
+use anyhow::Result;
+use opendal::services::Gridfs;
+use opendal::Operator;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut builder = Gridfs::default();
+    builder.root("/");
+    builder.connection_string("mongodb://myUser:myPassword@localhost:27017/myAuthDB");
+    builder.database("your_database");
+    builder.bucket("your_bucket");
+    // The chunk size in bytes used to break the user file into chunks.
+    builder.chunk_size(255);
+
+    let op = Operator::new(builder)?.finish();
+    Ok(())
+}
+```
diff --git a/core/src/services/gridfs/mod.rs b/core/src/services/gridfs/mod.rs
new file mode 100644
index 000000000..4623f8cb0
--- /dev/null
+++ b/core/src/services/gridfs/mod.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+mod backend;
+pub use backend::GridFsBuilder as Gridfs;
diff --git a/core/src/services/mod.rs b/core/src/services/mod.rs
index cf8000632..dbc509ed3 100644
--- a/core/src/services/mod.rs
+++ b/core/src/services/mod.rs
@@ -69,6 +69,11 @@ mod ghac;
 #[cfg(feature = "services-ghac")]
 pub use ghac::Ghac;
 
+#[cfg(feature = "services-gridfs")]
+mod gridfs;
+#[cfg(feature = "services-gridfs")]
+pub use gridfs::Gridfs;
+
 #[cfg(feature = "services-hdfs")]
 mod hdfs;
 #[cfg(feature = "services-hdfs")]
diff --git a/core/src/types/operator/builder.rs b/core/src/types/operator/builder.rs
index 8f34f79ae..3efbcddf7 100644
--- a/core/src/types/operator/builder.rs
+++ b/core/src/types/operator/builder.rs
@@ -183,6 +183,8 @@ impl Operator {
             Scheme::Gcs => Self::from_map::<services::Gcs>(map)?.finish(),
             #[cfg(feature = "services-ghac")]
             Scheme::Ghac => Self::from_map::<services::Ghac>(map)?.finish(),
+            #[cfg(feature = "services-gridfs")]
+            Scheme::Gridfs => Self::from_map::<services::Gridfs>(map)?.finish(),
             #[cfg(feature = "services-hdfs")]
             Scheme::Hdfs => Self::from_map::<services::Hdfs>(map)?.finish(),
             #[cfg(feature = "services-http")]
diff --git a/core/src/types/scheme.rs b/core/src/types/scheme.rs
index 4699835d9..ec582a637 100644
--- a/core/src/types/scheme.rs
+++ b/core/src/types/scheme.rs
@@ -127,6 +127,8 @@ pub enum Scheme {
     Azfile,
     /// [mongodb](crate::services::mongodb): MongoDB Services
     Mongodb,
+    /// [gridfs](crate::services::gridfs): MongoDB Gridfs Services
+    Gridfs,
     /// Custom that allow users to implement services outside of OpenDAL.
     ///
     /// # NOTE
@@ -288,6 +290,7 @@ impl FromStr for Scheme {
             "gcs" => Ok(Scheme::Gcs),
             "gdrive" => Ok(Scheme::Gdrive),
             "ghac" => Ok(Scheme::Ghac),
+            "gridfs" => Ok(Scheme::Gridfs),
             "hdfs" => Ok(Scheme::Hdfs),
             "http" | "https" => Ok(Scheme::Http),
             "ftp" | "ftps" => Ok(Scheme::Ftp),
@@ -340,6 +343,7 @@ impl From<Scheme> for &'static str {
             Scheme::Fs => "fs",
             Scheme::Gcs => "gcs",
             Scheme::Ghac => "ghac",
+            Scheme::Gridfs => "gridfs",
             Scheme::Hdfs => "hdfs",
             Scheme::Http => "http",
             Scheme::Foundationdb => "foundationdb",
diff --git a/website/docs/services/gridfs.mdx b/website/docs/services/gridfs.mdx
new file mode 100644
index 000000000..d33137f75
--- /dev/null
+++ b/website/docs/services/gridfs.mdx
@@ -0,0 +1,68 @@
+---
+title: Gridfs
+---
+
+[Gridfs](https://www.mongodb.com/docs/manual/core/gridfs/) services support.
+
+import Docs from '../../../core/src/services/gridfs/docs.md'
+
+<Docs components={props.components} />
+
+### Via Config
+
+import Tabs from '@theme/Tabs';
+import TabItem from '@theme/TabItem';
+
+<Tabs>
+  <TabItem value="rust" label="Rust" default>
+
+```rust
+use anyhow::Result;
+use opendal::Operator;
+use opendal::Scheme;
+use std::collections::HashMap;
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let mut map = HashMap::new();
+    map.insert("connection_string".to_string(), "connection_string".to_string());
+    map.insert("database".to_string(), "database".to_string());
+    map.insert("bucket".to_string(), "bucket".to_string());
+    let op: Operator = Operator::via_map(Scheme::Gridfs, map)?;
+    Ok(())
+}
+```
+
+  </TabItem>
+  <TabItem value="node.js" label="Node.js">
+
+```javascript
+import { Operator } from "opendal";
+
+async function main() {
+    const config = {
+        connection_string: "connection_string",
+        database: "database",
+        bucket: "bucket",
+    };
+    const op = new Operator("gridfs", config);
+}
+```
+
+  </TabItem>
+  <TabItem value="python" label="Python">
+
+```python
+import opendal
+
+config = {
+    "connection_string": "connection_string",
+    "database": "database",
+    "bucket": "bucket",
+}
+
+op = opendal.Operator("gridfs", **config)
+```
+
+  </TabItem>
+</Tabs>
\ No newline at end of file

Reply via email to