This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new bfe5c9847 refactor: migrate gridfs service from adapter::kv to impl 
Access directly (#6726)
bfe5c9847 is described below

commit bfe5c9847a7a022a067d4570c9964f1bcb8eb578
Author: Qinxuan Chen <[email protected]>
AuthorDate: Wed Oct 22 17:33:06 2025 +0800

    refactor: migrate gridfs service from adapter::kv to impl Access directly 
(#6726)
---
 core/src/services/gridfs/backend.rs             | 112 ++++++++++++++++++++----
 core/src/services/gridfs/config.rs              |   4 +-
 core/src/services/gridfs/core.rs                |  39 ++-------
 core/src/services/gridfs/{mod.rs => deleter.rs} |  29 ++++--
 core/src/services/gridfs/docs.md                |   5 +-
 core/src/services/gridfs/mod.rs                 |   5 +-
 core/src/services/gridfs/writer.rs              |  59 +++++++++++++
 7 files changed, 197 insertions(+), 56 deletions(-)

diff --git a/core/src/services/gridfs/backend.rs 
b/core/src/services/gridfs/backend.rs
index 85eb3c268..81948f1cd 100644
--- a/core/src/services/gridfs/backend.rs
+++ b/core/src/services/gridfs/backend.rs
@@ -15,15 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::fmt::Debug;
-use std::fmt::Formatter;
+use std::sync::Arc;
 
-use mongodb::bson::doc;
 use tokio::sync::OnceCell;
 
 use super::config::GridfsConfig;
-use super::core::GridFsCore;
-use crate::raw::adapters::kv;
+use super::core::*;
+use super::deleter::GridfsDeleter;
+use super::writer::GridfsWriter;
 use crate::raw::*;
 use crate::*;
 
@@ -33,14 +32,6 @@ pub struct GridfsBuilder {
     pub(super) config: GridfsConfig,
 }
 
-impl Debug for GridfsBuilder {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        let mut d = f.debug_struct("GridFsBuilder");
-        d.field("config", &self.config);
-        d.finish_non_exhaustive()
-    }
-}
-
 impl GridfsBuilder {
     /// Set the connection_string of the MongoDB service.
     ///
@@ -145,7 +136,7 @@ impl Builder for GridfsBuilder {
                 .as_str(),
         );
 
-        Ok(GridFsBackend::new(GridFsCore {
+        Ok(GridfsBackend::new(GridfsCore {
             connection_string: conn,
             database,
             bucket,
@@ -156,4 +147,95 @@ impl Builder for GridfsBuilder {
     }
 }
 
-pub type GridFsBackend = kv::Backend<GridFsCore>;
+/// Backend for Gridfs services.
+#[derive(Clone, Debug)]
+pub struct GridfsBackend {
+    core: Arc<GridfsCore>,
+    root: String,
+    info: Arc<AccessorInfo>,
+}
+
+impl GridfsBackend {
+    pub fn new(core: GridfsCore) -> Self {
+        let info = AccessorInfo::default();
+        info.set_scheme(Scheme::Gridfs.into_static());
+        info.set_name(&format!("{}/{}", core.database, core.bucket));
+        info.set_root("/");
+        info.set_native_capability(Capability {
+            read: true,
+            stat: true,
+            write: true,
+            write_can_empty: true,
+            delete: true,
+            shared: true,
+            ..Default::default()
+        });
+
+        Self {
+            core: Arc::new(core),
+            root: "/".to_string(),
+            info: Arc::new(info),
+        }
+    }
+
+    fn with_normalized_root(mut self, root: String) -> Self {
+        self.info.set_root(&root);
+        self.root = root;
+        self
+    }
+}
+
+impl Access for GridfsBackend {
+    type Reader = Buffer;
+    type Writer = GridfsWriter;
+    type Lister = ();
+    type Deleter = oio::OneShotDeleter<GridfsDeleter>;
+
+    fn info(&self) -> Arc<AccessorInfo> {
+        self.info.clone()
+    }
+
+    async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
+        let p = build_abs_path(&self.root, path);
+
+        if p == build_abs_path(&self.root, "") {
+            Ok(RpStat::new(Metadata::new(EntryMode::DIR)))
+        } else {
+            let bs = self.core.get(&p).await?;
+            match bs {
+                Some(bs) => Ok(RpStat::new(
+                    
Metadata::new(EntryMode::FILE).with_content_length(bs.len() as u64),
+                )),
+                None => Err(Error::new(ErrorKind::NotFound, "kv not found in 
gridfs")),
+            }
+        }
+    }
+
+    async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
+        let p = build_abs_path(&self.root, path);
+        let bs = match self.core.get(&p).await? {
+            Some(bs) => bs,
+            None => {
+                return Err(Error::new(ErrorKind::NotFound, "kv not found in 
gridfs"));
+            }
+        };
+        Ok((RpRead::new(), bs.slice(args.range().to_range_as_usize())))
+    }
+
+    async fn write(&self, path: &str, _: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
+        let p = build_abs_path(&self.root, path);
+        Ok((RpWrite::new(), GridfsWriter::new(self.core.clone(), p)))
+    }
+
+    async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
+        Ok((
+            RpDelete::default(),
+            oio::OneShotDeleter::new(GridfsDeleter::new(self.core.clone(), 
self.root.clone())),
+        ))
+    }
+
+    async fn list(&self, path: &str, _: OpList) -> Result<(RpList, 
Self::Lister)> {
+        let _ = build_abs_path(&self.root, path);
+        Ok((RpList::default(), ()))
+    }
+}
diff --git a/core/src/services/gridfs/config.rs 
b/core/src/services/gridfs/config.rs
index 03db7752a..4f92c01b1 100644
--- a/core/src/services/gridfs/config.rs
+++ b/core/src/services/gridfs/config.rs
@@ -18,10 +18,11 @@
 use std::fmt::Debug;
 use std::fmt::Formatter;
 
-use super::backend::GridfsBuilder;
 use serde::Deserialize;
 use serde::Serialize;
 
+use super::backend::GridfsBuilder;
+
 /// Config for Grid file system support.
 #[derive(Default, Serialize, Deserialize, Clone, PartialEq, Eq)]
 #[serde(default)]
@@ -52,6 +53,7 @@ impl Debug for GridfsConfig {
 
 impl crate::Configurator for GridfsConfig {
     type Builder = GridfsBuilder;
+
     fn from_uri(uri: &crate::types::OperatorUri) -> crate::Result<Self> {
         let mut map = uri.options().clone();
 
diff --git a/core/src/services/gridfs/core.rs b/core/src/services/gridfs/core.rs
index 0b4f4b194..4861e11bd 100644
--- a/core/src/services/gridfs/core.rs
+++ b/core/src/services/gridfs/core.rs
@@ -26,17 +26,11 @@ use mongodb::options::ClientOptions;
 use mongodb::options::GridFsBucketOptions;
 use tokio::sync::OnceCell;
 
-use crate::Buffer;
-use crate::Capability;
-use crate::Error;
-use crate::ErrorKind;
-use crate::Result;
-use crate::Scheme;
-use crate::raw::adapters::kv;
-use crate::raw::new_std_io_error;
+use crate::raw::*;
+use crate::*;
 
 #[derive(Clone)]
-pub struct GridFsCore {
+pub struct GridfsCore {
     pub connection_string: String,
     pub database: String,
     pub bucket: String,
@@ -44,7 +38,7 @@ pub struct GridFsCore {
     pub bucket_instance: OnceCell<GridFsBucket>,
 }
 
-impl Debug for GridFsCore {
+impl Debug for GridfsCore {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("GridfsCore")
             .field("database", &self.database)
@@ -54,7 +48,7 @@ impl Debug for GridFsCore {
     }
 }
 
-impl GridFsCore {
+impl GridfsCore {
     async fn get_bucket(&self) -> Result<&GridFsBucket> {
         self.bucket_instance
             .get_or_try_init(|| async {
@@ -74,25 +68,8 @@ impl GridFsCore {
             })
             .await
     }
-}
-
-impl kv::Adapter for GridFsCore {
-    type Scanner = (); // Replace with the actual Scanner type.
-
-    fn info(&self) -> kv::Info {
-        kv::Info::new(
-            Scheme::Gridfs,
-            &format!("{}/{}", self.database, self.bucket),
-            Capability {
-                read: true,
-                write: true,
-                shared: true,
-                ..Default::default()
-            },
-        )
-    }
 
-    async fn get(&self, path: &str) -> Result<Option<Buffer>> {
+    pub async fn get(&self, path: &str) -> Result<Option<Buffer>> {
         let bucket = self.get_bucket().await?;
         let filter = doc! { "filename": path };
         let Some(doc) = 
bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
@@ -112,7 +89,7 @@ impl kv::Adapter for GridFsCore {
         Ok(Some(Buffer::from(destination)))
     }
 
-    async fn set(&self, path: &str, value: Buffer) -> Result<()> {
+    pub async fn set(&self, path: &str, value: Buffer) -> Result<()> {
         let bucket = self.get_bucket().await?;
 
         // delete old file if exists
@@ -136,7 +113,7 @@ impl kv::Adapter for GridFsCore {
         Ok(())
     }
 
-    async fn delete(&self, path: &str) -> Result<()> {
+    pub async fn delete(&self, path: &str) -> Result<()> {
         let bucket = self.get_bucket().await?;
         let filter = doc! { "filename": path };
         let Some(doc) = 
bucket.find_one(filter).await.map_err(parse_mongodb_error)? else {
diff --git a/core/src/services/gridfs/mod.rs 
b/core/src/services/gridfs/deleter.rs
similarity index 60%
copy from core/src/services/gridfs/mod.rs
copy to core/src/services/gridfs/deleter.rs
index f1c7c7453..798041872 100644
--- a/core/src/services/gridfs/mod.rs
+++ b/core/src/services/gridfs/deleter.rs
@@ -15,9 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
-mod backend;
-pub use backend::GridfsBuilder as Gridfs;
-mod core;
+use std::sync::Arc;
 
-mod config;
-pub use config::GridfsConfig;
+use super::core::*;
+use crate::raw::oio;
+use crate::raw::*;
+use crate::*;
+
+pub struct GridfsDeleter {
+    core: Arc<GridfsCore>,
+    root: String,
+}
+
+impl GridfsDeleter {
+    pub fn new(core: Arc<GridfsCore>, root: String) -> Self {
+        Self { core, root }
+    }
+}
+
+impl oio::OneShotDelete for GridfsDeleter {
+    async fn delete_once(&self, path: String, _: OpDelete) -> Result<()> {
+        let p = build_abs_path(&self.root, &path);
+        self.core.delete(&p).await?;
+        Ok(())
+    }
+}
diff --git a/core/src/services/gridfs/docs.md b/core/src/services/gridfs/docs.md
index 74bbae06d..720bf1024 100644
--- a/core/src/services/gridfs/docs.md
+++ b/core/src/services/gridfs/docs.md
@@ -2,16 +2,15 @@
 
 This service can be used to:
 
+- [ ] create_dir
 - [x] stat
 - [x] read
 - [x] write
-- [x] create_dir
 - [x] delete
 - [ ] copy
 - [ ] rename
-- [ ] ~~list~~
+- [ ] list
 - [ ] ~~presign~~
-- [ ] blocking
 
 ## Configuration
 
diff --git a/core/src/services/gridfs/mod.rs b/core/src/services/gridfs/mod.rs
index f1c7c7453..8458f5b6b 100644
--- a/core/src/services/gridfs/mod.rs
+++ b/core/src/services/gridfs/mod.rs
@@ -16,8 +16,11 @@
 // under the License.
 
 mod backend;
-pub use backend::GridfsBuilder as Gridfs;
 mod core;
+mod deleter;
+mod writer;
+
+pub use backend::GridfsBuilder as Gridfs;
 
 mod config;
 pub use config::GridfsConfig;
diff --git a/core/src/services/gridfs/writer.rs 
b/core/src/services/gridfs/writer.rs
new file mode 100644
index 000000000..f1249a816
--- /dev/null
+++ b/core/src/services/gridfs/writer.rs
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use super::core::*;
+use crate::raw::oio;
+use crate::*;
+
+pub struct GridfsWriter {
+    core: Arc<GridfsCore>,
+    path: String,
+    buffer: oio::QueueBuf,
+}
+
+impl GridfsWriter {
+    pub fn new(core: Arc<GridfsCore>, path: String) -> Self {
+        Self {
+            core,
+            path,
+            buffer: oio::QueueBuf::new(),
+        }
+    }
+}
+
+impl oio::Write for GridfsWriter {
+    async fn write(&mut self, bs: Buffer) -> Result<()> {
+        self.buffer.push(bs);
+        Ok(())
+    }
+
+    async fn close(&mut self) -> Result<Metadata> {
+        let buf = self.buffer.clone().collect();
+        let length = buf.len() as u64;
+        self.core.set(&self.path, buf).await?;
+
+        let meta = 
Metadata::new(EntryMode::from_path(&self.path)).with_content_length(length);
+        Ok(meta)
+    }
+
+    async fn abort(&mut self) -> Result<()> {
+        self.buffer.clear();
+        Ok(())
+    }
+}

Reply via email to