This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new c5258fdfa fix(service/fs): handle if_not_exists flag to raise ConditionNotMatch error (#6326)
c5258fdfa is described below
commit c5258fdfa16d9b0dcafc3c1b2be414e0c3930850
Author: Kingsword <[email protected]>
AuthorDate: Wed Jun 25 13:45:11 2025 +0800
fix(service/fs): handle if_not_exists flag to raise ConditionNotMatch error (#6326)
* fix(service/fs): handle if_not_exists flag to raise ConditionNotMatch error
* refactor: simplify atomic write path handling logic
* chore: format code
* fix: if_not_exists
* refactor fs write logic
Signed-off-by: Xuanwo <[email protected]>
---------
Signed-off-by: Xuanwo <[email protected]>
Co-authored-by: Xuanwo <[email protected]>
---
core/src/services/fs/backend.rs | 13 +++---
core/src/services/fs/core.rs | 64 ++++++++++++--------------
core/src/services/fs/writer.rs | 99 ++++++++++++++++++++++-------------------
3 files changed, 88 insertions(+), 88 deletions(-)
diff --git a/core/src/services/fs/backend.rs b/core/src/services/fs/backend.rs
index 9f6762d6e..d7d50107f 100644
--- a/core/src/services/fs/backend.rs
+++ b/core/src/services/fs/backend.rs
@@ -227,21 +227,18 @@ impl Access for FsBackend {
}
async fn write(&self, path: &str, op: OpWrite) -> Result<(RpWrite, Self::Writer)> {
- let (target_path, tmp_path) = self.core.prepare_write(path, &op).await?;
- let file = self
- .core
- .fs_write(&target_path, tmp_path.as_ref(), &op)
- .await?;
+ let is_append = op.append();
+ let concurrent = op.concurrent();
- let writer = FsWriter::new(target_path, tmp_path, file);
+ let writer = FsWriter::create(self.core.clone(), path, op).await?;
- let writer = if op.append() {
+ let writer = if is_append {
FsWriters::One(writer)
} else {
FsWriters::Two(oio::PositionWriter::new(
self.info().clone(),
writer,
- op.concurrent(),
+ concurrent,
))
};
diff --git a/core/src/services/fs/core.rs b/core/src/services/fs/core.rs
index 9777f832e..fa5a9d102 100644
--- a/core/src/services/fs/core.rs
+++ b/core/src/services/fs/core.rs
@@ -112,37 +112,7 @@ impl FsCore {
Ok(f)
}
- pub async fn prepare_write(
- &self,
- path: &str,
- op: &OpWrite,
- ) -> Result<(PathBuf, Option<PathBuf>)> {
- if let Some(atomic_write_dir) = &self.atomic_write_dir {
- let target_path = self.ensure_write_abs_path(&self.root, path).await?;
- let tmp_path = self
- .ensure_write_abs_path(atomic_write_dir, &build_tmp_path_of(path))
- .await?;
-
- // If the target file exists, we should append to the end of it directly.
- let should_append = op.append()
- && tokio::fs::try_exists(&target_path)
- .await
- .map_err(new_std_io_error)?;
- let tmp_path = (!should_append).then_some(tmp_path);
-
- Ok((target_path, tmp_path))
- } else {
- let p = self.ensure_write_abs_path(&self.root, path).await?;
- Ok((p, None))
- }
- }
-
- pub async fn fs_write(
- &self,
- target_path: &PathBuf,
- tmp_path: Option<&PathBuf>,
- op: &OpWrite,
- ) -> Result<tokio::fs::File> {
+ pub async fn fs_write(&self, path: &PathBuf, op: &OpWrite) -> Result<tokio::fs::File> {
let mut open_options = tokio::fs::OpenOptions::new();
if op.if_not_exists() {
open_options.create_new(true);
@@ -158,14 +128,38 @@ impl FsCore {
open_options.truncate(true);
}
- let f = open_options
- .open(tmp_path.unwrap_or(target_path))
- .await
- .map_err(parse_error)?;
+ let f = open_options.open(path).await.map_err(parse_error)?;
Ok(f)
}
+ /// This function is used to build a tempfile for writing.
+ ///
+ /// We don't care about the OpWrite since every check should be performed on target path directly.
+ pub async fn fs_tempfile_write(
+ &self,
+ path: &str,
+ ) -> Result<(tokio::fs::File, Option<PathBuf>)> {
+ let Some(atomic_write_dir) = self.atomic_write_dir.as_ref() else {
+ return Err(Error::new(ErrorKind::Unexpected, "fs didn't configure atomic_write_dir, but we're still entering the tempfile logic. This might be a bug."));
+ };
+
+ let tmp_path = self
+ .ensure_write_abs_path(atomic_write_dir, &build_tmp_path_of(path))
+ .await?;
+
+ let mut open_options = tokio::fs::OpenOptions::new();
+
+ // tempfile should always be new file.
+ open_options.create_new(true);
+ open_options.write(true);
+ open_options.truncate(true);
+
+ let f = open_options.open(&tmp_path).await.map_err(parse_error)?;
+
+ Ok((f, Some(tmp_path)))
+ }
+
pub async fn fs_list(&self, path: &str) -> Result<Option<tokio::fs::ReadDir>> {
let p = self.root.join(path.trim_end_matches('/'));
diff --git a/core/src/services/fs/writer.rs b/core/src/services/fs/writer.rs
index 2b02ced16..97d75f785 100644
--- a/core/src/services/fs/writer.rs
+++ b/core/src/services/fs/writer.rs
@@ -18,45 +18,64 @@
use std::fs::File;
use std::io::Write;
use std::path::PathBuf;
+use std::sync::Arc;
use bytes::Buf;
use tokio::io::AsyncWriteExt;
use crate::raw::*;
+use crate::services::fs::core::FsCore;
use crate::*;
-pub type FsWriters =
- TwoWays<FsWriter<tokio::fs::File>, oio::PositionWriter<FsWriter<tokio::fs::File>>>;
+pub type FsWriters = TwoWays<FsWriter, oio::PositionWriter<FsWriter>>;
-pub struct FsWriter<F> {
+pub struct FsWriter {
target_path: PathBuf,
- tmp_path: Option<PathBuf>,
-
- f: Option<F>,
+ /// The temp_path is used to specify whether we should move to target_path after the file has been closed.
+ temp_path: Option<PathBuf>,
+ f: tokio::fs::File,
}
-impl<F> FsWriter<F> {
- pub fn new(target_path: PathBuf, tmp_path: Option<PathBuf>, f: F) -> Self {
- Self {
- target_path,
- tmp_path,
-
- f: Some(f),
+impl FsWriter {
+ pub async fn create(core: Arc<FsCore>, path: &str, op: OpWrite) -> Result<Self> {
+ let target_path = core.ensure_write_abs_path(&core.root, path).await?;
+
+ // Create a target file using our OpWrite to check for permissions and existence.
+ //
+ // If target check passed, we can go decide which path we should go for writing.
+ let target_file = core.fs_write(&target_path, &op).await?;
+
+ // file is created success with append.
+ let is_append = op.append();
+ // file is created success with if_not_exists.
+ let is_exist = !op.if_not_exists();
+
+ let (mut f, mut temp_path) = (target_file, None);
+ if core.atomic_write_dir.is_some() {
+ // The only case we allow write in place is the file
+ // exists and users request for append writing.
+ if !(is_append && is_exist) {
+ (f, temp_path) = core.fs_tempfile_write(path).await?;
+ }
}
+
+ Ok(Self {
+ target_path,
+ temp_path,
+ f,
+ })
}
}
/// # Safety
///
/// We will only take `&mut Self` reference for FsWriter.
-unsafe impl<F> Sync for FsWriter<F> {}
+unsafe impl Sync for FsWriter {}
-impl oio::Write for FsWriter<tokio::fs::File> {
+impl oio::Write for FsWriter {
async fn write(&mut self, mut bs: Buffer) -> Result<()> {
- let f = self.f.as_mut().expect("FsWriter must be initialized");
-
while bs.has_remaining() {
- let n = f.write(bs.chunk()).await.map_err(new_std_io_error)?;
+ let n = self.f.write(bs.chunk()).await.map_err(new_std_io_error)?;
bs.advance(n);
}
@@ -64,33 +83,25 @@ impl oio::Write for FsWriter<tokio::fs::File> {
}
async fn close(&mut self) -> Result<Metadata> {
- let f = self.f.as_mut().expect("FsWriter must be initialized");
- f.flush().await.map_err(new_std_io_error)?;
- f.sync_all().await.map_err(new_std_io_error)?;
+ self.f.flush().await.map_err(new_std_io_error)?;
+ self.f.sync_all().await.map_err(new_std_io_error)?;
- if let Some(tmp_path) = &self.tmp_path {
- tokio::fs::rename(tmp_path, &self.target_path)
+ if let Some(temp_path) = &self.temp_path {
+ tokio::fs::rename(temp_path, &self.target_path)
.await
.map_err(new_std_io_error)?;
}
- let file_meta = f.metadata().await.map_err(new_std_io_error)?;
- let mode = if file_meta.is_file() {
- EntryMode::FILE
- } else if file_meta.is_dir() {
- EntryMode::DIR
- } else {
- EntryMode::Unknown
- };
- let meta = Metadata::new(mode)
+ let file_meta = self.f.metadata().await.map_err(new_std_io_error)?;
+ let meta = Metadata::new(EntryMode::FILE)
.with_content_length(file_meta.len())
.with_last_modified(file_meta.modified().map_err(new_std_io_error)?.into());
Ok(meta)
}
async fn abort(&mut self) -> Result<()> {
- if let Some(tmp_path) = &self.tmp_path {
- tokio::fs::remove_file(tmp_path)
+ if let Some(temp_path) = &self.temp_path {
+ tokio::fs::remove_file(temp_path)
.await
.map_err(new_std_io_error)
} else {
@@ -102,11 +113,10 @@ impl oio::Write for FsWriter<tokio::fs::File> {
}
}
-impl oio::PositionWrite for FsWriter<tokio::fs::File> {
+impl oio::PositionWrite for FsWriter {
async fn write_all_at(&self, offset: u64, buf: Buffer) -> Result<()> {
- let f = self.f.as_ref().expect("FsWriter must be initialized");
-
- let f = f
+ let f = self
+ .f
.try_clone()
.await
.map_err(new_std_io_error)?
@@ -132,9 +142,8 @@ impl oio::PositionWrite for FsWriter<tokio::fs::File> {
}
async fn close(&self) -> Result<Metadata> {
- let f = self.f.as_ref().expect("FsWriter must be initialized");
-
- let mut f = f
+ let mut f = self
+ .f
.try_clone()
.await
.map_err(new_std_io_error)?
@@ -144,8 +153,8 @@ impl oio::PositionWrite for FsWriter<tokio::fs::File> {
f.flush().map_err(new_std_io_error)?;
f.sync_all().map_err(new_std_io_error)?;
- if let Some(tmp_path) = &self.tmp_path {
- tokio::fs::rename(tmp_path, &self.target_path)
+ if let Some(temp_path) = &self.temp_path {
+ tokio::fs::rename(temp_path, &self.target_path)
.await
.map_err(new_std_io_error)?;
}
@@ -165,8 +174,8 @@ impl oio::PositionWrite for FsWriter<tokio::fs::File> {
}
async fn abort(&self) -> Result<()> {
- if let Some(tmp_path) = &self.tmp_path {
- tokio::fs::remove_file(tmp_path)
+ if let Some(temp_path) = &self.temp_path {
+ tokio::fs::remove_file(temp_path)
.await
.map_err(new_std_io_error)
} else {