This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs-object-store.git


The following commit(s) were added to refs/heads/main by this push:
     new dd0afcb  feat(local): add fsync to LocalFileSystem for durability (#643)
dd0afcb is described below

commit dd0afcbc75d22a76f871101125bcc8782f6ef85f
Author: Pierre Barre <[email protected]>
AuthorDate: Wed Jun 17 13:48:52 2026 +0200

    feat(local): add fsync to LocalFileSystem for durability (#643)
    
    * feat(local): add opt-in fsync to LocalFileSystem for durability
    
    * Fix docs
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 src/local.rs | 339 ++++++++++++++++++++++++++++++++++++++++++++++++++---------
 1 file changed, 287 insertions(+), 52 deletions(-)

diff --git a/src/local.rs b/src/local.rs
index b3f3f7f..a627e2d 100644
--- a/src/local.rs
+++ b/src/local.rs
@@ -114,6 +114,9 @@ pub(crate) enum Error {
     #[error("Filenames containing trailing '/#\\d+/' are not supported: {}", 
path)]
     InvalidPath { path: String },
 
+    #[error("Unable to sync data to disk for {}: {}", path.display(), source)]
+    UnableToSyncFile { source: io::Error, path: PathBuf },
+
     #[error("Upload aborted")]
     Aborted,
 }
@@ -228,6 +231,8 @@ pub struct LocalFileSystem {
     config: Arc<Config>,
     // if you want to delete empty directories when deleting files
     automatic_cleanup: bool,
+    // if true, fsync written files and their parent directories after writes
+    fsync: bool,
 }
 
 #[derive(Debug)]
@@ -255,6 +260,7 @@ impl LocalFileSystem {
                 root: Url::parse("file:///").unwrap(),
             }),
             automatic_cleanup: false,
+            fsync: false,
         }
     }
 
@@ -273,6 +279,7 @@ impl LocalFileSystem {
                 root: absolute_path_to_url(path)?,
             }),
             automatic_cleanup: false,
+            fsync: false,
         })
     }
 
@@ -286,6 +293,25 @@ impl LocalFileSystem {
         self.automatic_cleanup = automatic_cleanup;
         self
     }
