This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new f3ef4bddc refactor(services/fs): extract implementation to core (#6317)
f3ef4bddc is described below

commit f3ef4bddc9034eb0e98190730ea3d76a05160bb2
Author: Erick Guan <297343+erickg...@users.noreply.github.com>
AuthorDate: Fri Jun 20 07:50:14 2025 +0200

    refactor(services/fs): extract implementation to core (#6317)
---
 core/src/services/fs/backend.rs           | 194 ++++--------------------------
 core/src/services/fs/core.rs              | 175 +++++++++++++++++++++++++++
 core/src/services/fs/delete.rs            |   1 -
 core/src/services/fs/{mod.rs => error.rs} |  30 ++---
 core/src/services/fs/mod.rs               |   2 +
 5 files changed, 211 insertions(+), 191 deletions(-)

diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 593a00d25..08e679cd9 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -15,11 +15,9 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::io::SeekFrom;
 use std::path::PathBuf;
 use std::sync::Arc;
 
-use chrono::DateTime;
 use log::debug;
 
 use super::core::*;
@@ -124,8 +122,6 @@ impl Builder for FsBuilder {
                 ErrorKind::Unexpected,
                 "canonicalize of root directory failed",
             )
-            .with_operation("Builder::build")
-            .with_context("root", root.to_string_lossy())
             .set_source(e)
         })?;
 
@@ -202,35 +198,12 @@ impl Access for FsBackend {
     }
 
     async fn create_dir(&self, path: &str, _: OpCreateDir) -> 
Result<RpCreateDir> {
-        let p = self.core.root.join(path.trim_end_matches('/'));
-
-        tokio::fs::create_dir_all(&p)
-            .await
-            .map_err(new_std_io_error)?;
-
+        self.core.fs_create_dir(path).await?;
         Ok(RpCreateDir::default())
     }
 
     async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> {
-        let p = self.core.root.join(path.trim_end_matches('/'));
-
-        let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
-
-        let mode = if meta.is_dir() {
-            EntryMode::DIR
-        } else if meta.is_file() {
-            EntryMode::FILE
-        } else {
-            EntryMode::Unknown
-        };
-        let m = Metadata::new(mode)
-            .with_content_length(meta.len())
-            .with_last_modified(
-                meta.modified()
-                    .map(DateTime::from)
-                    .map_err(new_std_io_error)?,
-            );
-
+        let m = self.core.fs_stat(path).await?;
         Ok(RpStat::new(m))
     }
 
@@ -244,22 +217,7 @@ impl Access for FsBackend {
     ///
     /// Benchmark could be found 
[here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f)
     async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, 
Self::Reader)> {
-        let p = self.core.root.join(path.trim_end_matches('/'));
-
-        let mut f = tokio::fs::OpenOptions::new()
-            .read(true)
-            .open(&p)
-            .await
-            .map_err(new_std_io_error)?;
-
-        if args.range().offset() != 0 {
-            use tokio::io::AsyncSeekExt;
-
-            f.seek(SeekFrom::Start(args.range().offset()))
-                .await
-                .map_err(new_std_io_error)?;
-        }
-
+        let f = self.core.fs_read(path, &args).await?;
         let r = FsReader::new(
             self.core.clone(),
             f,
@@ -269,80 +227,25 @@ impl Access for FsBackend {
     }
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        let (target_path, tmp_path) = if let Some(atomic_write_dir) = 
&self.core.atomic_write_dir {
-            let target_path = self
-                .core
-                .ensure_write_abs_path(&self.core.root, path)
-                .await?;
-            let tmp_path = self
-                .core
-                .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))
-                .await?;
-
-            // If the target file exists, we should append to the end of it 
directly.
-            if op.append()
-                && tokio::fs::try_exists(&target_path)
-                    .await
-                    .map_err(new_std_io_error)?
-            {
-                (target_path, None)
-            } else {
-                (target_path, Some(tmp_path))
-            }
-        } else {
-            let p = self
-                .core
-                .ensure_write_abs_path(&self.core.root, path)
-                .await?;
-
-            (p, None)
-        };
-
-        let mut open_options = tokio::fs::OpenOptions::new();
-        if op.if_not_exists() {
-            open_options.create_new(true);
-        } else {
-            open_options.create(true);
-        }
-
-        open_options.write(true);
-
-        if op.append() {
-            open_options.append(true);
-        } else {
-            open_options.truncate(true);
-        }
-
-        let f = open_options
-            .open(tmp_path.as_ref().unwrap_or(&target_path))
-            .await
-            .map_err(|e| {
-                match e.kind() {
-                    std::io::ErrorKind::AlreadyExists => {
-                        // Map io AlreadyExists to opendal ConditionNotMatch
-                        Error::new(
-                            ErrorKind::ConditionNotMatch,
-                            "The file already exists in the filesystem",
-                        )
-                        .set_source(e)
-                    }
-                    _ => new_std_io_error(e),
-                }
-            })?;
+        let (target_path, tmp_path) = self.core.prepare_write(path, 
&op).await?;
+        let file = self
+            .core
+            .fs_write(&target_path, tmp_path.as_ref(), &op)
+            .await?;
 
