This is an automated email from the ASF dual-hosted git repository. xuanwo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push: new f3ef4bddc refactor(services/fs): extract implementation to core (#6317) f3ef4bddc is described below commit f3ef4bddc9034eb0e98190730ea3d76a05160bb2 Author: Erick Guan <297343+erickg...@users.noreply.github.com> AuthorDate: Fri Jun 20 07:50:14 2025 +0200 refactor(services/fs): extract implementation to core (#6317) --- core/src/services/fs/backend.rs | 194 ++++-------------------------- core/src/services/fs/core.rs | 175 +++++++++++++++++++++++++++ core/src/services/fs/delete.rs | 1 - core/src/services/fs/{mod.rs => error.rs} | 30 ++--- core/src/services/fs/mod.rs | 2 + 5 files changed, 211 insertions(+), 191 deletions(-) diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs index 593a00d25..08e679cd9 100644 --- a/core/src/services/fs/backend.rs +++ b/core/src/services/fs/backend.rs @@ -15,11 +15,9 @@ // specific language governing permissions and limitations // under the License. -use std::io::SeekFrom; use std::path::PathBuf; use std::sync::Arc; -use chrono::DateTime; use log::debug; use super::core::*; @@ -124,8 +122,6 @@ impl Builder for FsBuilder { ErrorKind::Unexpected, "canonicalize of root directory failed", ) - .with_operation("Builder::build") - .with_context("root", root.to_string_lossy()) .set_source(e) })?; @@ -202,35 +198,12 @@ impl Access for FsBackend { } async fn create_dir(&self, path: &str, _: OpCreateDir) -> Result<RpCreateDir> { - let p = self.core.root.join(path.trim_end_matches('/')); - - tokio::fs::create_dir_all(&p) - .await - .map_err(new_std_io_error)?; - + self.core.fs_create_dir(path).await?; Ok(RpCreateDir::default()) } async fn stat(&self, path: &str, _: OpStat) -> Result<RpStat> { - let p = self.core.root.join(path.trim_end_matches('/')); - - let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?; - - let mode = if meta.is_dir() { - EntryMode::DIR - } else if meta.is_file() { - EntryMode::FILE - } else { - EntryMode::Unknown - }; - let m = Metadata::new(mode) - .with_content_length(meta.len()) - .with_last_modified( - meta.modified() - .map(DateTime::from) - .map_err(new_std_io_error)?, - ); - + let m = self.core.fs_stat(path).await?; Ok(RpStat::new(m)) } @@ -244,22 +217,7 @@ impl Access for FsBackend { /// /// Benchmark could be found [here](https://gist.github.com/Xuanwo/48f9cfbc3022ea5f865388bb62e1a70f) async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - let p = self.core.root.join(path.trim_end_matches('/')); - - let mut f = tokio::fs::OpenOptions::new() - .read(true) - .open(&p) - .await - .map_err(new_std_io_error)?; - - if args.range().offset() != 0 { - use tokio::io::AsyncSeekExt; - - f.seek(SeekFrom::Start(args.range().offset())) - .await - .map_err(new_std_io_error)?; - } - + let f = self.core.fs_read(path, &args).await?; let r = FsReader::new( self.core.clone(), f, @@ -269,80 +227,25 @@ impl Access for FsBackend { } async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> { - let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.core.atomic_write_dir { - let target_path = self - .core - .ensure_write_abs_path(&self.core.root, path) - .await?; - let tmp_path = self - .core - .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path)) - .await?; - - // If the target file exists, we should append to the end of it directly. - if op.append() - && tokio::fs::try_exists(&target_path) - .await - .map_err(new_std_io_error)? - { - (target_path, None) - } else { - (target_path, Some(tmp_path)) - } - } else { - let p = self - .core - .ensure_write_abs_path(&self.core.root, path) - .await?; - - (p, None) - }; - - let mut open_options = tokio::fs::OpenOptions::new(); - if op.if_not_exists() { - open_options.create_new(true); - } else { - open_options.create(true); - } - - open_options.write(true); - - if op.append() { - open_options.append(true); - } else { - open_options.truncate(true); - } - - let f = open_options - .open(tmp_path.as_ref().unwrap_or(&target_path)) - .await - .map_err(|e| { - match e.kind() { - std::io::ErrorKind::AlreadyExists => { - // Map io AlreadyExists to opendal ConditionNotMatch - Error::new( - ErrorKind::ConditionNotMatch, - "The file already exists in the filesystem", - ) - .set_source(e) - } - _ => new_std_io_error(e), - } - })?; + let (target_path, tmp_path) = self.core.prepare_write(path, &op).await?; + let file = self + .core + .fs_write(&target_path, tmp_path.as_ref(), &op) + .await?; - let w = FsWriter::new(target_path, tmp_path, f); + let writer = FsWriter::new(target_path, tmp_path, file); - let w = if op.append() { - FsWriters::One(w) + let writer = if op.append() { + FsWriters::One(writer) } else { FsWriters::Two(oio::PositionWriter::new( self.info().clone(), - w, + writer, op.concurrent(), )) }; - Ok((RpWrite::default(), w)) + Ok((RpWrite::default(), writer)) } async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> { @@ -353,77 +256,22 @@ impl Access for FsBackend { } async fn list(&self, path: &str, _: OpList) -> Result<(RpList, Self::Lister)> { - let p = self.core.root.join(path.trim_end_matches('/')); - - let f = match tokio::fs::read_dir(&p).await { - Ok(rd) => rd, - Err(e) => { - return match e.kind() { - // Return empty list if the directory not found - std::io::ErrorKind::NotFound => Ok((RpList::default(), None)), - // TODO: enable after our MSRV has been raised to 1.83 - // - // If the path is not a directory, return an empty list - // - // The path could be a file or a symbolic link in this case. - // Returning a NotADirectory error to the user isn't helpful; instead, - // providing an empty directory is a more user-friendly. In fact, the dir - // `path/` does not exist. - // std::io::ErrorKind::NotADirectory => Ok((RpList::default(), None)), - _ => { - // TODO: remove this after we have MSRV 1.83 - #[cfg(unix)] - if e.raw_os_error() == Some(20) { - // On unix 20: Not a directory - return Ok((RpList::default(), None)); - } - #[cfg(windows)] - if e.raw_os_error() == Some(267) { - // On windows 267: DIRECTORY - return Ok((RpList::default(), None)); - } - - Err(new_std_io_error(e)) - } - }; + match self.core.fs_list(path).await? { + Some(f) => { + let rd = FsLister::new(&self.core.root, path, f); + Ok((RpList::default(), Some(rd))) } - }; - - let rd = FsLister::new(&self.core.root, path, f); - Ok((RpList::default(), Some(rd))) + None => Ok((RpList::default(), None)), + } } async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result<RpCopy> { - let from = self.core.root.join(from.trim_end_matches('/')); - - // try to get the metadata of the source file to ensure it exists - tokio::fs::metadata(&from).await.map_err(new_std_io_error)?; - - let to = self - .core - .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/')) - .await?; - - tokio::fs::copy(from, to).await.map_err(new_std_io_error)?; - + self.core.fs_copy(from, to).await?; Ok(RpCopy::default()) } async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result<RpRename> { - let from = self.core.root.join(from.trim_end_matches('/')); - - // try to get the metadata of the source file to ensure it exists - tokio::fs::metadata(&from).await.map_err(new_std_io_error)?; - - let to = self - .core - .ensure_write_abs_path(&self.core.root, to.trim_end_matches('/')) - .await?; - - tokio::fs::rename(from, to) - .await - .map_err(new_std_io_error)?; - + self.core.fs_rename(from, to).await?; Ok(RpRename::default()) } } diff --git a/core/src/services/fs/core.rs b/core/src/services/fs/core.rs index 20a85e233..40cb50a63 100644 --- a/core/src/services/fs/core.rs +++ b/core/src/services/fs/core.rs @@ -15,12 +15,15 @@ // specific language governing permissions and limitations // under the License. +use std::io::SeekFrom; use std::path::Path; use std::path::PathBuf; use std::sync::Arc; +use chrono::DateTime; use uuid::Uuid; +use super::error::*; use crate::raw::*; use crate::*; @@ -60,6 +63,178 @@ impl FsCore { Ok(p) } + + pub async fn fs_create_dir(&self, path: &str) -> Result<()> { + let p = self.root.join(path.trim_end_matches('/')); + tokio::fs::create_dir_all(&p) + .await + .map_err(new_std_io_error)?; + Ok(()) + } + + pub async fn fs_stat(&self, path: &str) -> Result<Metadata> { + let p = self.root.join(path.trim_end_matches('/')); + let meta = tokio::fs::metadata(&p).await.map_err(new_std_io_error)?; + + let mode = if meta.is_dir() { + EntryMode::DIR + } else if meta.is_file() { + EntryMode::FILE + } else { + EntryMode::Unknown + }; + let m = Metadata::new(mode) + .with_content_length(meta.len()) + .with_last_modified( + meta.modified() + .map(DateTime::from) + .map_err(new_std_io_error)?, + ); + + Ok(m) + } + + pub async fn fs_read(&self, path: &str, args: &OpRead) -> Result<tokio::fs::File> { + let p = self.root.join(path.trim_end_matches('/')); + + let mut f = tokio::fs::OpenOptions::new() + .read(true) + .open(&p) + .await + .map_err(new_std_io_error)?; + + if args.range().offset() != 0 { + use tokio::io::AsyncSeekExt; + f.seek(SeekFrom::Start(args.range().offset())) + .await + .map_err(new_std_io_error)?; + } + + Ok(f) + } + + pub async fn prepare_write( + &self, + path: &str, + op: &OpWrite, + ) -> Result<(PathBuf, Option<PathBuf>)> { + let (target_path, tmp_path) = if let Some(atomic_write_dir) = &self.atomic_write_dir { + let target_path = self.ensure_write_abs_path(&self.root, path).await?; + let tmp_path = self + .ensure_write_abs_path(atomic_write_dir, &tmp_file_of(path)) + .await?; + + // If the target file exists, we should append to the end of it directly. + if op.append() + && tokio::fs::try_exists(&target_path) + .await + .map_err(new_std_io_error)? + { + (target_path, None) + } else { + (target_path, Some(tmp_path)) + } + } else { + let p = self.ensure_write_abs_path(&self.root, path).await?; + (p, None) + }; + + Ok((target_path, tmp_path)) + } + + pub async fn fs_write( + &self, + target_path: &PathBuf, + tmp_path: Option<&PathBuf>, + op: &OpWrite, + ) -> Result<tokio::fs::File> { + let mut open_options = tokio::fs::OpenOptions::new(); + if op.if_not_exists() { + open_options.create_new(true); + } else { + open_options.create(true); + } + + open_options.write(true); + + if op.append() { + open_options.append(true); + } else { + open_options.truncate(true); + } + + let f = open_options + .open(tmp_path.unwrap_or(target_path)) + .await + .map_err(parse_error)?; + + Ok(f) + } + + pub async fn fs_list(&self, path: &str) -> Result<Option<tokio::fs::ReadDir>> { + let p = self.root.join(path.trim_end_matches('/')); + + match tokio::fs::read_dir(&p).await { + Ok(rd) => Ok(Some(rd)), + Err(e) => { + match e.kind() { + // Return empty list if the directory not found + std::io::ErrorKind::NotFound => Ok(None), + // TODO: enable after our MSRV has been raised to 1.83 + // + // If the path is not a directory, return an empty list + // + // The path could be a file or a symbolic link in this case. + // Returning a NotADirectory error to the user isn't helpful; instead, + // providing an empty directory is a more user-friendly. In fact, the dir + // `path/` does not exist. + // std::io::ErrorKind::NotADirectory => Ok((RpList::default(), None)), + _ => { + // TODO: remove this after we have MSRV 1.83 + #[cfg(unix)] + if e.raw_os_error() == Some(20) { + // On unix 20: Not a directory + return Ok(None); + } + #[cfg(windows)] + if e.raw_os_error() == Some(267) { + // On windows 267: DIRECTORY + return Ok(None); + } + + Err(new_std_io_error(e)) + } + } + } + } + } + + pub async fn fs_copy(&self, from: &str, to: &str) -> Result<()> { + let from = self.root.join(from.trim_end_matches('/')); + // try to get the metadata of the source file to ensure it exists + tokio::fs::metadata(&from).await.map_err(new_std_io_error)?; + + let to = self + .ensure_write_abs_path(&self.root, to.trim_end_matches('/')) + .await?; + + tokio::fs::copy(from, to).await.map_err(new_std_io_error)?; + Ok(()) + } + + pub async fn fs_rename(&self, from: &str, to: &str) -> Result<()> { + let from = self.root.join(from.trim_end_matches('/')); + tokio::fs::metadata(&from).await.map_err(new_std_io_error)?; + + let to = self + .ensure_write_abs_path(&self.root, to.trim_end_matches('/')) + .await?; + + tokio::fs::rename(from, to) + .await + .map_err(new_std_io_error)?; + Ok(()) + } } #[inline] diff --git a/core/src/services/fs/delete.rs b/core/src/services/fs/delete.rs index f14e6d13e..cd896aae9 100644 --- a/core/src/services/fs/delete.rs +++ b/core/src/services/fs/delete.rs @@ -44,7 +44,6 @@ impl oio::OneShotDelete for FsDeleter { } else { tokio::fs::remove_file(&p).await.map_err(new_std_io_error)?; } - Ok(()) } Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()), diff --git a/core/src/services/fs/mod.rs b/core/src/services/fs/error.rs similarity index 67% copy from core/src/services/fs/mod.rs copy to core/src/services/fs/error.rs index caf858386..b22599752 100644 --- a/core/src/services/fs/mod.rs +++ b/core/src/services/fs/error.rs @@ -15,21 +15,17 @@ // specific language governing permissions and limitations // under the License. -#[cfg(feature = "services-fs")] -mod core; -#[cfg(feature = "services-fs")] -mod delete; -#[cfg(feature = "services-fs")] -mod lister; -#[cfg(feature = "services-fs")] -mod reader; -#[cfg(feature = "services-fs")] -mod writer; +use crate::raw::*; +use crate::*; -#[cfg(feature = "services-fs")] -mod backend; -#[cfg(feature = "services-fs")] -pub use backend::FsBuilder as Fs; - -mod config; -pub use config::FsConfig; +/// Parse error response into Error. +pub(super) fn parse_error(e: std::io::Error) -> Error { + match e.kind() { + std::io::ErrorKind::AlreadyExists => Error::new( + ErrorKind::ConditionNotMatch, + "The file already exists in the filesystem", + ) + .set_source(e), + _ => new_std_io_error(e), + } +} diff --git a/core/src/services/fs/mod.rs b/core/src/services/fs/mod.rs index caf858386..cbfefd3cf 100644 --- a/core/src/services/fs/mod.rs +++ b/core/src/services/fs/mod.rs @@ -20,6 +20,8 @@ mod core; #[cfg(feature = "services-fs")] mod delete; #[cfg(feature = "services-fs")] +mod error; +#[cfg(feature = "services-fs")] mod lister; #[cfg(feature = "services-fs")] mod reader;