This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new c5258fdfa fix(service/fs): handle if_not_exists flag to raise ConditionNotMatch error (#6326)
c5258fdfa is described below

commit c5258fdfa16d9b0dcafc3c1b2be414e0c3930850
Author: Kingsword <[email protected]>
AuthorDate: Wed Jun 25 13:45:11 2025 +0800

    fix(service/fs): handle if_not_exists flag to raise ConditionNotMatch error (#6326)
    
    * fix(service/fs): handle if_not_exists flag to raise ConditionNotMatch error
    
    * refactor: simplify atomic write path handling logic
    
    * chore: format code
    
    * fix: if_not_exists
    
    * refactor fs write logic
    
    Signed-off-by: Xuanwo <[email protected]>
    
    ---------
    
    Signed-off-by: Xuanwo <[email protected]>
    Co-authored-by: Xuanwo <[email protected]>
---
 core/src/services/fs/backend.rs | 13 +++---
 core/src/services/fs/core.rs    | 64 ++++++++++++--------------
 core/src/services/fs/writer.rs  | 99 ++++++++++++++++++++++-------------------
 3 files changed, 88 insertions(+), 88 deletions(-)

diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 9f6762d6e..d7d50107f 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -227,21 +227,18 @@ impl Access for FsBackend {
     }
 
     async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, 
Self::Writer)> {
-        let (target_path, tmp_path) = self.core.prepare_write(path, 
&op).await?;
-        let file = self
-            .core
-            .fs_write(&target_path, tmp_path.as_ref(), &op)
-            .await?;
+        let is_append = op.append();
+        let concurrent = op.concurrent();
 
-        let writer = FsWriter::new(target_path, tmp_path, file);
+        let writer = FsWriter::create(self.core.clone(), path, op).await?;
 
-        let writer = if op.append() {
+        let writer = if is_append {
             FsWriters::One(writer)
         } else {
             FsWriters::Two(oio::PositionWriter::new(
                 self.info().clone(),
                 writer,
-                op.concurrent(),
+                concurrent,
             ))
         };
 
diff --git a/core/src/services/fs/core.rs b/core/src/services/fs/core.rs
index 9777f832e..fa5a9d102 100644
--- a/core/src/services/fs/core.rs
+++ b/core/src/services/fs/core.rs
@@ -112,37 +112,7 @@ impl FsCore {
         Ok(f)
     }
 
-    pub async fn prepare_write(
-        &self,
-        path: &str,
-        op: &OpWrite,
-    ) -> Result<(PathBuf, Option<PathBuf>)> {
-        if let Some(atomic_write_dir) = &self.atomic_write_dir {
-            let target_path = self.ensure_write_abs_path(&self.root, 
path).await?;
-            let tmp_path = self
-                .ensure_write_abs_path(atomic_write_dir, 
&build_tmp_path_of(path))
-                .await?;
-
-            // If the target file exists, we should append to the end of it 
directly.
-            let should_append = op.append()
-                && tokio::fs::try_exists(&target_path)
-                    .await
-                    .map_err(new_std_io_error)?;
-            let tmp_path = (!should_append).then_some(tmp_path);
-
-            Ok((target_path, tmp_path))
-        } else {
-            let p = self.ensure_write_abs_path(&self.root, path).await?;
-            Ok((p, None))
-        }
-    }
-
-    pub async fn fs_write(
-        &self,
-        target_path: &PathBuf,
-        tmp_path: Option<&PathBuf>,
-        op: &OpWrite,
-    ) -> Result<tokio::fs::File> {
+    pub async fn fs_write(&self, path: &PathBuf, op: &OpWrite) -> 
Result<tokio::fs::File> {
         let mut open_options = tokio::fs::OpenOptions::new();
         if op.if_not_exists() {
             open_options.create_new(true);
@@ -158,14 +128,38 @@ impl FsCore {
             open_options.truncate(true);
         }
 
-        let f = open_options
-            .open(tmp_path.unwrap_or(target_path))
-            .await
-            .map_err(parse_error)?;
+        let f = open_options.open(path).await.map_err(parse_error)?;
 
         Ok(f)
     }
 
+    /// This function is used to build a tempfile for writing.
+    ///
+    /// We don't care about the OpWrite since every check should be performed 
on target path directly.
+    pub async fn fs_tempfile_write(
+        &self,
+        path: &str,
+    ) -> Result<(tokio::fs::File, Option<PathBuf>)> {
+        let Some(atomic_write_dir) = self.atomic_write_dir.as_ref() else {
+            return Err(Error::new(ErrorKind::Unexpected, "fs didn't configure 
atomic_write_dir, but we're still entering the tempfile logic. This might be a 
bug."));
+        };
+
+        let tmp_path = self
+            .ensure_write_abs_path(atomic_write_dir, &build_tmp_path_of(path))
+            .await?;
+
+        let mut open_options = tokio::fs::OpenOptions::new();
+
+        // tempfile should always be new file.
+        open_options.create_new(true);
+        open_options.write(true);
+        open_options.truncate(true);
+
+        let f = open_options.open(&tmp_path).await.map_err(parse_error)?;
+
+        Ok((f, Some(tmp_path)))
+    }
+
     pub async fn fs_list(&self, path: &str) -> 
Result<Option<tokio::fs::ReadDir>> {
         let p = self.root.join(path.trim_end_matches('/'));
 
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 2b02ced16..97d75f785 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -18,45 +18,64 @@
 use std::fs::File;
 use std::io::Write;
 use std::path::PathBuf;
+use std::sync::Arc;
 
 use bytes::Buf;
 use tokio::io::AsyncWriteExt;
 
 use crate::raw::*;
+use crate::services::fs::core::FsCore;
 use crate::*;
 
-pub type FsWriters =
-    TwoWays<FsWriter<tokio::fs::File>, 
oio::PositionWriter<FsWriter<tokio::fs::File>>>;
+pub type FsWriters = TwoWays<FsWriter, oio::PositionWriter<FsWriter>>;
 
-pub struct FsWriter<F> {
+pub struct FsWriter {
     target_path: PathBuf,
-    tmp_path: Option<PathBuf>,
-
-    f: Option<F>,
+    /// The temp_path is used to specify whether we should move to target_path 
after the file has been closed.
+    temp_path: Option<PathBuf>,
+    f: tokio::fs::File,
 }
 
-impl<F> FsWriter<F> {
-    pub fn new(target_path: PathBuf, tmp_path: Option<PathBuf>, f: F) -> Self {
-        Self {
-            target_path,
-            tmp_path,
-
-            f: Some(f),
+impl FsWriter {
+    pub async fn create(core: Arc<FsCore>, path: &str, op: OpWrite) -> 
Result<Self> {
+        let target_path = core.ensure_write_abs_path(&core.root, path).await?;
+
+        // Create a target file using our OpWrite to check for permissions and 
existence.
+        //
+        // If target check passed, we can go decide which path we should go 
for writing.
+        let target_file = core.fs_write(&target_path, &op).await?;
+
+        // file is created success with append.
+        let is_append = op.append();
+        // file is created success with if_not_exists.
+        let is_exist = !op.if_not_exists();
+
+        let (mut f, mut temp_path) = (target_file, None);
+        if core.atomic_write_dir.is_some() {
+            // The only case we allow write in place is the file
+            // exists and users request for append writing.
+            if !(is_append && is_exist) {
+                (f, temp_path) = core.fs_tempfile_write(path).await?;
+            }
         }
+
+        Ok(Self {
+            target_path,
+            temp_path,
+            f,
+        })
     }
 }
 
 /// # Safety
 ///
 /// We will only take `&mut Self` reference for FsWriter.
-unsafe impl<F> Sync for FsWriter<F> {}
+unsafe impl Sync for FsWriter {}
 
-impl oio::Write for FsWriter<tokio::fs::File> {
+impl oio::Write for FsWriter {
     async fn write(&mut self, mut bs: Buffer) -> Result<()> {
-        let f = self.f.as_mut().expect("FsWriter must be initialized");
-
         while bs.has_remaining() {
-            let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
+            let n = self.f.write(bs.chunk()).await.map_err(new_std_io_error)?;
             bs.advance(n);
         }
 
@@ -64,33 +83,25 @@ impl oio::Write for FsWriter<tokio::fs::File> {
     }
 
     async fn close(&mut self) -> Result<Metadata> {
-        let f = self.f.as_mut().expect("FsWriter must be initialized");
-        f.flush().await.map_err(new_std_io_error)?;
-        f.sync_all().await.map_err(new_std_io_error)?;
+        self.f.flush().await.map_err(new_std_io_error)?;
+        self.f.sync_all().await.map_err(new_std_io_error)?;
 
-        if let Some(tmp_path) = &self.tmp_path {
-            tokio::fs::rename(tmp_path, &self.target_path)
+        if let Some(temp_path) = &self.temp_path {
+            tokio::fs::rename(temp_path, &self.target_path)
                 .await
                 .map_err(new_std_io_error)?;
         }
 
-        let file_meta = f.metadata().await.map_err(new_std_io_error)?;
-        let mode = if file_meta.is_file() {
-            EntryMode::FILE
-        } else if file_meta.is_dir() {
-            EntryMode::DIR
-        } else {
-            EntryMode::Unknown
-        };
-        let meta = Metadata::new(mode)
+        let file_meta = self.f.metadata().await.map_err(new_std_io_error)?;
+        let meta = Metadata::new(EntryMode::FILE)
             .with_content_length(file_meta.len())
             
.with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
         Ok(meta)
     }
 
     async fn abort(&mut self) -> Result<()> {
-        if let Some(tmp_path) = &self.tmp_path {
-            tokio::fs::remove_file(tmp_path)
+        if let Some(temp_path) = &self.temp_path {
+            tokio::fs::remove_file(temp_path)
                 .await
                 .map_err(new_std_io_error)
         } else {
@@ -102,11 +113,10 @@ impl oio::Write for FsWriter<tokio::fs::File> {
     }
 }
 
-impl oio::PositionWrite for FsWriter<tokio::fs::File> {
+impl oio::PositionWrite for FsWriter {
     async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
-        let f = self.f.as_ref().expect("FsWriter must be initialized");
-
-        let f = f
+        let f = self
+            .f
             .try_clone()
             .await
             .map_err(new_std_io_error)?
@@ -132,9 +142,8 @@ impl oio::PositionWrite for FsWriter<tokio::fs::File> {
     }
 
     async fn close(&self) -> Result<Metadata> {
-        let f = self.f.as_ref().expect("FsWriter must be initialized");
-
-        let mut f = f
+        let mut f = self
+            .f
             .try_clone()
             .await
             .map_err(new_std_io_error)?
@@ -144,8 +153,8 @@ impl oio::PositionWrite for FsWriter<tokio::fs::File> {
         f.flush().map_err(new_std_io_error)?;
         f.sync_all().map_err(new_std_io_error)?;
 
-        if let Some(tmp_path) = &self.tmp_path {
-            tokio::fs::rename(tmp_path, &self.target_path)
+        if let Some(temp_path) = &self.temp_path {
+            tokio::fs::rename(temp_path, &self.target_path)
                 .await
                 .map_err(new_std_io_error)?;
         }
@@ -165,8 +174,8 @@ impl oio::PositionWrite for FsWriter<tokio::fs::File> {
     }
 
     async fn abort(&self) -> Result<()> {
-        if let Some(tmp_path) = &self.tmp_path {
-            tokio::fs::remove_file(tmp_path)
+        if let Some(temp_path) = &self.temp_path {
+            tokio::fs::remove_file(temp_path)
                 .await
                 .map_err(new_std_io_error)
         } else {

Reply via email to