-        let w = FsWriter::new(target_path, tmp_path, f);
+        let writer = FsWriter::new(target_path, tmp_path, file);
 
-        let w = if op.append() {
-            FsWriters::One(w)
+        let writer = if op.append() {
+            FsWriters::One(writer)
         } else {
             FsWriters::Two(oio::PositionWriter::new(
                 self.info().clone(),
-                w,
+                writer,
                 op.concurrent(),
             ))
         };
 
-        Ok((RpWrite::default(), w))
+        Ok((RpWrite::default(), writer))
     }
 
     async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
@@ -353,77 +256,22 @@ impl Access for FsBackend {
     }
 
     async fn list(&self, path: &str, _: OpList) -> Result<(RpList, 
Self::Lister)> {
-        let p = self.core.root.join(path.trim_end_matches('/'));
-
-        let f = match tokio::fs::read_dir(&p).await {
-            Ok(rd) => rd,
-            Err(e) => {
-                return match e.kind() {
-                    // Return empty list if the directory not found
-                    std::io::ErrorKind::NotFound => Ok((RpList::default(), 
None)),
-                    // TODO: enable after our MSRV has been raised to 1.83
-                    //
-                    // If the path is not a directory, return an empty list
-                    //
-                    // The path could be a file or a symbolic link in this 
case.
-                    // Returning a NotADirectory error to the user isn't 
helpful; instead,
-                    // providing an empty directory is a more user-friendly. 
In fact, the dir
-                    // `path/` does not exist.
-                    // std::io::ErrorKind::NotADirectory => 
Ok((RpList::default(), None)),
-                    _ => {
-                        // TODO: remove this after we have MSRV 1.83
-                        #[cfg(unix)]
-                        if e.raw_os_error() == Some(20) {
-                            // On unix 20: Not a directory
-                            return Ok((RpList::default(), None));
-                        }
-                        #[cfg(windows)]
-                        if e.raw_os_error() == Some(267) {
-                            // On windows 267: DIRECTORY
-                            return Ok((RpList::default(), None));
-                        }
-
-                        Err(new_std_io_error(e))
-                    }
-                };
+        match self.core.fs_list(path).await? {
+            Some(f) => {
+                let rd = FsLister::new(&self.core.root, path, f);
+                Ok((RpList::default(), Some(rd)))
             }
-        };
-
-        let rd = FsLister::new(&self.core.root, path, f);
-        Ok((RpList::default(), Some(rd)))
+            None => Ok((RpList::default(), None)),
+        }
     }
 
     async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> 
Result<RpCopy> {
-        let from = self.core.root.join(from.trim_end_matches('/'));
-
-        // try to get the metadata of the source file to ensure it exists
-        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
-
-        let to = self
-            .core
-            .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
-            .await?;
-
-        tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
-
+        self.core.fs_copy(from, to).await?;
         Ok(RpCopy::default())
     }
 
     async fn rename(&self, from: &str, to: &str, _args: OpRename) -> 
Result<RpRename> {
-        let from = self.core.root.join(from.trim_end_matches('/'));
-
-        // try to get the metadata of the source file to ensure it exists
-        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
-
-        let to = self
-            .core
-            .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/'))
-            .await?;
-
-        tokio::fs::rename(from, to)
-            .await
-            .map_err(new_std_io_error)?;
-
+        self.core.fs_rename(from, to).await?;
         Ok(RpRename::default())
     }
 }