+
+    /// Enable `fsync` after writes for durability
+    ///
+    /// When enabled, [`LocalFileSystem`] calls [`File::sync_all`] on written 
files and fsyncs
+    /// the affected parent directories before a write operation
+    /// ([`put_opts`](ObjectStore::put_opts), 
[`copy_opts`](ObjectStore::copy_opts),
+    /// [`rename_opts`](ObjectStore::rename_opts), and multipart upload 
completion) returns
+    /// success. This guarantees that both the file contents and the directory 
entries pointing
+    /// to them are durable on stable storage, matching the implicit 
durability contract of
+    /// remote object stores such as S3 or GCS.
+    ///
+    /// This trades write throughput for durability and is **disabled by 
default**.
+    ///
+    /// Note that directory fsync is only performed on Unix; on other 
platforms (e.g. Windows)
+    /// it is a no-op, as directories cannot be portably opened and synced.
+    pub fn with_fsync(mut self, fsync: bool) -> Self {
+        self.fsync = fsync;
+        self
+    }
 }
 
 impl Config {
@@ -374,8 +400,9 @@ impl ObjectStore for LocalFileSystem {
         }
 
         let path = self.path_to_filesystem(location)?;
+        let fsync = self.fsync;
         maybe_spawn_blocking(move || {
-            let (mut file, staging_path) = new_staged_upload(&path)?;
+            let (mut file, staging_path) = new_staged_upload(&path, fsync)?;
             let mut e_tag = None;
 
             let err = match payload.iter().try_for_each(|x| file.write_all(x)) 
{
@@ -385,39 +412,26 @@ impl ObjectStore for LocalFileSystem {
                         path: path.to_string_lossy().to_string(),
                     })?;
                     e_tag = Some(get_etag(&metadata));
-                    // Explicitly close the file, checking for errors that 
would be silently ignored by drop.
-                    // On network filesystems (e.g. NFS), close can fail and 
indicate data loss.
-                    //
-                    // This also ensures the file is closed before rename, 
which is required by some FUSE
-                    // filesystems (e.g. Blobfuse) to trigger the upload 
operation.
-                    close_file(file).map_err(|source| 
Error::UnableToCopyDataToFile { source })?;
+                    // Atomically publish the staged file. When fsync is 
enabled the publish
+                    // helpers flush the file's contents and the destination's 
parent directory to
+                    // disk first, so a successful return is durable; the 
fsync calls are bundled
+                    // into the helpers so a file-system modification can 
never be left unsynced.
                     match opts.mode {
-                        PutMode::Overwrite => match 
std::fs::rename(&staging_path, &path) {
-                            Ok(_) => None,
-                            Err(source) => Some(Error::UnableToRenameFile { 
source }),
-                        },
-                        PutMode::Create => match 
std::fs::hard_link(&staging_path, &path) {
-                            Ok(_) => {
-                                let _ = std::fs::remove_file(&staging_path); 
// Attempt to cleanup
-                                None
-                            }
-                            Err(source) => match source.kind() {
-                                ErrorKind::AlreadyExists => 
Some(Error::AlreadyExists {
-                                    path: path.to_str().unwrap().to_string(),
-                                    source,
-                                }),
-                                _ => Some(Error::UnableToRenameFile { source 
}),
-                            },
-                        },
+                        PutMode::Overwrite => {
+                            finish_staged_rename(file, &staging_path, &path, 
fsync).err()
+                        }
+                        PutMode::Create => {
+                            finish_staged_hard_link(file, &staging_path, 
&path, fsync).err()
+                        }
                         PutMode::Update(_) => unreachable!(),
                     }
                 }
-                Err(source) => Some(Error::UnableToCopyDataToFile { source }),
+                Err(source) => Some(Error::UnableToCopyDataToFile { source 
}.into()),
             };
 
             if let Some(err) = err {
                 let _ = std::fs::remove_file(&staging_path); // Attempt to 
cleanup
-                return Err(err.into());
+                return Err(err);
             }
 
             Ok(PutResult {
@@ -442,8 +456,8 @@ impl ObjectStore for LocalFileSystem {
         }
 
         let dest = self.path_to_filesystem(location)?;
-        let (file, src) = new_staged_upload(&dest)?;
-        Ok(Box::new(LocalUpload::new(src, dest, file)))
+        let (file, src) = new_staged_upload(&dest, self.fsync)?;
+        Ok(Box::new(LocalUpload::new(src, dest, file, self.fsync)))
     }
 
     async fn get_opts(&self, location: &Path, options: GetOptions) -> 
Result<GetResult> {
@@ -579,6 +593,7 @@ impl ObjectStore for LocalFileSystem {
 
         let from = self.path_to_filesystem(from)?;
         let to = self.path_to_filesystem(to)?;
+        let fsync = self.fsync;
 
         match mode {
             CopyMode::Overwrite => {
@@ -592,17 +607,22 @@ impl ObjectStore for LocalFileSystem {
                 maybe_spawn_blocking(move || {
                     loop {
                         let staged = staged_upload_path(&to, &id.to_string());
+                        // Stage via a temporary hard link; the source is 
already durable so the
+                        // staging link itself needs no fsync (the publish 
rename below fsyncs the
+                        // shared parent directory).
                         match std::fs::hard_link(&from, &staged) {
-                            Ok(_) => {
-                                return std::fs::rename(&staged, 
&to).map_err(|source| {
+                            // `rename` bundles in the fsync of `to`'s parent 
directory.
+                            Ok(_) => match rename(&staged, &to, fsync) {
+                                Ok(_) => return Ok(()),
+                                Err(source) => {
                                     let _ = std::fs::remove_file(&staged); // 
Attempt to clean up
-                                    Error::UnableToCopyFile { from, to, source 
}.into()
-                                });
-                            }
+                                    return Err(Error::UnableToCopyFile { from, 
to, source }.into());
+                                }
+                            },
                             Err(source) => match source.kind() {
                                 ErrorKind::AlreadyExists => id += 1,
                                 ErrorKind::NotFound => match from.exists() {
-                                    true => create_parent_dirs(&to, source)?,
+                                    true => create_parent_dirs(&to, source, 
fsync)?,
                                     false => {
                                         return Err(Error::NotFound { path: 
from, source }.into());
                                     }
@@ -619,7 +639,9 @@ impl ObjectStore for LocalFileSystem {
             CopyMode::Create => {
                 maybe_spawn_blocking(move || {
                     loop {
-                        match std::fs::hard_link(&from, &to) {
+                        // The source is an existing object that is already 
durable, so no file
+                        // sync is needed; `hard_link` bundles in the fsync of 
`to`'s parent dir.
+                        match hard_link(&from, &to, fsync) {
                             Ok(_) => return Ok(()),
                             Err(source) => match source.kind() {
                                 ErrorKind::AlreadyExists => {
@@ -630,7 +652,7 @@ impl ObjectStore for LocalFileSystem {
                                     .into());
                                 }
                                 ErrorKind::NotFound => match from.exists() {
-                                    true => create_parent_dirs(&to, source)?,
+                                    true => create_parent_dirs(&to, source, 
fsync)?,
                                     false => {
                                         return Err(Error::NotFound { path: 
from, source }.into());
                                     }
@@ -658,13 +680,18 @@ impl ObjectStore for LocalFileSystem {
             RenameTargetMode::Overwrite => {
                 let from = self.path_to_filesystem(from)?;
                 let to = self.path_to_filesystem(to)?;
+                let fsync = self.fsync;
                 maybe_spawn_blocking(move || {
                     loop {
-                        match std::fs::rename(&from, &to) {
+                        // Unlike multipart `complete`, there is no freshly 
written file to
+                        // `sync_all` here: `from` is an existing, 
already-durable object and a
+                        // rename only mutates directory entries. `rename` 
bundles in the fsync of
+                        // both affected directories (destination, and source 
if it differs).
+                        match rename(&from, &to, fsync) {
                             Ok(_) => return Ok(()),
                             Err(source) => match source.kind() {
                                 ErrorKind::NotFound => match from.exists() {
-                                    true => create_parent_dirs(&to, source)?,
+                                    true => create_parent_dirs(&to, source, 
fsync)?,
                                     false => {
                                         return Err(Error::NotFound { path: 
from, source }.into());
                                     }
@@ -816,23 +843,168 @@ impl LocalFileSystem {
 }
 
 /// Creates the parent directories of `path` or returns an error based on 
`source` if no parent
-fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> 
{
+///
+/// When `fsync` is true, every directory created here is fsynced, up to and 
including the first
+/// pre-existing ancestor (whose entry list also changed), so the new 
directory entries are durable.
+fn create_parent_dirs(path: &std::path::Path, source: io::Error, fsync: bool) 
-> Result<()> {
     let parent = path.parent().ok_or_else(|| {
         let path = path.to_path_buf();
         Error::UnableToCreateFile { path, source }
     })?;
 
+    // Record the deepest already-existing ancestor *before* creating any 
directories, so that
+    // afterwards we know exactly which directories are new and need to be 
fsynced.
+    let first_existing = fsync.then(|| {
+        let mut dir = parent;
+        while !dir.exists() {
+            match dir.parent() {
+                Some(p) => dir = p,
+                None => break,
+            }
+        }
+        dir.to_path_buf()
+    });
+
     std::fs::create_dir_all(parent).map_err(|source| {
         let path = parent.into();
         Error::UnableToCreateDir { source, path }
     })?;
+
+    if let Some(first_existing) = first_existing {
+        // Walk from `parent` up to `first_existing`, fsyncing each directory 
whose entries changed.
+        let mut dir = parent;
+        loop {
+            fsync_dir(dir).map_err(|source| Error::UnableToSyncFile {
+                source,
+                path: dir.into(),
+            })?;
+            if dir == first_existing {
+                break;
+            }
+            dir = match dir.parent() {
+                Some(p) => p,
+                None => break,
+            };
+        }
+    }
+    Ok(())
+}
+
+/// Renames `from` to `to`, then — when `fsync` is enabled — fsyncs the 
destination's parent
+/// directory (and the source's too, if it differs) so the moved directory 
entries are durable.
+///
+/// The directory fsync is bundled in deliberately: every durable rename goes 
through here, so the
+/// post-rename fsync can never be forgotten at an individual call site.
+fn rename(from: &std::path::Path, to: &std::path::Path, fsync: bool) -> 
io::Result<()> {
+    std::fs::rename(from, to)?;
+    if fsync {
+        fsync_parent_dir(to)?;
+        // A cross-directory move also removes an entry from the source 
directory.
+        if from.parent() != to.parent() {
+            fsync_parent_dir(from)?;
+        }
+    }
+    Ok(())
+}
+
+/// Hard-links `original` to `link`, then — when `fsync` is enabled — fsyncs 
`link`'s parent
+/// directory so the new directory entry is durable.
+///
+/// As with [`rename`], the directory fsync is bundled in so it cannot be 
forgotten at a call site.
+fn hard_link(original: &std::path::Path, link: &std::path::Path, fsync: bool) 
-> io::Result<()> {
+    std::fs::hard_link(original, link)?;
+    if fsync {
+        fsync_parent_dir(link)?;
+    }
+    Ok(())
+}
+
+/// Durably publishes the freshly-written staging file `file` (located at 
`src`) to `dest` via a
+/// rename.
+///
+/// When `fsync` is enabled, the file's contents are flushed before — and 
`dest`'s parent
+/// directory after — the rename, so a successful return is durable. The file 
is always closed
+/// before the rename (checking for close errors): required for NFS error 
detection and for some
+/// FUSE filesystems (e.g. Blobfuse) that only commit the data on close.
+fn finish_staged_rename(
+    file: File,
+    src: &std::path::Path,
+    dest: &std::path::Path,
+    fsync: bool,
+) -> Result<()> {
+    sync_and_close(file, src, fsync)?;
+    rename(src, dest, fsync).map_err(|source| Error::UnableToRenameFile { 
source })?;
+    Ok(())
+}
+
+/// Like [`finish_staged_rename`] but publishes via a hard link 
(`PutMode::Create` semantics): the
+/// staging file is linked to `dest` and then removed. Returns 
[`Error::AlreadyExists`] if `dest`
+/// already exists.
+fn finish_staged_hard_link(
+    file: File,
+    src: &std::path::Path,
+    dest: &std::path::Path,
+    fsync: bool,
+) -> Result<()> {
+    sync_and_close(file, src, fsync)?;
+    match hard_link(src, dest, fsync) {
+        Ok(()) => {
+            let _ = std::fs::remove_file(src); // Attempt to cleanup
+            Ok(())
+        }
+        Err(source) => match source.kind() {
+            ErrorKind::AlreadyExists => Err(Error::AlreadyExists {
+                path: dest.to_str().unwrap().to_string(),
+                source,
+            }
+            .into()),
+            _ => Err(Error::UnableToRenameFile { source }.into()),
+        },
+    }
+}
+
+/// Flushes the freshly-written `file`'s contents to disk (when `fsync` is 
enabled) and then
+/// closes it, checking for close errors that dropping the [`File`] would 
silently ignore.
+fn sync_and_close(file: File, path: &std::path::Path, fsync: bool) -> 
Result<()> {
+    if fsync {
+        file.sync_all().map_err(|source| Error::UnableToSyncFile {
+            source,
+            path: path.into(),
+        })?;
+    }
+    close_file(file).map_err(|source| Error::UnableToCopyDataToFile { source 
})?;
     Ok(())
 }
 
+/// Fsyncs the parent directory of `path` so a change to its directory entries 
(e.g. one just made
+/// by a rename or hard link) is durable. A no-op when `path` has no parent.
+fn fsync_parent_dir(path: &std::path::Path) -> io::Result<()> {
+    match path.parent() {
+        Some(parent) => fsync_dir(parent),
+        None => Ok(()),
+    }
+}
+
+/// Fsyncs `dir_path` so that changes to its directory entries are durable.
+///
+/// This is only meaningful on Unix; on other platforms (e.g. Windows) 
directories cannot be
+/// portably opened as a [`File`] and synced, so this is a no-op.
+fn fsync_dir(dir_path: &std::path::Path) -> io::Result<()> {
+    #[cfg(target_family = "unix")]
+    {
+        File::open(dir_path)?.sync_all()
+    }
+    #[cfg(not(target_family = "unix"))]
+    {
+        let _ = dir_path;
+        Ok(())
+    }
+}
+
 /// Generates a unique file path `{base}#{suffix}`, returning the opened 
`File` and `path`
 ///
-/// Creates any directories if necessary
-fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
+/// Creates any directories if necessary, fsyncing them when `fsync` is enabled
+fn new_staged_upload(base: &std::path::Path, fsync: bool) -> Result<(File, 
PathBuf)> {
     let mut multipart_id = 1;
     loop {
         let suffix = multipart_id.to_string();
@@ -842,7 +1014,7 @@ fn new_staged_upload(base: &std::path::Path) -> 
Result<(File, PathBuf)> {
             Ok(f) => return Ok((f, path)),
             Err(source) => match source.kind() {
                 ErrorKind::AlreadyExists => multipart_id += 1,
-                ErrorKind::NotFound => create_parent_dirs(&path, source)?,
+                ErrorKind::NotFound => create_parent_dirs(&path, source, 
fsync)?,
                 _ => return Err(Error::UnableToOpenFile { source, path 
}.into()),
             },
         }
@@ -865,6 +1037,8 @@ struct LocalUpload {
     src: Option<PathBuf>,
     /// The next offset to write into the file
     offset: u64,
+    /// Whether to fsync the file and its parent directory on completion
+    fsync: bool,
 }
 
 #[derive(Debug)]
@@ -874,7 +1048,7 @@ struct UploadState {
 }
 
 impl LocalUpload {
-    pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
+    pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File, fsync: bool) -> 
Self {
         Self {
             state: Arc::new(UploadState {
                 dest,
@@ -882,6 +1056,7 @@ impl LocalUpload {
             }),
             src: Some(src),
             offset: 0,
+            fsync,
         }
     }
 }
@@ -913,6 +1088,7 @@ impl MultipartUpload for LocalUpload {
     async fn complete(&mut self) -> Result<PutResult> {
         let src = self.src.take().ok_or(Error::Aborted)?;
         let s = Arc::clone(&self.state);
+        let fsync = self.fsync;
         maybe_spawn_blocking(move || {
             // Ensure no inflight writes
             let mut guard = s.file.lock();
@@ -923,15 +1099,10 @@ impl MultipartUpload for LocalUpload {
                 path: src.to_string_lossy().to_string(),
             })?;
 
-            // Explicitly close the file, checking for errors that would be 
silently ignored by drop.
-            // On network filesystems (e.g. NFS), close can fail and indicate 
data loss.
-            //
-            // This also ensures the file is closed before rename, which is 
required by some FUSE
-            // filesystems (e.g. Blobfuse) to trigger the upload operation.
-            close_file(file).map_err(|source| Error::UnableToCopyDataToFile { 
source })?;
-
-            std::fs::rename(&src, &s.dest)
-                .map_err(|source| Error::UnableToRenameFile { source })?;
+            // Durably publish the freshly-written staging file: flush its 
contents, close it, then
+            // rename it into place and fsync the destination's parent 
directory (the fsync calls
+            // are bundled into the helper and only run when fsync is enabled).
+            finish_staged_rename(file, &src, &s.dest, fsync)?;
 
             Ok(PutResult {
                 e_tag: Some(get_etag(&metadata)),
@@ -1324,6 +1495,70 @@ mod tests {
         put_opts(&integration, false).await;
     }
 
+    #[tokio::test]
+    #[cfg(target_family = "unix")]
+    async fn file_test_fsync() {
+        // Run the full integration suite with fsync enabled to ensure the 
durability code
+        // paths (file sync + directory fsync on put/copy/rename/multipart, 
including recursive
+        // directory creation) behave identically to the default.
+        let root = TempDir::new().unwrap();
+        let integration = LocalFileSystem::new_with_prefix(root.path())
+            .unwrap()
+            .with_fsync(true);
+
+        put_get_delete_list(&integration).await;
+        list_with_offset_exclusivity(&integration).await;
+        get_opts(&integration).await;
+        list_uses_directories_correctly(&integration).await;
+        list_with_delimiter(&integration).await;
+        rename_and_copy(&integration).await;
+        copy_if_not_exists(&integration).await;
+        copy_rename_nonexistent_object(&integration).await;
+        stream_get(&integration).await;
+        put_opts(&integration, false).await;
+    }
+
+    #[tokio::test]
+    async fn fsync_creates_nested_dirs() {
+        // Exercises the recursive directory fsync in `create_parent_dirs`: 
every directory
+        // component is newly created, so each must be synced up to the 
pre-existing root.
+        let root = TempDir::new().unwrap();
+        let integration = LocalFileSystem::new_with_prefix(root.path())
+            .unwrap()
+            .with_fsync(true);
+
+        let data = Bytes::from("arbitrary data");
+
+        // `put` (overwrite) into a deeply nested, non-existent directory tree
+        let location = Path::from("a/b/c/d/put_file");
+        integration
+            .put(&location, data.clone().into())
+            .await
+            .unwrap();
+        let read = integration
+            .get(&location)
+            .await
+            .unwrap()
+            .bytes()
+            .await
+            .unwrap();
+        assert_eq!(read, data);
+
+        // multipart upload into another nested tree
+        let location = Path::from("e/f/g/multipart_file");
+        let mut upload = integration.put_multipart(&location).await.unwrap();
+        upload.put_part(data.clone().into()).await.unwrap();
+        upload.complete().await.unwrap();
+        let read = integration
+            .get(&location)
+            .await
+            .unwrap()
+            .bytes()
+            .await
+            .unwrap();
+        assert_eq!(read, data);
+    }
+
     #[test]
     #[cfg(target_family = "unix")]
     fn test_non_tokio() {

Reply via email to