diff --git a/core/src/services/fs/core.rs b/core/src/services/fs/core.rs
index 20a85e233..40cb50a63 100644
--- a/core/src/services/fs/core.rs
+++ b/core/src/services/fs/core.rs
@@ -15,12 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::io::SeekFrom;
 use std::path::Path;
 use std::path::PathBuf;
 use std::sync::Arc;
 
+use chrono::DateTime;
 use uuid::Uuid;
 
+use super::error::*;
 use crate::raw::*;
 use crate::*;
 
@@ -60,6 +63,178 @@ impl FsCore {
 
         Ok(p)
     }
+
+    pub async fn fs_create_dir(&self, path: &str) -> Result<()> {
+        let p = self.root.join(path.trim_end_matches('/'));
+        tokio::fs::create_dir_all(&p)
+            .await
+            .map_err(new_std_io_error)?;
+        Ok(())
+    }
+
+    pub async fn fs_stat(&self, path: &str) -> Result<Metadata> {
+        let p = self.root.join(path.trim_end_matches('/'));
+        let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?;
+
+        let mode = if meta.is_dir() {
+            EntryMode::DIR
+        } else if meta.is_file() {
+            EntryMode::FILE
+        } else {
+            EntryMode::Unknown
+        };
+        let m = Metadata::new(mode)
+            .with_content_length(meta.len())
+            .with_last_modified(
+                meta.modified()
+                    .map(DateTime::from)
+                    .map_err(new_std_io_error)?,
+            );
+
+        Ok(m)
+    }
+
+    pub async fn fs_read(&self, path: &str, args: &OpRead) -> 
Result<tokio::fs::File> {
+        let p = self.root.join(path.trim_end_matches('/'));
+
+        let mut f = tokio::fs::OpenOptions::new()
+            .read(true)
+            .open(&p)
+            .await
+            .map_err(new_std_io_error)?;
+
+        if args.range().offset() != 0 {
+            use tokio::io::AsyncSeekExt;
+            f.seek(SeekFrom::Start(args.range().offset()))
+                .await
+                .map_err(new_std_io_error)?;
+        }
+
+        Ok(f)
+    }
+
+    pub async fn prepare_write(
+        &self,
+        path: &str,
+        op: &OpWrite,
+    ) -> Result<(PathBuf, Option<PathBuf>)> {
+        let (target_path, tmp_path) = if let Some(atomic_write_dir) = 
&self.atomic_write_dir {
+            let target_path = self.ensure_write_abs_path(&self.root, 
path).await?;
+            let tmp_path = self
+                .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path))
+                .await?;
+
+            // If the target file exists, we should append to the end of it 
directly.
+            if op.append()
+                && tokio::fs::try_exists(&target_path)
+                    .await
+                    .map_err(new_std_io_error)?
+            {
+                (target_path, None)
+            } else {
+                (target_path, Some(tmp_path))
+            }
+        } else {
+            let p = self.ensure_write_abs_path(&self.root, path).await?;
+            (p, None)
+        };
+
+        Ok((target_path, tmp_path))
+    }
+
+    pub async fn fs_write(
+        &self,
+        target_path: &PathBuf,
+        tmp_path: Option<&PathBuf>,
+        op: &OpWrite,
+    ) -> Result<tokio::fs::File> {
+        let mut open_options = tokio::fs::OpenOptions::new();
+        if op.if_not_exists() {
+            open_options.create_new(true);
+        } else {
+            open_options.create(true);
+        }
+
+        open_options.write(true);
+
+        if op.append() {
+            open_options.append(true);
+        } else {
+            open_options.truncate(true);
+        }
+
+        let f = open_options
+            .open(tmp_path.unwrap_or(target_path))
+            .await
+            .map_err(parse_error)?;
+
+        Ok(f)
+    }
+
+    pub async fn fs_list(&self, path: &str) -> 
Result<Option<tokio::fs::ReadDir>> {
+        let p = self.root.join(path.trim_end_matches('/'));
+
+        match tokio::fs::read_dir(&p).await {
+            Ok(rd) => Ok(Some(rd)),
+            Err(e) => {
+                match e.kind() {
+                    // Return empty list if the directory not found
+                    std::io::ErrorKind::NotFound => Ok(None),
+                    // TODO: enable after our MSRV has been raised to 1.83
+                    //
+                    // If the path is not a directory, return an empty list
+                    //
+                    // The path could be a file or a symbolic link in this 
case.
+                    // Returning a NotADirectory error to the user isn't 
helpful; instead,
+                    // providing an empty directory listing is more user-friendly. In fact, the dir
+                    // `path/` does not exist.
+                    // std::io::ErrorKind::NotADirectory => Ok(None),
+                    _ => {
+                        // TODO: remove this after we have MSRV 1.83
+                        #[cfg(unix)]
+                        if e.raw_os_error() == Some(20) {
+                            // On unix 20: Not a directory
+                            return Ok(None);
+                        }
+                        #[cfg(windows)]
+                        if e.raw_os_error() == Some(267) {
+                            // On windows 267: ERROR_DIRECTORY (the directory name is invalid)
+                            return Ok(None);
+                        }
+
+                        Err(new_std_io_error(e))
+                    }
+                }
+            }
+        }
+    }
+
+    pub async fn fs_copy(&self, from: &str, to: &str) -> Result<()> {
+        let from = self.root.join(from.trim_end_matches('/'));
+        // try to get the metadata of the source file to ensure it exists
+        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
+
+        let to = self
+            .ensure_write_abs_path(&self.root, to.trim_end_matches('/'))
+            .await?;
+
+        tokio::fs::copy(from, to).await.map_err(new_std_io_error)?;
+        Ok(())
+    }
+
+    pub async fn fs_rename(&self, from: &str, to: &str) -> Result<()> {
+        let from = self.root.join(from.trim_end_matches('/'));
+        tokio::fs::metadata(&from).await.map_err(new_std_io_error)?;
+
+        let to = self
+            .ensure_write_abs_path(&self.root, to.trim_end_matches('/'))
+            .await?;
+
+        tokio::fs::rename(from, to)
+            .await
+            .map_err(new_std_io_error)?;
+        Ok(())
+    }
 }
 
 #[inline]
diff --git a/core/src/services/fs/delete.rs b/core/src/services/fs/delete.rs
index f14e6d13e..cd896aae9 100644
--- a/core/src/services/fs/delete.rs
+++ b/core/src/services/fs/delete.rs
@@ -44,7 +44,6 @@ impl oio::OneShotDelete for FsDeleter {
                 } else {
                     
tokio::fs::remove_file(&p).await.map_err(new_std_io_error)?;
                 }
-
                 Ok(())
             }
             Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
diff --git a/core/src/services/fs/mod.rs b/core/src/services/fs/error.rs
similarity index 67%
copy from core/src/services/fs/mod.rs
copy to core/src/services/fs/error.rs
index caf858386..b22599752 100644
--- a/core/src/services/fs/mod.rs
+++ b/core/src/services/fs/error.rs
@@ -15,21 +15,17 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#[cfg(feature = "services-fs")]
-mod core;
-#[cfg(feature = "services-fs")]
-mod delete;
-#[cfg(feature = "services-fs")]
-mod lister;
-#[cfg(feature = "services-fs")]
-mod reader;
-#[cfg(feature = "services-fs")]
-mod writer;
+use crate::raw::*;
+use crate::*;
 
-#[cfg(feature = "services-fs")]
-mod backend;
-#[cfg(feature = "services-fs")]
-pub use backend::FsBuilder as Fs;
-
-mod config;
-pub use config::FsConfig;
+/// Convert a `std::io::Error` into an OpenDAL `Error`, mapping `AlreadyExists` to `ConditionNotMatch`.
+pub(super) fn parse_error(e: std::io::Error) -> Error {
+    match e.kind() {
+        std::io::ErrorKind::AlreadyExists => Error::new(
+            ErrorKind::ConditionNotMatch,
+            "The file already exists in the filesystem",
+        )
+        .set_source(e),
+        _ => new_std_io_error(e),
+    }
+}
diff --git a/core/src/services/fs/mod.rs b/core/src/services/fs/mod.rs
index caf858386..cbfefd3cf 100644
--- a/core/src/services/fs/mod.rs
+++ b/core/src/services/fs/mod.rs
@@ -20,6 +20,8 @@ mod core;
 #[cfg(feature = "services-fs")]
 mod delete;
 #[cfg(feature = "services-fs")]
+mod error;
+#[cfg(feature = "services-fs")]
 mod lister;
 #[cfg(feature = "services-fs")]
 mod reader;

Reply via email to